RTC Toolkit  2.0.0
ddsPubThread.hpp
Go to the documentation of this file.
1 
11 #ifndef RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP_
12 #define RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP_
13 
14 #include "repubBuf.hpp"
15 #include <boost/lockfree/spsc_queue.hpp>
16 
17 #include <chrono>
18 #include <iostream>
19 
23 
24 #include <numapp/thread.hpp>
25 
26 namespace rtctk::telRepub {
27 
28 using namespace rtctk::componentFramework;
29 
35 template <uint16_t Q_SIZE = 300>
37 public:
38  PubThreadBase(const char *topic_name, std::uint32_t queue_size) : m_logger(GetLogger()) {
39  m_repub_buf =
40  new RepubBuf<rtctk::componentFramework::AgnosticTopic>(topic_name, queue_size);
41  topic_name_m = topic_name;
42  rcvSamples_m = 0;
43  skipedSamples_m = 0;
44  lastSkipedSample_m = 0;
45  }
46 
47  // TBD maybe merge two ctor
48  PubThreadBase(const std::string &topic_name, std::uint32_t queue_size) : m_logger(GetLogger()) {
49  m_repub_buf =
50  new RepubBuf<rtctk::componentFramework::AgnosticTopic>(topic_name, queue_size);
51  topic_name_m = topic_name;
52  rcvSamples_m = 0;
53  skipedSamples_m = 0;
54  lastSkipedSample_m = 0;
55  }
56  virtual ~PubThreadBase() {
57  if (skipedSamples_m > 0) {
58  LOG4CPLUS_INFO_FMT(m_logger,
59  "[%s] skipped %lu samples out of %lu ratio: %f. Last @: %u\n",
60  topic_name_m.c_str(),
61  skipedSamples_m,
62  rcvSamples_m,
63  static_cast<double>(skipedSamples_m) / rcvSamples_m,
64  lastSkipedSample_m);
65  } else {
66  LOG4CPLUS_INFO_FMT(m_logger,
67  "[%s] Received: %lu samples. No samples skipped\n",
68  topic_name_m.c_str(),
69  rcvSamples_m);
70  }
71  delete m_repub_buf;
72  }
73 
74  std::string getTopicName() {
75  return topic_name_m;
76  }
77 
83  return m_repub_buf;
84  }
85 
86 protected:
88  unsigned long rcvSamples_m;
89  unsigned long skipedSamples_m;
91  std::string topic_name_m;
92  boost::mutex mut;
93  boost::condition_variable cond;
94 
95  log4cplus::Logger m_logger;
96 };
97 
101 // TBD maybe we do not need template
102 
103 template <typename T = rtctk::componentFramework::AgnosticTopic,
104  uint16_t Q_SIZE = 300,
105  typename TDW = DataWriter>
106 class PubThread : public PubThreadBase<Q_SIZE> {
107 public:
108  PubThread(TDW *dw,
110  std::uint32_t queue_size = 300)
111  : PubThreadBase<Q_SIZE>(dw->get_topic()->get_name(), queue_size)
112  , m_loop(true)
113  , m_framecounter(0)
114  , m_data_writer(dw)
115  , m_metrics(metrics) {
116  LOG4CPLUS_DEBUG_FMT(this->m_logger, "%s", __PRETTY_FUNCTION__);
117 
118  // Register thread to component metrics service
119  std::string id_prefix = this->topic_name_m;
120  // we need to convert to lowercase for OLDB DataPoint (PC Id)
121  std::transform(id_prefix.begin(), id_prefix.end(), id_prefix.begin(), ::tolower);
122 
123  m_pc_sent_samples_reg = m_metrics.AddCounter(
124  &m_pc_sent_samples,
125  CounterMetricInfo(id_prefix + "/dds_publisher/sent_samples",
126  "DDS samples sent for topic: " + this->topic_name_m));
127 
128  } // PubThread
129 
131  LOG4CPLUS_DEBUG_FMT(this->m_logger, "%s", __PRETTY_FUNCTION__);
132  } //~PubThread
133 
138  void ReadQueuePub() {
139  ReturnCode_t retcode = ReturnCode_t::RETCODE_OK;
140  std::chrono::high_resolution_clock::time_point start_time;
141  boost::unique_lock<boost::mutex> lock(this->mut);
142 
143  /* Register thread to component metrics service */
144  std::string id_suffix = m_thread_name;
145  // we need to convert to lowercase for OLDB DataPoint (PC Id)
146  std::transform(id_suffix.begin(), id_suffix.end(), id_suffix.begin(), ::tolower);
147  m_metrics.AddThread(numapp::thisThread::GetThreadId(),
148  ThreadMetricInfo("dds_publishers/" + id_suffix,
149  "[Dpub" + this->topic_name_m +
150  "] Telemetry republisher (DDS) publisher thread"));
151 
152  while (m_loop) {
153  m_msg = this->m_repub_buf->WaitNextAvail();
154  if (m_msg == nullptr) {
155  break;
156  } // we assume that nullptr is returned just when ReleaseWait is called
157  m_sample_id = m_msg->sample_id();
158  if (m_sample_id != m_last_sample_id + 1) {
159  std::cout << "[" << this->topic_name_m
160  << "] sampleId mismatch last published: " << m_last_sample_id
161  << " current to be published: " << m_sample_id << std::endl;
162  }
163 
164  m_last_sample_id = m_sample_id;
165 
166  retcode = m_data_writer->write(m_msg, eprosima::fastrtps::rtps::c_InstanceHandle_Unknown);
167  this->m_repub_buf->MakeFree();
168 
169  m_framecounter++;
170  m_pc_sent_samples++;
171 
172  if (retcode != ReturnCode_t::RETCODE_OK) {
173  if (retcode == ReturnCode_t::RETCODE_TIMEOUT) {
174  LOG4CPLUS_ERROR_FMT(this->m_logger,
175  "[%s]SampleId: %u. DDS write timeout!",
176  this->topic_name_m.c_str(),
177  m_sample_id);
178  } else {
179  LOG4CPLUS_ERROR_FMT(this->m_logger,
180  "[%s]SampleId: %u. DDS failed to write. Return code: %s",
181  this->topic_name_m.c_str(),
182  m_sample_id,
183  RetcodeToString(retcode).c_str());
184  }
185  }
186 
187  } // while
188 
189  LOG4CPLUS_DEBUG_FMT(this->m_logger, "[%s]Flushing queue ...\n", this->topic_name_m.c_str());
190  // flushing queue
191  while (this->m_repub_buf->GetAvail() > 0) {
192  m_msg = this->m_repub_buf->WaitNextAvail();
193  if (m_msg == nullptr) {
194  break;
195  } // we assume that nullptr is returned just when ReleaseWait is called
196  m_sample_id = m_msg->sample_id();
197  retcode = m_data_writer->write(m_msg, eprosima::fastrtps::rtps::c_InstanceHandle_Unknown);
198  this->m_repub_buf->MakeFree();
199 
200  m_framecounter++;
201  m_pc_sent_samples++;
202  if ((m_framecounter % 1) == 0) {
203  LOG4CPLUS_DEBUG_FMT(this->m_logger,
204  "[%s] Flushing SampleId: %u. Droped: %lu (%f). Last @ %u\n",
205  this->topic_name_m.c_str(),
206  m_sample_id,
207  this->skipedSamples_m,
208  static_cast<double>(this->skipedSamples_m) / this->rcvSamples_m,
209  this->lastSkipedSample_m);
210  // printNUMAmem(this->queue_m, "Queue");
211  }
212  } // while
213  m_metrics.RemoveThread(numapp::thisThread::GetThreadId());
214  } // readQueuePub
215 
220  void SimPub() {
221  ReturnCode_t retcode = ReturnCode_t::RETCODE_OK;
222  unsigned int sampleId = 0;
223  /* Register thread to component metrics service */
224  std::string id_suffix = m_thread_name;
225  // we need to convert to lowercase for OLDB DataPoint (PC Id)
226  std::transform(id_suffix.begin(), id_suffix.end(), id_suffix.begin(), ::tolower);
227  m_metrics.AddThread(
228  numapp::thisThread::GetThreadId(),
229  ThreadMetricInfo("dds_publishers/" + id_suffix,
230  "[Spub" + this->topic_name_m +
231  "] Telemetry republisher (DDS) publisher thread - SIMULATION"));
232 
233  while (m_loop) {
234  m_message.sample_id(sampleId);
235  // TBD: added some load to the sample
236  retcode = m_data_writer->write(&m_message, eprosima::fastrtps::rtps::c_InstanceHandle_Unknown);
237  m_framecounter++;
238  m_pc_sent_samples++;
239 
240  if (retcode != ReturnCode_t::RETCODE_OK) {
241  if (retcode == ReturnCode_t::RETCODE_TIMEOUT) {
242  LOG4CPLUS_ERROR_FMT(this->m_logger,
243  "[%s] Timeout while sending (write): %lu sample.\n",
244  this->topic_name_m.c_str(),
245  m_framecounter);
246  } else {
247  LOG4CPLUS_ERROR_FMT(
248  this->m_logger,
249  "[%s] Failed to send (write): %lu sample. Return Code: %s\n",
250  this->topic_name_m.c_str(),
251  m_framecounter,
252  RetcodeToString(retcode).c_str());
253  }
254  } // if
255 
256  sampleId++;
257  usleep(m_sim_loop_sleep);
258  } // while
259  m_metrics.RemoveThread(numapp::thisThread::GetThreadId());
260  } // SimPub
261 
268  void CreateSimThread(u_int16_t f, numapp::NumaPolicies const &pub_thread_policies) {
269  m_sim_loop_sleep = 1000000 / f;
270  m_thread_name = "Spub" + this->topic_name_m;
271  if (m_thread_name.size() > 15) {
272  m_thread_name.erase(15);
273  }
274  m_thread = numapp::MakeThread(
275  m_thread_name, pub_thread_policies, &PubThread<T, Q_SIZE, TDW>::SimPub, this);
276  } // CreateSimThread
277 
283  void CreateThread(numapp::NumaPolicies const &pub_thread_policies) {
284  m_thread_name = "Dpub" + this->topic_name_m;
285  if (m_thread_name.size() > 15) {
286  m_thread_name.erase(15);
287  }
288  m_thread = numapp::MakeThread(
289  m_thread_name, pub_thread_policies, &PubThread<T, Q_SIZE, TDW>::ReadQueuePub, this);
290  } // CreateThread
291 
295  void JoinThread() {
296  m_loop = false;
297  this->m_repub_buf->ReleaseWait();
298  this->cond.notify_one();
299  m_thread.join();
300  } // JoinThread
301 
302 protected:
303  std::thread m_thread;
304  std::string m_thread_name;
305  std::atomic<bool> m_loop;
306  unsigned long m_framecounter;
307  uint32_t m_sample_id = 0; // TBD: can be removed, and use direct m_msg->sample_id() when needed
308  uint32_t m_last_sample_id = 0;
309 
311  T *m_msg;
313 
315 
320  // @note: OLDB does not support uint64_t which we really want, so we have to use signed 64.
321  perfc::CounterI64 m_pc_sent_samples;
322  perfc::ScopedRegistration m_pc_sent_samples_reg;
323 
324  useconds_t m_sim_loop_sleep;
325 };
326 
327 } // namespace rtctk::telRepub
328 
329 #endif
rtctk::telRepub::PubThreadBase::PubThreadBase
PubThreadBase(const char *topic_name, std::uint32_t queue_size)
Definition: ddsPubThread.hpp:38
rtctk::telRepub::PubThreadBase::m_logger
log4cplus::Logger m_logger
Definition: ddsPubThread.hpp:95
rtctk::telRepub::PubThread
Definition: ddsPubThread.hpp:106
rtctk::telRepub::PubThreadBase::cond
boost::condition_variable cond
Definition: ddsPubThread.hpp:93
rtctk::telRepub::PubThread::SimPub
void SimPub()
Thread worker in case of simulation i.e.
Definition: ddsPubThread.hpp:220
rtctk::componentFramework::RetcodeToString
std::string RetcodeToString(ReturnCode_t retcode)
Definition: ddsCommon.cpp:17
rtctk::componentFramework::ThreadMetricInfo
Defines auxiliary information associated with each thread registered with ComponentMetricsIf.
Definition: componentMetricsIf.hpp:80
rtctk::telRepub::PubThreadBase::PubThreadBase
PubThreadBase(const std::string &topic_name, std::uint32_t queue_size)
Definition: ddsPubThread.hpp:48
rtctk::telRepub::PubThreadBase::lastSkipedSample_m
uint32_t lastSkipedSample_m
Definition: ddsPubThread.hpp:90
rtctk::telRepub::PubThread::m_thread_name
std::string m_thread_name
Definition: ddsPubThread.hpp:304
rtctk::telRepub::PubThreadBase::mut
boost::mutex mut
Definition: ddsPubThread.hpp:92
componentMetricsIf.hpp
Header file for ComponentMetricsIf.
rtctk::componentFramework
Definition: commandReplier.cpp:20
rtctk::telRepub::PubThread::m_msg
T * m_msg
Definition: ddsPubThread.hpp:311
rtctk::telRepub::PubThreadBase::skipedSamples_m
unsigned long skipedSamples_m
Definition: ddsPubThread.hpp:89
rtctk::telRepub
Definition: ddsPubThread.hpp:26
rtctk::telRepub::PubThread::m_metrics
componentFramework::ComponentMetricsIf & m_metrics
Definition: ddsPubThread.hpp:314
rtctk::telRepub::PubThread::m_message
T m_message
Definition: ddsPubThread.hpp:310
rtctk::telRepub::PubThread::m_data_writer
TDW * m_data_writer
Definition: ddsPubThread.hpp:312
rtctk::componentFramework::CounterMetricInfo
Defines auxiliary information associated with each counter registered with ComponentMetricsIf.
Definition: componentMetricsIf.hpp:42
mudpi::uint16_t
unsigned short uint16_t
Definition: mudpi.h:15
rtctk::telRepub::PubThreadBase::topic_name_m
std::string topic_name_m
Definition: ddsPubThread.hpp:91
repubBuf.hpp
Buffer that keeps available free samples, and samples that are filled.
rtctk::componentFramework::ComponentMetricsIf
Component metrics interface.
Definition: componentMetricsIf.hpp:177
rtctk::componentFramework::GetLogger
log4cplus::Logger & GetLogger(const std::string &name="")
Get handle to a specific logger (used with logging macros)
rtctk::telRepub::PubThreadBase::rcvSamples_m
unsigned long rcvSamples_m
Definition: ddsPubThread.hpp:88
rtctk::telRepub::PubThread::CreateSimThread
void CreateSimThread(u_int16_t f, numapp::NumaPolicies const &pub_thread_policies)
Create thread for simulation.
Definition: ddsPubThread.hpp:268
rtctk::telRepub::RepubBuf< rtctk::componentFramework::AgnosticTopic >
rtctk::telRepub::PubThread::PubThread
PubThread(TDW *dw, componentFramework::ComponentMetricsIf &metrics, std::uint32_t queue_size=300)
Definition: ddsPubThread.hpp:108
rtctk::telRepub::PubThread::~PubThread
~PubThread()
Definition: ddsPubThread.hpp:130
rtctk::telRepub::PubThread::m_framecounter
unsigned long m_framecounter
Definition: ddsPubThread.hpp:306
rtctk::telRepub::PubThreadBase::m_repub_buf
RepubBuf< rtctk::componentFramework::AgnosticTopic > * m_repub_buf
Definition: ddsPubThread.hpp:87
rtctk::telRepub::PubThreadBase::getTopicName
std::string getTopicName()
Definition: ddsPubThread.hpp:74
rtctk::telRepub::PubThread::JoinThread
void JoinThread()
Join thread.
Definition: ddsPubThread.hpp:295
rtctk::telRepub::PubThreadBase::~PubThreadBase
virtual ~PubThreadBase()
Definition: ddsPubThread.hpp:56
ddsCommon.hpp
Declares some common DDS functionality.
rtctk::telRepub::PubThread::m_loop
std::atomic< bool > m_loop
Definition: ddsPubThread.hpp:305
rtctk::telRepub::PubThread::m_pc_sent_samples_reg
perfc::ScopedRegistration m_pc_sent_samples_reg
Definition: ddsPubThread.hpp:322
rtctk::telRepub::PubThread::m_pc_sent_samples
perfc::CounterI64 m_pc_sent_samples
Diverse Performance counters.
Definition: ddsPubThread.hpp:321
mudpi::uint32_t
unsigned int uint32_t
Definition: mudpi.h:16
rtctk::telRepub::PubThread::m_thread
std::thread m_thread
Definition: ddsPubThread.hpp:303
logger.hpp
Logging Support Library based on log4cplus.
rtctk::telRepub::PubThread::ReadQueuePub
void ReadQueuePub()
Thread worker that reads agnostic topic from the buffer/queue and publishes it via DDS.
Definition: ddsPubThread.hpp:138
rtctk::telRepub::PubThreadBase
DDS publisher thread class that provides queue with aggregated topic to be published by DDS.
Definition: ddsPubThread.hpp:36
rtctk::telRepub::PubThread::CreateThread
void CreateThread(numapp::NumaPolicies const &pub_thread_policies)
Create thread for publishing data from the queue.
Definition: ddsPubThread.hpp:283
rtctk::telRepub::PubThreadBase::GetRepubBuf
RepubBuf< rtctk::componentFramework::AgnosticTopic > * GetRepubBuf()
Get internal queue/buffer.
Definition: ddsPubThread.hpp:82
rtctk::telRepub::PubThread::m_sim_loop_sleep
useconds_t m_sim_loop_sleep
Definition: ddsPubThread.hpp:324