11 #ifndef RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP_
12 #define RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP_
15 #include <boost/lockfree/spsc_queue.hpp>
24 #include <numapp/thread.hpp>
35 template <u
int16_t Q_SIZE = 300>
41 topic_name_m = topic_name;
44 lastSkipedSample_m = 0;
51 topic_name_m = topic_name;
54 lastSkipedSample_m = 0;
57 if (skipedSamples_m > 0) {
58 LOG4CPLUS_INFO_FMT(m_logger,
59 "[%s] skipped %lu samples out of %lu ratio: %f. Last @: %u\n",
63 static_cast<double>(skipedSamples_m) / rcvSamples_m,
66 LOG4CPLUS_INFO_FMT(m_logger,
67 "[%s] Received: %lu samples. No samples skipped\n",
93 boost::condition_variable
cond;
103 template <
typename T = rtctk::componentFramework::AgnosticTopic,
105 typename TDW = DataWriter>
111 :
PubThreadBase<Q_SIZE>(dw->get_topic()->get_name(), queue_size)
115 , m_metrics(metrics) {
116 LOG4CPLUS_DEBUG_FMT(this->m_logger,
"%s", __PRETTY_FUNCTION__);
119 std::string id_prefix = this->topic_name_m;
121 std::transform(id_prefix.begin(), id_prefix.end(), id_prefix.begin(), ::tolower);
123 m_pc_sent_samples_reg = m_metrics.AddCounter(
126 "DDS samples sent for topic: " + this->topic_name_m));
131 LOG4CPLUS_DEBUG_FMT(this->m_logger,
"%s", __PRETTY_FUNCTION__);
139 ReturnCode_t retcode = ReturnCode_t::RETCODE_OK;
140 std::chrono::high_resolution_clock::time_point start_time;
141 boost::unique_lock<boost::mutex> lock(this->mut);
144 std::string id_suffix = m_thread_name;
146 std::transform(id_suffix.begin(), id_suffix.end(), id_suffix.begin(), ::tolower);
147 m_metrics.AddThread(numapp::thisThread::GetThreadId(),
149 "[Dpub" + this->topic_name_m +
150 "] Telemetry republisher (DDS) publisher thread"));
153 m_msg = this->m_repub_buf->WaitNextAvail();
154 if (m_msg ==
nullptr) {
157 m_sample_id = m_msg->sample_id();
158 if (m_sample_id != m_last_sample_id + 1) {
159 std::cout <<
"[" << this->topic_name_m
160 <<
"] sampleId mismatch last published: " << m_last_sample_id
161 <<
" current to be published: " << m_sample_id << std::endl;
164 m_last_sample_id = m_sample_id;
166 retcode = m_data_writer->write(m_msg, eprosima::fastrtps::rtps::c_InstanceHandle_Unknown);
167 this->m_repub_buf->MakeFree();
172 if (retcode != ReturnCode_t::RETCODE_OK) {
173 if (retcode == ReturnCode_t::RETCODE_TIMEOUT) {
174 LOG4CPLUS_ERROR_FMT(this->m_logger,
175 "[%s]SampleId: %u. DDS write timeout!",
176 this->topic_name_m.c_str(),
179 LOG4CPLUS_ERROR_FMT(this->m_logger,
180 "[%s]SampleId: %u. DDS failed to write. Return code: %s",
181 this->topic_name_m.c_str(),
189 LOG4CPLUS_DEBUG_FMT(this->m_logger,
"[%s]Flushing queue ...\n", this->topic_name_m.c_str());
191 while (this->m_repub_buf->GetAvail() > 0) {
192 m_msg = this->m_repub_buf->WaitNextAvail();
193 if (m_msg ==
nullptr) {
196 m_sample_id = m_msg->sample_id();
197 retcode = m_data_writer->write(m_msg, eprosima::fastrtps::rtps::c_InstanceHandle_Unknown);
198 this->m_repub_buf->MakeFree();
202 if ((m_framecounter % 1) == 0) {
203 LOG4CPLUS_DEBUG_FMT(this->m_logger,
204 "[%s] Flushing SampleId: %u. Droped: %lu (%f). Last @ %u\n",
205 this->topic_name_m.c_str(),
207 this->skipedSamples_m,
208 static_cast<double>(this->skipedSamples_m) / this->rcvSamples_m,
209 this->lastSkipedSample_m);
213 m_metrics.RemoveThread(numapp::thisThread::GetThreadId());
221 ReturnCode_t retcode = ReturnCode_t::RETCODE_OK;
222 unsigned int sampleId = 0;
224 std::string id_suffix = m_thread_name;
226 std::transform(id_suffix.begin(), id_suffix.end(), id_suffix.begin(), ::tolower);
228 numapp::thisThread::GetThreadId(),
230 "[Spub" + this->topic_name_m +
231 "] Telemetry republisher (DDS) publisher thread - SIMULATION"));
234 m_message.sample_id(sampleId);
236 retcode = m_data_writer->write(&m_message, eprosima::fastrtps::rtps::c_InstanceHandle_Unknown);
240 if (retcode != ReturnCode_t::RETCODE_OK) {
241 if (retcode == ReturnCode_t::RETCODE_TIMEOUT) {
242 LOG4CPLUS_ERROR_FMT(this->m_logger,
243 "[%s] Timeout while sending (write): %lu sample.\n",
244 this->topic_name_m.c_str(),
249 "[%s] Failed to send (write): %lu sample. Return Code: %s\n",
250 this->topic_name_m.c_str(),
257 usleep(m_sim_loop_sleep);
259 m_metrics.RemoveThread(numapp::thisThread::GetThreadId());
269 m_sim_loop_sleep = 1000000 / f;
270 m_thread_name =
"Spub" + this->topic_name_m;
271 if (m_thread_name.size() > 15) {
272 m_thread_name.erase(15);
274 m_thread = numapp::MakeThread(
284 m_thread_name =
"Dpub" + this->topic_name_m;
285 if (m_thread_name.size() > 15) {
286 m_thread_name.erase(15);
288 m_thread = numapp::MakeThread(
297 this->m_repub_buf->ReleaseWait();
298 this->cond.notify_one();