RTC Toolkit  2.0.0
readerThread.hpp
Go to the documentation of this file.
1 
12 #ifndef RTCTK_DATATASK_READERTHREAD_HPP
13 #define RTCTK_DATATASK_READERTHREAD_HPP
14 
23 
24 #include <numapp/numapolicies.hpp>
25 #include <numapp/thread.hpp>
26 #include <ipcq/reader.hpp>
27 
28 #include <thread>
29 #include <chrono>
30 #include <ctime>
31 #include <cmath>
32 #include <map>
33 #include <functional>
34 #include <stdexcept>
35 
36 
37 namespace rtctk::dataTask {
41 // TODO: this one is more high level, different file
43 {
44  public:
45  RequestTimedOut(const std::string& req_name)
46  : rtctk::componentFramework::RtctkException("Request '" + req_name + "' timed out!") {}
47 };
48 
50 {
51  public:
53  : rtctk::componentFramework::RtctkException("An asynchronous error occured!") {}
54 
55  AsynchronousError(const std::string& text)
56  : rtctk::componentFramework::RtctkException("An asynchronous error occured: '" + text + "'!") {}
57 };
58 
59 enum class ReaderMode : unsigned {
60  Single,
62 };
63 
75 template<typename TopicType, template<typename> typename ReaderType=ipcq::Reader>
76 class ReaderThread {
77  public:
78 
80  : m_mode("reader_mode", ReaderMode::Continuous)
81  , m_thread_name("thread_name", "readerThread")
82  , m_queue_name("queue_name")
83  , m_thread_affinity("thread_affinity")
84  , m_samples_to_read("samples_to_read")
85  , m_samples_to_skip("samples_to_skip", 0)
86  , m_loop_frequency("loop_frequency")
87  , m_error_margin("error_margin")
88  , m_state(State::Off)
89  , m_callback_on_data()
90  , m_callback_init_thread()
91  {
92  }
93 
95  {
96  if(m_thread.joinable()) {
97  Join();
98  }
99  }
100 
/// Spawn the reader thread.
///
/// Checks that the required parameters are set, locks the configuration
/// parameters, creates the worker thread running Work() with the configured
/// name and (optional) CPU affinity, and waits up to 1200 ms for the initial
/// Idle request to be answered by the worker.
///
/// Raises RequestTimedOut (via CII_THROW) if the Idle request is not answered in time.
/// NOTE(review): CheckSet() presumably reports unset parameters by throwing - confirm in parameter.hpp.
112  void Spawn()
113  {
115  using namespace std::chrono_literals;
116 
117  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Spawn()");
118 
119  // check if callback to OnDataAvailable has been registered
     // (assert is compiled out when NDEBUG is defined, so this is a debug-only check)
120  assert(m_callback_on_data);
121 
122  m_mode.CheckSet();
123  m_queue_name.CheckSet();
124  m_thread_name.CheckSet();
125  m_samples_to_read.CheckSet();
126  m_samples_to_skip.CheckSet();
127  m_loop_frequency.CheckSet();
128 
     // Lock the parameters for the lifetime of the thread.
     // m_samples_to_read and m_samples_to_skip are checked above but not locked here.
129  m_mode.Lock();
130  m_queue_name.Lock();
131  m_thread_name.Lock();
132  m_thread_affinity.Lock();
133  m_loop_frequency.Lock();
134 
     // error margin is optional: fall back to 1.5 when the user did not set it
135  if(!m_error_margin.IsSet()) {
136  m_error_margin.Set(1.5);
137  }
138  m_error_margin.Lock();
139 
     // CPU affinity is optional: only applied when a value was set
140  auto policies = numapp::NumaPolicies();
141  if (m_thread_affinity.IsSet()) {
142  auto cpu_mask = numapp::Cpumask::MakeFromCpuStringAll(
143  std::to_string(m_thread_affinity.Get()).c_str());
144  policies.SetCpuAffinity(numapp::CpuAffinity(cpu_mask));
145  }
146 
     // The Idle request is queued before the thread exists; Work() fetches it with
     // its first GetRequest() and replies when the state machine first enters Idle.
147  auto f = SendRequestAsync(Command::Idle);
148 
149  m_thread = numapp::MakeThread(
150  m_thread_name.Get(),
151  policies,
152  &ReaderThread::Work,
153  this);
154 
155  // SM is starting quickly, so hard-coded timeout will do
156  auto status = f.wait_for(1200ms);
157  if (status != std::future_status::ready) {
158  CII_THROW(RequestTimedOut, m_command_text.at(Command::Idle));
159  }
160  }
161 
/// Waits for the reader thread to complete and joins it.
///
/// Sends an Exit request to the worker, waits for the reply (at least 1200 ms),
/// joins the thread and then unlocks every parameter that Spawn() locked so the
/// reader can be reconfigured before the next Spawn().
///
/// Raises RequestTimedOut (via CII_THROW) if the Exit request was not answered
/// within the timeout; the thread has been joined and the parameters unlocked
/// by the time the error is raised.
171  void Join()
172  {
174  using namespace std::chrono_literals;
175 
176  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Join()");
177 
178  auto f = SendRequestAsync(Command::Exit);
179 
180  // timeout covers one sample at loop_frequency scaled by error_margin, but never less than 1200 ms
181  auto timeout = std::max(1200ms, detail::CalcTimeout(1, m_loop_frequency.Get(), m_error_margin.Get()));
182  auto status = f.wait_for(timeout);
     // join unconditionally, even when the reply timed out
183  m_thread.join();
184 
185  m_mode.Unlock();
186  m_queue_name.Unlock();
187  m_thread_name.Unlock();
188  m_thread_affinity.Unlock();
189  m_loop_frequency.Unlock();
     // Spawn() also locks the error margin; release it as well so that
     // SetErrorMargin() can be used again once the thread has been joined.
     m_error_margin.Unlock();
190 
191  if (status != std::future_status::ready) {
192  CII_THROW(RequestTimedOut, m_command_text.at(Command::Exit));
193  }
194  }
195 
/// Sends a synchronous Run request to the reader thread.
///
/// Blocks until the worker has replied or 1200 ms have elapsed.
/// Raises RequestTimedOut (via CII_THROW in SendRequestSync) on timeout.
199  void Run()
200  {
202  using namespace std::chrono_literals;
203 
204  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Run()");
205 
206  // SM is going to state running quickly, so hard-coded timeout will do
207  SendRequestSync(Command::Run, 1200ms);
208  }
209 
210 
/// Sends a synchronous Idle request to the reader thread.
///
/// The timeout is the larger of 1200 ms and CalcTimeout() for a single sample
/// at the configured loop frequency and error margin.
/// Raises RequestTimedOut (via CII_THROW in SendRequestSync) on timeout.
214  void Idle()
215  {
217  using namespace std::chrono_literals;
218 
219  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Idle()");
220 
221  // here timeout depends on loop_frequency and sample_to_read/skip
222  auto timeout = std::max(1200ms, detail::CalcTimeout(1, m_loop_frequency.Get(), m_error_margin.Get()));
223  SendRequestSync(Command::Idle, timeout);
224  }
225 
233  {
234  using namespace std::chrono_literals;
236  std::error_code const ok{};
237  auto timeout = detail::CalcTimeout(m_samples_to_read.Get() + m_samples_to_skip.Get(), \
238  m_loop_frequency.Get(), \
239  m_error_margin.Get());
240 
241  auto ret = m_comp_allowed.Pend(timeout);
242  if(ret != ok) {
243  CII_THROW(AsynchronousError,ret.message());
244  }
245  }
246 
248  {
250  using namespace std::chrono_literals;
251  using namespace std::chrono;
252  auto timeout = detail::CalcTimeout(m_samples_to_read.Get() + m_samples_to_skip.Get(), \
253  m_loop_frequency.Get(), \
254  m_error_margin.Get());
255 
256  auto time_start = system_clock::now();
257  while(true) {
258  std::this_thread::sleep_for(5ms);
259 
260  if(st.StopRequested()) {
261  LOG4CPLUS_INFO(GetLogger(),
262  "ReaderThread::WaitUntilComputationAllowed() aborted via StopToken!");
263  break;
264  }
265 
266  auto ret = m_comp_allowed.TryPend();
267  if(ret) {
268  std::error_code const ok{};
269  if(ret.value() != ok) {
270  CII_THROW(AsynchronousError, ret.value().message());
271  }
272  break;
273  }
274 
275  if(duration_cast<milliseconds>(system_clock::now() - time_start) > timeout) {
276  CII_THROW(AsynchronousError, "ReaderThread::WaitUntilComputationAllowed() timeout");
277  }
278  }
279  }
280 
286  {
287  m_comp_done.Post();
288  }
289 
/// Required setter: stores the name of the queue to read from.
/// Spawn() checks that this parameter is set and locks it; Join() unlocks it.
/// @param name queue name; Work() passes it to ReaderType<TopicType>::MakeReader().
294  void SetQueueName(std::string const& name)
295  {
296  // TODO is there a max length of the name?
297  m_queue_name.Set(name);
298  }
299 
/// Optional setter: stores the reader mode (the constructor defaults it to ReaderMode::Continuous).
/// Work() moves from Reading to Skipping in Continuous mode and back to Idle otherwise.
/// Spawn() locks this parameter; Join() unlocks it.
/// @param mode reader mode to use.
304  void SetMode(ReaderMode mode)
305  {
306  m_mode.Set(mode);
307  }
308 
/// Optional setter: stores the thread name (the constructor defaults it to "readerThread").
/// Spawn() passes the name to numapp::MakeThread() and locks this parameter; Join() unlocks it.
/// @param name thread name to use.
313  void SetThreadName(std::string const& name)
314  {
315  // TODO check for max 16 characters length
316  m_thread_name.Set(name);
317  }
318 
/// Optional setter: stores the CPU affinity of the reader thread.
/// Spawn() converts the value with std::to_string() into a CPU string for
/// numapp::Cpumask::MakeFromCpuStringAll() when the parameter is set.
/// @param affinity CPU number to bind the thread to.
/// NOTE(review): the value is stored in a Parameter<size_t>; a negative int would be
/// converted to a large unsigned value - confirm callers never pass negatives.
323  void SetCpuAffinity(int affinity)
324  {
325  m_thread_affinity.Set(affinity);
326  }
327 
/// Required setter: stores the number of samples to be read per cycle.
/// Spawn() checks that it is set; Work() reloads the value each time it enters the Reading state.
/// @param value number of samples to read.
332  void SetSamplesToRead(size_t value)
333  {
334  m_samples_to_read.Set(value);
335  }
336 
/// Setter for the number of samples to skip after reading (the constructor defaults it to 0).
/// Work() uses the value at thread start to derive the wait/drop timeouts and
/// reloads it each time it enters the Reading state.
/// @param value number of samples to skip.
341  void SetSamplesToSkip(size_t value)
342  {
343  m_samples_to_skip.Set(value);
344  }
345 
/// Required setter: stores the loop frequency.
/// The value feeds detail::CalcTimeout() and Work() uses it as the number of samples
/// handled per read/skip chunk ("how many can we read in 1s").
/// Spawn() checks that it is set and locks it; Join() unlocks it.
/// @param value loop frequency.
351  void SetLoopFrequency(float value)
352  {
353  m_loop_frequency.Set(value);
354  }
355 
/// Optional setter: stores the timeout tolerance passed to detail::CalcTimeout(),
/// detail::Read() and detail::Skip().
/// When not set, Spawn() sets it to 1.5; Spawn() also locks the parameter.
/// @param value error margin (timeout tolerance factor).
361  void SetErrorMargin(float value)
362  {
363  m_error_margin.Set(value);
364  }
365 
366  // call back registration
367  // TODO: add checks
/// Required callback: registers the function invoked for each sample read.
/// Spawn() asserts that a callback has been registered; Work() hands it to detail::Read().
/// @param callback callable receiving a const reference to the sample.
374  void RegisterOnDataCallback(std::function<void(const TopicType& sample)> callback)
375  {
     // the argument is taken by value, so move it into place instead of copying
     // (same as RegisterInitThreadCallback)
376  m_callback_on_data = std::move(callback);
377  }
378 
/// Optional callback: registers a function that Work() invokes once in the Starting state.
/// If the callback throws, Work() catches the exception and enters the Error state.
/// @param callback callable without arguments; may be left unregistered.
386  void RegisterInitThreadCallback(std::function<void()> callback)
387  {
388  m_callback_init_thread = std::move(callback);
389  }
390 
391  private:
392 
// States of the state machine implemented in Work().
// Work() starts in Off, passes through Starting to Idle and returns once it is back in Off.
393  enum class State : unsigned {
394  Error,
395  Off,
396  Starting,
397  Terminating,
398  Idle,
399  Reading,
400  Skipping,
401  Waiting,
402  Dropping,
403  }; //< known states of the readerThread
404 
// Human-readable name for every State value; Work() uses it when logging state changes.
// Looked up with at(), so every State enumerator needs an entry here.
405  const std::map<State, std::string> m_state_text = {
406  {State::Error, "Error"},
407  {State::Off, "Off"},
408  {State::Starting, "Starting"},
409  {State::Terminating, "Terminating"},
410  {State::Idle, "Idle"},
411  {State::Reading, "Reading"},
412  {State::Skipping, "Skipping"},
413  {State::Waiting, "Waiting"},
414  {State::Dropping, "Dropping"},
415  }; //< States names
416 
// Commands carried by the requests sent to the worker thread.
// None is what GetRequest() returns when no request is pending.
417  enum class Command: unsigned {
418  None,
419  Run,
420  Idle,
421  Exit,
422  }; //< expected commands
423 
// Human-readable name for every Command value; used as the request name in RequestTimedOut errors.
// Looked up with at(), so every Command enumerator needs an entry here.
424  const std::map<Command, std::string> m_command_text = {
425  {Command::None, "-"},
426  {Command::Run, "Run"},
427  {Command::Idle, "Idle"},
428  {Command::Exit, "Exit"},
429  }; //< expected commands names
430 
// Posts a request carrying cmd to the worker's request queue without waiting.
// Returns the future obtained from the request's GetReplyFuture(); the worker
// answers it by calling SetReply() on the request it received.
435  std::future<void> SendRequestAsync(Command cmd) {
436  Request<Command> req(cmd);
     // take the future before the request is moved into the queue
437  auto f = req.GetReplyFuture();
438  m_request_q.Post(std::move(req));
439  return f;
440  }
441 
// Posts a request carrying cmd and waits up to timeout for the worker's reply.
// Raises RequestTimedOut (via CII_THROW) with the command name if the reply
// future is not ready when the timeout expires.
448  template <class Rep, class Period>
449  void SendRequestSync(Command cmd, std::chrono::duration<Rep, Period> timeout) {
450  auto f = SendRequestAsync(cmd);
451  auto status = f.wait_for(timeout);
452  if (status != std::future_status::ready) {
453  CII_THROW(RequestTimedOut, m_command_text.at(cmd));
454  }
455  }
456 
// Fetches the next pending request from the request queue via TryPend().
// When TryPend() yields no value, a request carrying Command::None is returned instead.
457  Request<Command> GetRequest() {
458  return m_request_q.TryPend().value_or(Request(Command::None));
459  }
460 
465  void Work()
466  {
468  using namespace rtctk::dataTask::detail;
469  using namespace std::chrono;
470  using namespace std::chrono_literals;
471 
472  auto req = GetRequest();
473 
474  m_state = State::Off;
475  State next_state = State::Starting;
476  State prev_state = State::Off;
477 
478  auto time_start = system_clock::now();
479  milliseconds time_elapsed {0};
480  auto duration_skip = CalcTimeout(m_samples_to_skip.Get(), m_loop_frequency.Get(), 1.0);
481  auto timeout_wait {duration_skip / 2};
482  auto timeout_drop {duration_skip / 2};
483 
484  size_t to_read = 0;
485  size_t to_skip = 0;
486 
487  std::error_code const ok{};
488  std::error_code ret = ok;
489 
490  auto reader = ReaderType<TopicType>::MakeReader(m_queue_name.Get().c_str(), 30s);
491 
492  while(1) {
493 
494  prev_state = m_state;
495  if(m_state != next_state) {
496  m_state = next_state;
497  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Work() - changed state to '" << m_state_text.at(m_state) << "'");
498  }
499 
500  switch(m_state)
501  {
502  case State::Starting:
503  {
504  try {
505  if(m_callback_init_thread) {
506  m_callback_init_thread();
507  }
508  next_state = State::Idle;
509  }catch(...) {
510  ret = std::make_error_code(std::errc::timed_out); // TODO: other code
511  next_state = State::Error;
512  }
513  break;
514  }
515 
516  case State::Idle:
517  {
518  if(m_state != prev_state) {
519  req.SetReply();
520  }
521 
522  req = GetRequest();
523 
524  if(req.GetPayload() == Command::Exit) {
525  next_state = State::Terminating;
526  break;
527  }else if(req.GetPayload() == Command::Run) {
528  m_comp_done.Clear();
529  ret = Reset(reader);
530  next_state = ret==ok ? State::Reading : State::Error;
531  break;
532  }
533 
534  std::this_thread::sleep_for(10ms);
535  next_state = m_state;
536  break;
537  }
538 
539  case State::Reading:
540  {
541  if(m_state != prev_state) {
542  if(prev_state == State::Idle) {
543  m_comp_allowed.Clear();
544  req.SetReply();
545  }
546  to_read = m_samples_to_read.Get();
547  to_skip = m_samples_to_skip.Get();
548  }
549 
550  req = GetRequest();
551  if(req.GetPayload() == Command::Exit) {
552  next_state = State::Terminating;
553  break;
554  }else if (req.GetPayload() == Command::Idle){
555  next_state = State::Idle;
556  break;
557  }
558 
559  // how many can we read in 1s?
560  size_t to_read_now = m_loop_frequency.Get();
561  // if less than 1 then read at least 1
562  if(to_read_now == 0 and to_read > 0) {
563  to_read_now = 1;
564  }
565  // but never read more than to_read
566  if(to_read_now > to_read) {
567  to_read_now = to_read;
568  }
569 
570  ret = Read(reader, \
571  m_callback_on_data, \
572  to_read_now, \
573  m_loop_frequency.Get(), \
574  m_error_margin.Get());
575 
576  to_read -= to_read_now;
577 
578  if(ret != ok) {
579  next_state = State::Error;
580  }else if(to_read == 0) {
581  if(m_mode.Get()==ReaderMode::Continuous) {
582  next_state = State::Skipping;
583  }else {
584  next_state = State::Idle;
585  }
586  m_comp_allowed.Post(ok);
587 
588  }else{
589  next_state = m_state;
590  }
591  break;
592  }
593 
594  case State::Skipping:
595  {
596  req = GetRequest();
597  if(req.GetPayload() == Command::Exit) {
598  next_state = State::Terminating;
599  break;
600  }else if (req.GetPayload() == Command::Idle){
601  next_state = State::Idle;
602  break;
603  }
604 
605  // how many can we skip in 1s?
606  size_t to_skip_now = m_loop_frequency.Get();
607  // if less than 1 then read at least 1
608  if(to_skip_now == 0 and to_skip > 0) {
609  to_skip_now = 1;
610  }
611  // but never skip more than to_skip
612  if(to_skip_now > to_skip) {
613  to_skip_now = to_skip;
614  }
615 
616  ret = Skip(reader, to_skip_now, m_loop_frequency.Get(), m_error_margin.Get());
617 
618  to_skip -= to_skip_now;
619 
620  if(ret != ok) {
621  next_state = State::Error;
622  }else if(to_skip == 0) {
623  if(m_comp_done.TryWait()) {
624  next_state = State::Reading;
625  }else {
626  next_state = State::Waiting;
627  }
628  }else{
629  next_state = m_state;
630  }
631  break;
632  }
633 
634  case State::Waiting:
635  {
636  if(m_state != prev_state) {
637  time_start = system_clock::now();
638  }
639 
640  req = GetRequest();
641  if(req.GetPayload() == Command::Exit) {
642  next_state = State::Terminating;
643  break;
644  }else if (req.GetPayload() == Command::Idle){
645  next_state = State::Idle;
646  break;
647  }
648 
649  if(m_comp_done.TryWait()) {
650  next_state = State::Reading;
651  break;
652  }
653 
654  std::this_thread::sleep_for(1ms);
655 
656  time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
657  if((time_elapsed > timeout_wait) or (NumFree(reader) == 0)) { // TODO: revisit
658  next_state = State::Dropping;
659  }else {
660  next_state = m_state;
661  }
662  break;
663  }
664 
665  case State::Dropping:
666  {
667  if(m_state != prev_state) {
668  time_start = system_clock::now();
669  }
670 
671  req = GetRequest();
672  if(req.GetPayload() == Command::Exit) {
673  next_state = State::Terminating;
674  break;
675  }else if (req.GetPayload() == Command::Idle){
676  next_state = State::Idle;
677  break;
678  }
679 
680  if(m_comp_done.TryWait()) {
681  ret = Reset(reader);
682  next_state = ret==ok ? State::Reading : State::Error;
683  break;
684  }
685 
686  std::this_thread::sleep_for(1ms);
687 
688  time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
689  if(time_elapsed > timeout_drop) {
690  ret = std::make_error_code(std::errc::timed_out);
691  next_state = State::Error;
692  }else {
693  next_state = m_state;
694  }
695  break;
696  }
697 
698  case State::Error:
699  {
700  if(m_state != prev_state) {
701  m_comp_allowed.Post(ret);
702  LOG4CPLUS_ERROR(GetLogger(), "ReaderThread::Work() - " << ret.message());
703  }
704 
705  req = GetRequest();
706  if(req.GetPayload() == Command::Exit) {
707  next_state = State::Terminating;
708  }else if(req.GetPayload() == Command::Idle) {
709  next_state = State::Idle;
710  }else{
711  std::this_thread::sleep_for(100ms);
712  next_state = m_state;
713  }
714  break;
715  }
716 
717  case State::Terminating:
718  {
719  next_state = State::Off;
720  break;
721  }
722 
723  case State::Off:
724  {
725  req.SetReply();
726  return;
727  }
728  }
729  }
730  }
731 
732  std::thread m_thread; //< the readerThread member
733 
734  Parameter<ReaderMode> m_mode; //< parameter reader mode
735  Parameter<std::string> m_thread_name; //< parameter thread name
736  Parameter<std::string> m_queue_name; //< parameter queue name
737  Parameter<size_t> m_thread_affinity; //< parameter cpu affinity
738  Parameter<size_t> m_samples_to_read; //< parameter samples to be read
739  Parameter<size_t> m_samples_to_skip; //< parameter samples to skip
740  Parameter<float> m_loop_frequency; //< parameter loop frequency
741  Parameter<float> m_error_margin; //< parameter timeout tolerance
742 
743  MessageQueue<Request<Command>> m_request_q; //< requests sent from the public API to Work()
744  MessageQueue<std::error_code> m_comp_allowed; //< posted by Work(): ok after a completed read, the error code on entering Error
745 
746  Semaphore m_comp_done; //< posted by SignalComputationDone(), polled by Work()
747 
748  State m_state; //< current state of the state machine in Work()
749 
750  std::function<void(const TopicType& sample)> m_callback_on_data; //< required callback, passed to detail::Read()
751  std::function<void()> m_callback_init_thread; //< optional callback, invoked in the Starting state
752 };
753 
754 } // namespace
755 
756 #endif
exceptions.hpp
Provides macros and utilities for exception handling.
wscript.name
name
Definition: wscript:15
rtctk::dataTask::ReaderMode
ReaderMode
Definition: readerThread.hpp:59
rtctk::dataTask::ReaderThread::SetCpuAffinity
void SetCpuAffinity(int affinity)
optional setter sets the cpu affinity, if unlocked.
Definition: readerThread.hpp:323
rtctk::dataTask::ReaderThread::SetErrorMargin
void SetErrorMargin(float value)
optional setter sets the timeout tolerance.
Definition: readerThread.hpp:361
rtctk::dataTask::detail::Reset
std::error_code Reset(ReaderType &reader)
helper function to reset the ipcq.reader to latest sample
Definition: readerHelpers.hpp:157
rtctk::dataTask::ReaderThread::SetQueueName
void SetQueueName(std::string const &name)
required setter sets the queue name to be used if unlocked.
Definition: readerThread.hpp:294
rtctk::dataTask::RequestTimedOut
Definition: readerThread.hpp:43
rtctk_config_tool.None
None
Definition: rtctk_config_tool.py:172
rtctk::componentFramework::RtctkException::RtctkException
RtctkException() noexcept
Definition: exceptions.cpp:99
rtctk::dataTask::ReaderMode::Single
@ Single
Error
void Error(const char *msg)
Definition: main.cpp:38
messageQueue.hpp
A simple message queue implementation.
rtctk::dataTask::ReaderThread::RegisterInitThreadCallback
void RegisterInitThreadCallback(std::function< void()> callback)
optional callback
Definition: readerThread.hpp:386
rtctk::componentFramework::RtctkException
The RtctkException class is the base class for all Rtctk exceptions.
Definition: exceptions.hpp:207
readerHelpers.hpp
Helper methods to read data from shared memory queue.
rtctk::dataTask::ReaderThread::SetSamplesToRead
void SetSamplesToRead(size_t value)
required setter sets the number of samples to be read, if unlocked.
Definition: readerThread.hpp:332
rtctk::componentFramework::StopToken
rad::StopToken StopToken
Definition: stopToken.hpp:19
rtctk::dataTask::detail
Definition: readerHelpers.hpp:25
rtctk::dataTask::ReaderThread::SetLoopFrequency
void SetLoopFrequency(float value)
required setter sets the loop frequency, if unlocked.
Definition: readerThread.hpp:351
rtctk::dataTask::detail::CalcTimeout
std::chrono::milliseconds CalcTimeout(size_t count, float loop_frequency, float error_margin)
free helper function to calculate the estimated time to read a number of samples at a given freque...
Definition: readerHelpers.hpp:37
semaphore.hpp
A simple semaphore implementation.
rtctk::dataTask::ReaderThread::WaitUntilComputationAllowed
void WaitUntilComputationAllowed(rtctk::componentFramework::StopToken st)
Definition: readerThread.hpp:247
rtctk::dataTask::RequestTimedOut::RequestTimedOut
RequestTimedOut(const std::string &req_name)
Definition: readerThread.hpp:45
rtctk::dataTask::ReaderThread::SetSamplesToSkip
void SetSamplesToSkip(size_t value)
required setter sets the number of samples to skip, if unlocked.
Definition: readerThread.hpp:341
rtctk::componentFramework::GetLogger
log4cplus::Logger & GetLogger(const std::string &name="")
Get handle to a specific logger (used with logging macros)
rtctk::dataTask::ReaderThread::SetThreadName
void SetThreadName(std::string const &name)
optional setter sets the thread name to be used, if unlocked.
Definition: readerThread.hpp:313
rtctk::dataTask::AsynchronousError::AsynchronousError
AsynchronousError(const std::string &text)
Definition: readerThread.hpp:55
rtctk::dataTask::detail::Read
std::error_code Read(ReaderType &reader, Operation &&op, size_t count, float loop_frequency, float error_margin)
helper function to wrap the ipcq.read with handling of timeouts and count values
Definition: readerHelpers.hpp:57
rtctk::dataTask::detail::NumFree
size_t NumFree(ReaderType &reader)
helper function to get the free space in the shm.
Definition: readerHelpers.hpp:174
rtctk::dataTask::ReaderThread::Spawn
void Spawn()
Spawn the reader thread.
Definition: readerThread.hpp:112
rtctk::dataTask::ReaderThread::Run
void Run()
send synchronous run request to readerThread.
Definition: readerThread.hpp:199
rtctk::dataTask::ReaderThread::~ReaderThread
~ReaderThread()
Definition: readerThread.hpp:94
rtctk::dataTask::ReaderThread
ReaderThread for the Data Class.
Definition: readerThread.hpp:76
rtctk::dataTask::ReaderThread::RegisterOnDataCallback
void RegisterOnDataCallback(std::function< void(const TopicType &sample)> callback)
required callback
Definition: readerThread.hpp:374
rtctk::dataTask
Definition: messageQueue.hpp:20
parameter.hpp
A Parameter class used for Data Tasks.
stopToken.hpp
A simple Stop Token.
rtctk::dataTask::detail::Skip
std::error_code Skip(ReaderType &reader, size_t count, float loop_frequency, float error_margin)
helper function to wrap the ipcq.skip with handling of timeouts and count values
Definition: readerHelpers.hpp:109
rtctk::telRepub::make_error_code
std::error_code make_error_code(MudpiProcessingError e)
Create std::error_code from ProcessingError.
Definition: mudpiProcessingError.hpp:113
logger.hpp
Logging Support Library based on log4cplus.
rtctk::dataTask::AsynchronousError
Definition: readerThread.hpp:50
rtctk::dataTask::ReaderThread::Idle
void Idle()
send Idle request to readerThread.
Definition: readerThread.hpp:214
rtctk::dataTask::ReaderThread::Join
void Join()
waits for thread to complete and rejoins
Definition: readerThread.hpp:171
rtctk::dataTask::ReaderThread::WaitUntilComputationAllowed
void WaitUntilComputationAllowed()
wait until the computation can be launched.
Definition: readerThread.hpp:232
rtctk::dataTask::ReaderThread::SetMode
void SetMode(ReaderMode mode)
optional setter sets the mode of the reader.
Definition: readerThread.hpp:304
rtctk::dataTask::ReaderThread::SignalComputationDone
void SignalComputationDone()
returns from the business logic that the calculation has completed so that the readerThread is safe t...
Definition: readerThread.hpp:285
rtctk::dataTask::ReaderThread::ReaderThread
ReaderThread()
Definition: readerThread.hpp:79
rtctk
Definition: commandReplier.cpp:20
rtctk::dataTask::AsynchronousError::AsynchronousError
AsynchronousError()
Definition: readerThread.hpp:52
request.hpp
A Request class used in Data Tasks.