RTC Toolkit  1.0.0
ddsPubThread.hpp
Go to the documentation of this file.
1 
11 #ifndef RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP_
12 #define RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP_
13 
14 #include <boost/thread/thread.hpp>
15 
16 #include "repubBuf.hpp"
17 #include <boost/lockfree/spsc_queue.hpp>
18 
19 #include <chrono>
20 #include <iostream>
21 
24 
25 using namespace rtctk::componentFramework;
26 
27 namespace rtctk::telRepub {
28 
/**
 * Capacity in bytes of the payload buffer embedded in every topicT.
 */
uint32_t const MAX_TOPIC_SIZE = 2560000;  // Large increase for Pixel topics (DOR)

/**
 * Structure where the aggregation of the MUDPI topic payload is done, and which is put
 * to the queue for DDS publishing.
 *
 * NOTE(review): an instance is roughly MAX_TOPIC_SIZE bytes large, so prefer static or heap
 * storage over deep stack usage when creating many of them.
 */
struct topicT {
    /** Raw payload bytes; the buffer always has MAX_TOPIC_SIZE elements. */
    std::array<unsigned char, MAX_TOPIC_SIZE> data;
    /** Length handed to the DDS writer together with data (presumably the number of valid bytes). */
    std::size_t size;
    /** Sample id of the payload; read by the publisher when queuing and when writing to DDS. */
    uint32_t sampleId;
};
43 
49 template <uint16_t Q_SIZE = 300>
51 public:
52  PubThreadBase(const char *topic_name, std::uint32_t queue_size) {
53 #ifdef USE_RTI_DDS
54  queue_m = new boost::lockfree::spsc_queue<topicT, boost::lockfree::capacity<Q_SIZE> >;
55 #else
56  m_repub_buf = new RepubBuf<rtctk::AgnosticTopic>(topic_name, queue_size);
57 #endif
58  topic_name_m = topic_name;
59  rcvSamples_m = 0;
60  skipedSamples_m = 0;
61  lastSkipedSample_m = 0;
62  }
63 
64  // TBD maybe merge two ctor
65  PubThreadBase(const std::string &topic_name, std::uint32_t queue_size) {
66 #ifdef USE_RTI_DDS
67  queue_m = new boost::lockfree::spsc_queue<topicT, boost::lockfree::capacity<Q_SIZE> >;
68 #else
69  m_repub_buf = new RepubBuf<rtctk::AgnosticTopic>(topic_name, queue_size);
70 #endif
71  topic_name_m = topic_name;
72  rcvSamples_m = 0;
73  skipedSamples_m = 0;
74  lastSkipedSample_m = 0;
75  }
76  virtual ~PubThreadBase() {
77  if (skipedSamples_m > 0) {
78  LOG4CPLUS_INFO_FMT(GetLogger(),
79  "[%s] skipped %lu samples out of %lu ratio: %f. Last @: %u\n",
80  topic_name_m.c_str(),
81  skipedSamples_m,
82  rcvSamples_m,
83  static_cast<double>(skipedSamples_m) / rcvSamples_m,
84  lastSkipedSample_m);
85  } else {
86  LOG4CPLUS_INFO_FMT(GetLogger(),
87  "[%s] Received: %lu samples. No samples skipped\n",
88  topic_name_m.c_str(),
89  rcvSamples_m);
90  }
91 #ifdef USE_RTI_DDS
92  delete queue_m;
93 #else
94  delete m_repub_buf;
95 #endif
96  }
97 
98 #ifdef USE_RTI_DDS
99 
103  inline virtual bool push(topicT const &value) {
104  rcvSamples_m++;
105 
106  push_ret = queue_m->push(value);
107  this->cond.notify_one();
108  if (!push_ret) {
109  skipedSamples_m++;
110  LOG4CPLUS_DEBUG_FMT(
111  GetLogger(),
112  "[%s] SampleId: %u overrun, so far skipped %lu samples. Last @: %u\n",
113  topic_name_m.c_str(),
114  value.sampleId,
115  skipedSamples_m,
116  lastSkipedSample_m);
117  lastSkipedSample_m = value.sampleId;
118  }
119  return push_ret;
120  }
121 #endif
122 
123  std::string getTopicName() {
124  return topic_name_m;
125  }
126 
132 #ifdef USE_RTI_DDS
133  LOG4CPLUS_ERROR_FMT(GetLogger(), " Method Not implemented for RTI DDS (as it is not used)");
134  return nullptr;
135 #else
136  return m_repub_buf;
137 #endif
138  }
139 
140 protected:
141  bool push_ret;
142 #ifdef USE_RTI_DDS
143  boost::lockfree::spsc_queue<topicT, boost::lockfree::capacity<Q_SIZE> > *queue_m;
144 #else
146 #endif
147  unsigned long rcvSamples_m;
148  unsigned long skipedSamples_m;
150  std::string topic_name_m;
151  boost::mutex mut;
152  boost::condition_variable cond;
153 };
154 
158 // TBD maybe we do not need template
159 
160 template <typename T = rtctk::AgnosticTopic, uint16_t Q_SIZE = 300, typename TDW = TypedDataWriter>
161 class PubThread : public PubThreadBase<Q_SIZE> {
162 public:
163  PubThread(TDW *dw, std::uint32_t queue_size=300) : PubThreadBase<Q_SIZE>(dw->get_topic()->get_name(), queue_size) {
164  LOG4CPLUS_DEBUG_FMT(GetLogger(), "%s", __PRETTY_FUNCTION__);
165  m_data_writer = dw;
166  m_framecounter = 0;
167  m_loop = true;
168  }
169 
171  LOG4CPLUS_DEBUG_FMT(GetLogger(), "%s", __PRETTY_FUNCTION__);
172  }
173 
178  void ReadQueuePub() {
179  ReturnCode_t retcode = DDS_RETCODE_OK;
180  std::chrono::high_resolution_clock::time_point start_time;
181  boost::unique_lock<boost::mutex> lock(this->mut);
182 #ifdef USE_RTI_DDS
183  topicT value;
184 #endif
185 
186  while (m_loop) {
187 #ifdef USE_RTI_DDS
188  this->cond.wait(lock);
189  while (this->queue_m->pop(value)) {
190  start_time = std::chrono::high_resolution_clock::now();
191  m_message.serialized_data.loan_contiguous(
192  value.data.data(), value.size, value.data.max_size());
193  m_sample_id = m_message.sample_id = value.sampleId;
194  retcode = m_data_writer->write(m_message, DDS_HANDLE_NIL);
195  m_message.serialized_data.unloan();
196 
197 #else
198  m_msg = this->m_repub_buf->WaitNextAvail();
199  if (m_msg == nullptr) {
200  break;
201  } // we assume that nullptr is returned just when ReleaseWait is called
202  m_sample_id = m_msg->sample_id();
203  if (m_sample_id != m_last_sample_id + 1) {
204  std::cout << "[" << this->topic_name_m
205  << "] sample id mismatch last: " << m_last_sample_id
206  << " current: " << m_sample_id << std::endl;
207  }
208 
209  m_last_sample_id = m_sample_id;
210 
211  retcode = m_data_writer->write(m_msg, DDS_HANDLE_NIL);
212  this->m_repub_buf->MakeFree();
213 #endif
214 
215  m_framecounter++;
216 
217  if (retcode != DDS_RETCODE_OK) {
218  if (retcode == DDS_RETCODE_TIMEOUT) {
219  LOG4CPLUS_ERROR_FMT(GetLogger(),
220  "[%s]SampleId: %u. DDS write timeout!",
221  this->topic_name_m.c_str(),
222  m_sample_id);
223  } else {
224  LOG4CPLUS_ERROR_FMT(
225  GetLogger(),
226  "[%s]SampleId: %u. DDS failed to write. Return code: %s",
227  this->topic_name_m.c_str(),
228  m_sample_id,
229  RetcodeToString(retcode).c_str());
230  }
231  }
232 #ifdef USE_RTI_DDS
233  if ((m_framecounter % 2000) == 0) {
234  if (this->skipedSamples_m > 0) {
235  LOG4CPLUS_DEBUG_FMT(
236  GetLogger(),
237  "[%s]SampleId: %u. Dropped: %lu (%f). Last @: %u\n",
238  this->topic_name_m.c_str(),
239  m_sample_id,
240  this->skipedSamples_m,
241  static_cast<double>(this->skipedSamples_m) / this->rcvSamples_m,
242  this->lastSkipedSample_m);
243  } else {
244  LOG4CPLUS_DEBUG_FMT(GetLogger(),
245  "[%s]SampleId: %u. No Drops so far.\n",
246  this->topic_name_m.c_str(),
247  m_sample_id);
248  }
249  }
250 #endif
251 
252 #ifdef USE_RTI_DDS
253  } // while
254 #endif
255 
256  } // while
257 
258  LOG4CPLUS_DEBUG_FMT(GetLogger(), "[%s]Flushing queue ...\n", this->topic_name_m.c_str());
259  // flushing queue
260 #ifdef USE_RTI_DDS
261  while (this->queue_m->pop(value)) {
262  m_message.serialized_data.loan_contiguous(
263  value.data.data(), value.size, value.data.max_size());
264  m_sample_id = m_message.sample_id = value.sampleId;
265  retcode = m_data_writer->write(m_message, DDS_HANDLE_NIL);
266  m_message.serialized_data.unloan();
267 #else
268  while (this->m_repub_buf->GetAvail() > 0) {
269  m_msg = this->m_repub_buf->WaitNextAvail();
270  if (m_msg == nullptr) {
271  break;
272  } // we assume that nullptr is returned just when ReleaseWait is called
273  m_sample_id = m_msg->sample_id();
274  retcode = m_data_writer->write(m_msg, DDS_HANDLE_NIL);
275  this->m_repub_buf->MakeFree();
276 #endif
277 
278  m_framecounter++;
279  if ((m_framecounter % 1) == 0) {
280  LOG4CPLUS_DEBUG_FMT(GetLogger(),
281  "[%s] Flushing SampleId: %u. Droped: %lu (%f). Last @ %u\n",
282  this->topic_name_m.c_str(),
283  m_sample_id,
284  this->skipedSamples_m,
285  static_cast<double>(this->skipedSamples_m) / this->rcvSamples_m,
286  this->lastSkipedSample_m);
287  // printNUMAmem(this->queue_m, "Queue");
288  }
289  } // while
290 
291  } // readQueuePub
292 
297  void SimPub() {
298  ReturnCode_t retcode = DDS_RETCODE_OK;
299  unsigned int sampleId = 0;
300  while (m_loop) {
301 #ifdef USE_RTI_DDS
302  m_message.sample_id = sampleId;
303  // TBD: added some load to the sample
304  retcode = m_data_writer->write(m_message, DDS_HANDLE_NIL);
305 #else
306  m_message.sample_id(sampleId);
307  // TBD: added some load to the sample
308  retcode = m_data_writer->write(&m_message, DDS_HANDLE_NIL);
309 #endif
310  m_framecounter++;
311 
312  if (retcode != DDS_RETCODE_OK) {
313  if (retcode == DDS_RETCODE_TIMEOUT) {
314  LOG4CPLUS_ERROR_FMT(GetLogger(),
315  "[%s] Timeout while sending (write): %lu sample.\n",
316  this->topic_name_m.c_str(),
317  m_framecounter);
318  } else {
319  LOG4CPLUS_ERROR_FMT(
320  GetLogger(),
321  "[%s] Failed to send (write): %lu sample. Return Code: %s\n",
322  this->topic_name_m.c_str(),
323  m_framecounter,
324  RetcodeToString(retcode).c_str());
325  }
326  } // if
327 
328  sampleId++;
329  usleep(m_sim_loop_sleep);
330  } // while
331  } // SimPub
332 
338  void CreateSimThread(u_int16_t f) {
339  m_sim_loop_sleep = 1000000 / f;
340  m_thread = boost::thread(&PubThread<T, Q_SIZE, TDW>::SimPub, this);
341  std::string thr_name = "Spub" + this->topic_name_m;
342  pthread_setname_np(this->m_thread.native_handle(), thr_name.c_str());
343  }
344 
349  void CreateThread() {
350  m_thread = boost::thread(&PubThread<T, Q_SIZE, TDW>::ReadQueuePub, this);
351  std::string thr_name = "Dpub" + this->topic_name_m;
352  pthread_setname_np(this->m_thread.native_handle(), thr_name.c_str());
353  }
354 
358  void JoinThread() {
359  m_loop = false;
360 #ifndef USE_RTI_DDS
361  this->m_repub_buf->ReleaseWait();
362 #endif
363  this->cond.notify_one();
364  m_thread.join();
365  } // JoinThread
366 
371  void SetAffinity(unsigned int cpu) {
372  int ret;
373  cpu_set_t cpuset;
374  CPU_ZERO(&cpuset);
375  CPU_SET(cpu, &cpuset);
376  ret = pthread_setaffinity_np(m_thread.native_handle(), sizeof(cpuset), &cpuset);
377  if (ret != 0) {
378  char thr_name[64];
379  pthread_getname_np(m_thread.native_handle(), thr_name, 64);
380  LOG4CPLUS_ERROR_FMT(GetLogger(),
381  "%s ERROR set thread affinity for: %s. Error: %s(%d)",
382  __PRETTY_FUNCTION__,
383  thr_name,
384  strerror(ret),
385  ret);
386  } // if
387  } // SetAffinity
388 
392  void PrintAffinity() {
393  cpu_set_t cpuset;
394  CPU_ZERO(&cpuset);
395 
396  pthread_getaffinity_np(m_thread.native_handle(), sizeof(cpuset), &cpuset);
397 
398  std::string af;
399  for (int j = 0; j < CPU_SETSIZE; j++) {
400  if (CPU_ISSET(j, &cpuset)) {
401  af += std::to_string(j) + " ";
402  }
403  }
404  char thr_name[64];
405  pthread_getname_np(m_thread.native_handle(), thr_name, 64);
406  LOG4CPLUS_INFO_FMT(GetLogger(), " CPU affinity for thread: %s: %s", thr_name, af.c_str());
407  } // GetAffinity
408 
409 #ifdef USE_RTI_DDS
410 
413  void WaitForAcknowledgments() {
414  Duration_t to = {10, 0}; // 10s
415  ReturnCode_t ret = m_data_writer->wait_for_acknowledgments(to);
416  if (ret != DDS_RETCODE_OK) {
417  LOG4CPLUS_ERROR_FMT(GetLogger(),
418  "[%s] Failed wait_for_acknowledgments. Return code: %s\n",
419  this->topic_name_m.c_str(),
420  m_framecounter,
421  RetcodeToString(ret).c_str());
422  } // if
423  } // wait_for_acknowledgments
424 #endif
425 
426 protected:
427  boost::thread m_thread;
428  std::atomic<bool> m_loop;
429  unsigned long m_framecounter;
430  uint32_t m_sample_id = 0; // TBD: can be removed, and use direct m_msg->sample_id() when needed
431  uint32_t m_last_sample_id = 0;
432 
434 #ifndef USE_RTI_DDS
435  T *m_msg;
436 #endif
438 
439  useconds_t m_sim_loop_sleep;
440 };
441 
442 } // namespace rtctk::telRepub
443 
444 #endif
rtctk::telRepub::PubThreadBase::PubThreadBase
PubThreadBase(const char *topic_name, std::uint32_t queue_size)
Definition: ddsPubThread.hpp:52
DDS_RETCODE_OK
#define DDS_RETCODE_OK
Definition: ddsCommon.hpp:85
rtctk::telRepub::PubThread
Definition: ddsPubThread.hpp:161
rtctk::telRepub::PubThreadBase::cond
boost::condition_variable cond
Definition: ddsPubThread.hpp:152
rtctk::telRepub::PubThread::SimPub
void SimPub()
Thread worker in case of simulation i.e.
Definition: ddsPubThread.hpp:297
rtctk::componentFramework::RetcodeToString
std::string RetcodeToString(ReturnCode_t retcode)
Definition: ddsCommon.cpp:16
rtctk::telRepub::PubThreadBase::PubThreadBase
PubThreadBase(const std::string &topic_name, std::uint32_t queue_size)
Definition: ddsPubThread.hpp:65
rtctk::telRepub::PubThreadBase::lastSkipedSample_m
uint32_t lastSkipedSample_m
Definition: ddsPubThread.hpp:149
rtctk::telRepub::PubThreadBase::mut
boost::mutex mut
Definition: ddsPubThread.hpp:151
rtctk::componentFramework
Definition: commandReplier.cpp:20
rtctk::telRepub::PubThread::m_msg
T * m_msg
Definition: ddsPubThread.hpp:435
rtctk::telRepub::PubThreadBase::skipedSamples_m
unsigned long skipedSamples_m
Definition: ddsPubThread.hpp:148
rtctk::telRepub
Definition: ddsPubThread.hpp:27
rtctk::telRepub::PubThread::PrintAffinity
void PrintAffinity()
Print to the log the set CPU thread affinity.
Definition: ddsPubThread.hpp:392
rtctk::telRepub::PubThread::m_message
T m_message
Definition: ddsPubThread.hpp:433
rtctk::telRepub::PubThread::m_data_writer
TDW * m_data_writer
Definition: ddsPubThread.hpp:437
rtctk::telRepub::PubThreadBase::topic_name_m
std::string topic_name_m
Definition: ddsPubThread.hpp:150
repubBuf.hpp
Buffer that keeps available free samples, and samples that are filled.
rtctk::componentFramework::GetLogger
log4cplus::Logger & GetLogger(const std::string &name="")
Get handle to a specific logger (used with logging macros)
rtctk::telRepub::PubThreadBase::rcvSamples_m
unsigned long rcvSamples_m
Definition: ddsPubThread.hpp:147
rtctk::telRepub::RepubBuf< rtctk::AgnosticTopic >
rtctk::telRepub::PubThread::~PubThread
~PubThread()
Definition: ddsPubThread.hpp:170
rtctk::telRepub::PubThread::m_framecounter
unsigned long m_framecounter
Definition: ddsPubThread.hpp:429
rtctk::telRepub::PubThreadBase::push_ret
bool push_ret
Definition: ddsPubThread.hpp:141
rtctk::telRepub::PubThreadBase::getTopicName
std::string getTopicName()
Definition: ddsPubThread.hpp:123
rtctk::telRepub::topicT::data
std::array< unsigned char, MAX_TOPIC_SIZE > data
Definition: ddsPubThread.hpp:39
rtctk::telRepub::PubThread::JoinThread
void JoinThread()
Join thread.
Definition: ddsPubThread.hpp:358
rtctk::telRepub::topicT
structure where aggregation of the MUDPI topic payload is done, and put to the queue for DDS publishi...
Definition: ddsPubThread.hpp:38
rtctk::telRepub::PubThreadBase::~PubThreadBase
virtual ~PubThreadBase()
Definition: ddsPubThread.hpp:76
DDS_RETCODE_TIMEOUT
#define DDS_RETCODE_TIMEOUT
Definition: ddsCommon.hpp:83
DDS_HANDLE_NIL
#define DDS_HANDLE_NIL
Definition: ddsCommon.hpp:82
ddsCommon.hpp
Declares common DDS types.
rtctk::telRepub::PubThread::CreateSimThread
void CreateSimThread(u_int16_t f)
Create thread for simulation.
Definition: ddsPubThread.hpp:338
rtctk::telRepub::PubThread::m_loop
std::atomic< bool > m_loop
Definition: ddsPubThread.hpp:428
rtctk::telRepub::topicT::size
std::size_t size
Definition: ddsPubThread.hpp:40
mudpi::uint32_t
unsigned int uint32_t
Definition: mudpi.h:16
rtctk::telRepub::topicT::sampleId
uint32_t sampleId
Definition: ddsPubThread.hpp:41
logger.hpp
Logging Support Library based on log4cplus.
rtctk::telRepub::MAX_TOPIC_SIZE
uint32_t const MAX_TOPIC_SIZE
Definition: ddsPubThread.hpp:32
rtctk::telRepub::PubThread::SetAffinity
void SetAffinity(unsigned int cpu)
set affinity of the thread.
Definition: ddsPubThread.hpp:371
rtctk::telRepub::PubThread::ReadQueuePub
void ReadQueuePub()
Thread worker that reads (pops) agnostic topic from queue and publishes it via DDS.
Definition: ddsPubThread.hpp:178
rtctk::telRepub::PubThread::m_thread
boost::thread m_thread
Definition: ddsPubThread.hpp:427
rtctk::telRepub::PubThread::CreateThread
void CreateThread()
Create thread for publishing data from the queue.
Definition: ddsPubThread.hpp:349
rtctk::telRepub::PubThreadBase
DDS publisher thread class that provides queue with aggregated topic to be published by DDS.
Definition: ddsPubThread.hpp:50
rtctk::telRepub::PubThreadBase::m_repub_buf
RepubBuf< rtctk::AgnosticTopic > * m_repub_buf
Definition: ddsPubThread.hpp:145
rtctk::telRepub::PubThread::PubThread
PubThread(TDW *dw, std::uint32_t queue_size=300)
Definition: ddsPubThread.hpp:163
rtctk::telRepub::PubThreadBase::GetRepubBuf
RepubBuf< rtctk::AgnosticTopic > * GetRepubBuf()
Get internal queue/buffer.
Definition: ddsPubThread.hpp:131
rtctk::telRepub::PubThread::m_sim_loop_sleep
useconds_t m_sim_loop_sleep
Definition: ddsPubThread.hpp:439