11 #ifndef RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP_
12 #define RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP_
14 #include <boost/thread/thread.hpp>
17 #include <boost/lockfree/spsc_queue.hpp>
// Fixed-capacity byte buffer holding MAX_TOPIC_SIZE elements of unsigned char.
39 std::array<unsigned char, MAX_TOPIC_SIZE>
data;
// Q_SIZE (default 300) is the compile-time capacity given to the lock-free
// queue allocated below.
49 template <u
int16_t Q_SIZE = 300>
// Allocates the single-producer/single-consumer queue of topicT elements.
// NOTE(review): raw `new`; the matching `delete` is not visible in this
// excerpt -- confirm the queue is released (or switch to an owning member).
54 queue_m =
new boost::lockfree::spsc_queue<topicT, boost::lockfree::capacity<Q_SIZE> >;
58 topic_name_m = topic_name;
61 lastSkipedSample_m = 0;
// Second initialisation sequence, identical to the one above (presumably a
// second constructor overload -- signature not visible here).
67 queue_m =
new boost::lockfree::spsc_queue<topicT, boost::lockfree::capacity<Q_SIZE> >;
71 topic_name_m = topic_name;
74 lastSkipedSample_m = 0;
// Summary report: with skipped samples the message carries the skipped count,
// the received count and their ratio; otherwise only the received count.
// NOTE(review): the ratio divides by rcvSamples_m; presumably non-zero whenever
// skipedSamples_m > 0 -- confirm.
77 if (skipedSamples_m > 0) {
79 "[%s] skipped %lu samples out of %lu ratio: %f. Last @: %u\n",
83 static_cast<double>(skipedSamples_m) / rcvSamples_m,
87 "[%s] Received: %lu samples. No samples skipped\n",
// Enqueues `value` on the SPSC queue and signals the condition variable.
// The overrun message and the lastSkipedSample_m update further down
// presumably belong to the branch taken when the queue push fails; the branch
// condition is not visible in this excerpt -- confirm.
103 inline virtual bool push(
topicT const &value) {
// spsc_queue::push() reports whether the element was stored.
106 push_ret = queue_m->push(value);
// NOTE(review): the notification is sent without any visible locking, while
// the consumer's cond.wait() in this file has no predicate; a notify issued
// before the consumer blocks would not be remembered -- confirm this is
// acceptable (the sample would then wait for the next push).
107 this->cond.notify_one();
112 "[%s] SampleId: %u overrun, so far skipped %lu samples. Last @: %u\n",
113 topic_name_m.c_str(),
117 lastSkipedSample_m = value.
sampleId;
// Stub body: only logs, at ERROR level, that this method is not implemented
// for RTI DDS.
133 LOG4CPLUS_ERROR_FMT(
GetLogger(),
" Method Not implemented for RTI DDS (as it is not used)");
// Sample queue with fixed capacity Q_SIZE; the pointee is created with `new`
// in the initialisation code of this class.
143 boost::lockfree::spsc_queue<topicT, boost::lockfree::capacity<Q_SIZE> > *queue_m;
// Notified by push() and waited on by the publisher loop.
152 boost::condition_variable
cond;
// Template parameters: T (default rtctk::AgnosticTopic), Q_SIZE (default 300)
// and TDW (default TypedDataWriter). Q_SIZE is presumably forwarded to the
// queue base class -- the class head is not visible here, confirm.
160 template <
typename T = rtctk::AgnosticTopic, u
int16_t Q_SIZE = 300,
typename TDW = TypedDataWriter>
// Trace: logs the enclosing function's full signature at DEBUG level.
164 LOG4CPLUS_DEBUG_FMT(
GetLogger(),
"%s", __PRETTY_FUNCTION__);
// Same trace in a second member (presumably the destructor -- not visible).
171 LOG4CPLUS_DEBUG_FMT(
GetLogger(),
"%s", __PRETTY_FUNCTION__);
// Publisher loop fragment. Two data paths are visible: one pops samples from
// queue_m, the other takes messages from m_repub_buf. NOTE(review): which path
// is compiled/used is presumably selected by a build-time switch that is not
// visible in this excerpt -- confirm. The log strings below also show that
// DDS write timeouts and write failures are reported per sample id.
180 std::chrono::high_resolution_clock::time_point start_time;
// Lock required by the condition-variable wait below.
181 boost::unique_lock<boost::mutex> lock(this->mut);
// NOTE(review): wait() is called without a predicate. A spurious wakeup only
// results in an empty pop loop, but a notification sent before this call is
// not remembered -- confirm the enclosing loop tolerates the extra latency.
188 this->cond.wait(lock);
// Drain every sample currently in the queue.
189 while (this->queue_m->pop(value)) {
190 start_time = std::chrono::high_resolution_clock::now();
// The serialized buffer is loaned here and handed back by unloan() below
// (presumably around the DDS write, which is not visible in this excerpt).
191 m_message.serialized_data.loan_contiguous(
193 m_sample_id = m_message.sample_id = value.
sampleId;
195 m_message.serialized_data.unloan();
// Republish-buffer path: a nullptr result is handled explicitly (branch body
// not visible here).
198 m_msg = this->m_repub_buf->WaitNextAvail();
199 if (m_msg ==
nullptr) {
202 m_sample_id = m_msg->sample_id();
// Sample ids are expected to increase by exactly one; any other step is
// reported on stdout.
// NOTE(review): uses std::cout/std::endl (flushes on every report) rather than
// the logger used elsewhere in this file.
203 if (m_sample_id != m_last_sample_id + 1) {
204 std::cout <<
"[" << this->topic_name_m
205 <<
"] sample id mismatch last: " << m_last_sample_id
206 <<
" current: " << m_sample_id << std::endl;
209 m_last_sample_id = m_sample_id;
// Presumably hands the slot obtained from WaitNextAvail() back to the buffer.
212 this->m_repub_buf->MakeFree();
220 "[%s]SampleId: %u. DDS write timeout!",
221 this->topic_name_m.c_str(),
226 "[%s]SampleId: %u. DDS failed to write. Return code: %s",
227 this->topic_name_m.c_str(),
// Periodic statistics: whenever m_framecounter is a multiple of 2000, report
// the drop count and drop ratio, or that nothing was dropped so far.
233 if ((m_framecounter % 2000) == 0) {
234 if (this->skipedSamples_m > 0) {
237 "[%s]SampleId: %u. Dropped: %lu (%f). Last @: %u\n",
238 this->topic_name_m.c_str(),
240 this->skipedSamples_m,
241 static_cast<double>(this->skipedSamples_m) / this->rcvSamples_m,
242 this->lastSkipedSample_m);
245 "[%s]SampleId: %u. No Drops so far.\n",
246 this->topic_name_m.c_str(),
// Announces the queue flush at DEBUG level.
258 LOG4CPLUS_DEBUG_FMT(
GetLogger(),
"[%s]Flushing queue ...\n", this->topic_name_m.c_str());
// Flush fragment: empties whatever is still pending, mirroring the two data
// paths of the main loop (queue_m and m_repub_buf).
// Queue path: pop until the queue is empty.
261 while (this->queue_m->pop(value)) {
262 m_message.serialized_data.loan_contiguous(
264 m_sample_id = m_message.sample_id = value.
sampleId;
266 m_message.serialized_data.unloan();
// Republish-buffer path: continue while GetAvail() reports pending entries.
268 while (this->m_repub_buf->GetAvail() > 0) {
269 m_msg = this->m_repub_buf->WaitNextAvail();
270 if (m_msg ==
nullptr) {
273 m_sample_id = m_msg->sample_id();
275 this->m_repub_buf->MakeFree();
// NOTE(review): `% 1` is always 0, so this condition is always true and the
// statistics line is emitted for every flushed sample (presumably intentional
// during flushing -- confirm, or drop the test).
279 if ((m_framecounter % 1) == 0) {
281 "[%s] Flushing SampleId: %u. Droped: %lu (%f). Last @ %u\n",
282 this->topic_name_m.c_str(),
284 this->skipedSamples_m,
285 static_cast<double>(this->skipedSamples_m) / this->rcvSamples_m,
286 this->lastSkipedSample_m);
// Simulation loop fragment: a locally generated sample id is stamped on the
// outgoing message, and timeouts/failures of the write are logged.
299 unsigned int sampleId = 0;
// The id is stored both by field assignment and through a setter call;
// presumably the two forms serve different DDS bindings chosen at build time
// (the selecting directive is not visible here -- confirm).
302 m_message.sample_id = sampleId;
306 m_message.sample_id(sampleId);
315 "[%s] Timeout while sending (write): %lu sample.\n",
316 this->topic_name_m.c_str(),
321 "[%s] Failed to send (write): %lu sample. Return Code: %s\n",
322 this->topic_name_m.c_str(),
// Pace the loop; usleep() takes microseconds.
329 usleep(m_sim_loop_sleep);
// Loop period in microseconds derived from `f` (presumably a frequency in Hz).
// NOTE(review): no guard against f == 0 is visible; that would divide by zero.
339 m_sim_loop_sleep = 1000000 / f;
// Thread name is "Spub" followed by the topic name.
// NOTE(review): the return value is ignored, and on Linux thread names are
// limited to 16 bytes including the terminator, so longer names are rejected
// -- confirm topic names are short enough or truncate.
341 std::string thr_name =
"Spub" + this->topic_name_m;
342 pthread_setname_np(this->m_thread.native_handle(), thr_name.c_str());
// Thread name is "Dpub" followed by the topic name.
// NOTE(review): the return value is ignored, and on Linux thread names are
// limited to 16 bytes including the terminator, so longer names are rejected
// -- confirm topic names are short enough or truncate.
351 std::string thr_name =
"Dpub" + this->topic_name_m;
352 pthread_setname_np(this->m_thread.native_handle(), thr_name.c_str());
// Stop sequence fragment: presumably unblocks a thread waiting inside the
// republish buffer (semantics of ReleaseWait() are not visible here) ...
361 this->m_repub_buf->ReleaseWait();
// ... and wakes a thread blocked on the condition variable.
363 this->cond.notify_one();
// Adds `cpu` to the set and applies the set as the affinity mask of m_thread.
// The result is kept in `ret`; the message below is used for the error report,
// which includes the thread name read back into a buffer of length 64.
375 CPU_SET(cpu, &cpuset);
376 ret = pthread_setaffinity_np(m_thread.native_handle(),
sizeof(cpuset), &cpuset);
379 pthread_getname_np(m_thread.native_handle(), thr_name, 64);
381 "%s ERROR set thread affinity for: %s. Error: %s(%d)",
// Reads the current affinity mask of m_thread and logs it.
// NOTE(review): the return value of pthread_getaffinity_np() is not checked,
// so a failed call would report whatever `cpuset` already contained.
396 pthread_getaffinity_np(m_thread.native_handle(),
sizeof(cpuset), &cpuset);
// Build a space-separated list of every CPU index present in the mask.
399 for (
int j = 0; j < CPU_SETSIZE; j++) {
400 if (CPU_ISSET(j, &cpuset)) {
401 af += std::to_string(j) +
" ";
// Thread name is read back into a buffer of length 64 for the log line.
405 pthread_getname_np(m_thread.native_handle(), thr_name, 64);
406 LOG4CPLUS_INFO_FMT(
GetLogger(),
" CPU affinity for thread: %s: %s", thr_name, af.c_str());
// Calls wait_for_acknowledgments() on the data writer and logs a failure with
// its return code. The rest of the body is outside this excerpt.
413 void WaitForAcknowledgments() {
// Maximum wait; presumably {seconds, nanoseconds}, i.e. 10 s -- confirm
// against the Duration_t definition of the DDS binding in use.
414 Duration_t to = {10, 0};
415 ReturnCode_t ret = m_data_writer->wait_for_acknowledgments(to);
418 "[%s] Failed wait_for_acknowledgments. Return code: %s\n",
419 this->topic_name_m.c_str(),