12 #ifndef RTCTK_DATATASK_READERTHREAD_HPP
13 #define RTCTK_DATATASK_READERTHREAD_HPP
24 #include <numapp/numapolicies.hpp>
25 #include <numapp/thread.hpp>
26 #include <ipcq/reader.hpp>
56 :
rtctk::componentFramework::
RtctkException(
"An asynchronous error occured: '" + text +
"'!") {}
75 template<
typename TopicType,
template<
typename>
typename ReaderType=ipcq::Reader>
81 , m_thread_name(
"thread_name",
"readerThread")
82 , m_queue_name(
"queue_name")
83 , m_thread_affinity(
"thread_affinity")
84 , m_samples_to_read(
"samples_to_read")
85 , m_samples_to_skip(
"samples_to_skip", 0)
86 , m_loop_frequency(
"loop_frequency")
87 , m_error_margin(
"error_margin")
89 , m_callback_on_data()
90 , m_callback_init_thread()
96 if(m_thread.joinable()) {
115 using namespace std::chrono_literals;
117 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Spawn()");
120 assert(m_callback_on_data);
123 m_queue_name.CheckSet();
124 m_thread_name.CheckSet();
125 m_samples_to_read.CheckSet();
126 m_samples_to_skip.CheckSet();
127 m_loop_frequency.CheckSet();
131 m_thread_name.Lock();
132 m_thread_affinity.Lock();
133 m_loop_frequency.Lock();
135 if(!m_error_margin.IsSet())
137 m_error_margin.Set(1.2);
139 m_error_margin.Lock();
141 auto policies = numapp::NumaPolicies();
142 if (m_thread_affinity.IsSet()) {
143 auto cpu_mask = numapp::Cpumask::MakeFromCpuStringAll(
144 std::to_string(m_thread_affinity.Get()).c_str());
145 policies.SetCpuAffinity(numapp::CpuAffinity(cpu_mask));
148 auto f = SendRequestAsync(Command::Idle);
150 m_thread = numapp::MakeThread(
156 auto status = f.wait_for(1200ms);
157 if (status != std::future_status::ready) {
174 using namespace std::chrono_literals;
176 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Join()");
178 auto f = SendRequestAsync(Command::Exit);
180 auto status = f.wait_for(1200ms);
184 m_queue_name.Unlock();
185 m_thread_name.Unlock();
186 m_thread_affinity.Unlock();
187 m_loop_frequency.Unlock();
189 if (status != std::future_status::ready) {
200 using namespace std::chrono_literals;
202 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Run()");
204 SendRequestSync(Command::Run);
214 using namespace std::chrono_literals;
216 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Idle()");
218 SendRequestSync(Command::Idle);
229 using namespace std::chrono_literals;
231 std::error_code
const ok{};
233 m_loop_frequency.Get(), \
236 auto ret = m_comp_allowed.Pend(timeout);
245 using namespace std::chrono_literals;
246 using namespace std::chrono;
248 m_loop_frequency.Get(), \
251 auto time_start = system_clock::now();
253 std::this_thread::sleep_for(5ms);
255 if(st.StopRequested()) {
257 "ReaderThread::WaitUntilComputationAllowed() aborted via StopToken!");
261 auto ret = m_comp_allowed.TryPend();
263 std::error_code
const ok{};
264 if(ret.value() != ok) {
270 if(system_clock::now() - time_start > timeout) {
271 CII_THROW(
AsynchronousError,
"ReaderThread::WaitUntilComputationAllowed() timeout");
292 m_queue_name.Set(
name);
311 m_thread_name.Set(
name);
320 m_thread_affinity.Set(affinity);
329 m_samples_to_read.Set(value);
338 m_samples_to_skip.Set(value);
348 m_loop_frequency.Set(value);
358 m_error_margin.Set(value);
371 m_callback_on_data = callback;
383 m_callback_init_thread = std::move(callback);
388 enum class State : unsigned {
400 const std::map<State, std::string> m_state_text = {
401 {State::Error,
"Error"},
403 {State::Starting,
"Starting"},
404 {State::Terminating,
"Terminating"},
405 {State::Idle,
"Idle"},
406 {State::Reading,
"Reading"},
407 {State::Skipping,
"Skipping"},
408 {State::Waiting,
"Waiting"},
409 {State::Dropping,
"Dropping"},
412 enum class Command: unsigned {
419 const std::map<Command, std::string> m_command_text = {
420 {Command::None,
"-"},
421 {Command::Run,
"Off"},
422 {Command::Idle,
"Starting"},
423 {Command::Exit,
"Terminating"},
432 std::future<void> SendRequestAsync(Command cmd) {
434 auto f = req.GetReplyFuture();
435 m_request_q.Post(std::move(req));
445 void SendRequestSync(Command cmd) {
446 using namespace std::chrono_literals;
448 auto f = SendRequestAsync(cmd);
449 auto status = f.wait_for(1200ms);
450 if (status != std::future_status::ready) {
451 CII_THROW(RequestTimedOut, m_command_text.at(cmd));
456 return m_request_q.TryPend().value_or(
Request(Command::None));
467 using namespace std::chrono;
468 using namespace std::chrono_literals;
470 auto req = GetRequest();
472 m_state = State::Off;
473 State next_state = State::Starting;
474 State prev_state = State::Off;
476 auto time_start = system_clock::now();
477 milliseconds time_elapsed {0};
478 auto duration_skip =
CalcDuration(m_samples_to_skip.Get(), m_loop_frequency.Get());
479 auto timeout_wait {duration_skip / 2};
480 auto timeout_drop {duration_skip / 2};
485 std::error_code
const ok{};
486 std::error_code ret = ok;
488 auto reader = ReaderType<TopicType>::MakeReader(m_queue_name.Get().c_str(), 30s);
492 prev_state = m_state;
493 if(m_state != next_state) {
494 m_state = next_state;
495 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Work() - changed state to '" << m_state_text.at(m_state) <<
"'");
500 case State::Starting:
503 if(m_callback_init_thread) {
504 m_callback_init_thread();
506 next_state = State::Idle;
509 next_state = State::Error;
516 if(m_state != prev_state) {
522 if(req.GetPayload() == Command::Exit) {
523 next_state = State::Terminating;
525 }
else if(req.GetPayload() == Command::Run) {
528 next_state = ret==ok ? State::Reading : State::Error;
532 std::this_thread::sleep_for(10ms);
533 next_state = m_state;
539 if(m_state != prev_state) {
540 if(prev_state == State::Idle) {
543 to_read = m_samples_to_read.Get();
544 to_skip = m_samples_to_skip.Get();
548 if(req.GetPayload() == Command::Exit) {
549 next_state = State::Terminating;
551 }
else if (req.GetPayload() == Command::Idle){
552 next_state = State::Idle;
556 size_t to_read_now = std::min(to_read, m_loop_frequency.Get());
557 auto time_read_start = system_clock::now();
559 m_callback_on_data, \
561 m_loop_frequency.Get(), \
562 m_error_margin.Get());
563 auto time_read_end = system_clock::now();
564 CalculateEstimatedFrequency(to_read_now, time_read_end - time_read_start);
565 to_read -= to_read_now;
568 next_state = State::Error;
569 }
else if(to_read == 0) {
571 next_state = State::Skipping;
573 next_state = State::Idle;
575 m_comp_allowed.Post(ok);
578 next_state = m_state;
583 case State::Skipping:
586 if(req.GetPayload() == Command::Exit) {
587 next_state = State::Terminating;
589 }
else if (req.GetPayload() == Command::Idle){
590 next_state = State::Idle;
594 size_t to_skip_now = std::min(to_skip, m_loop_frequency.Get());
595 auto time_skip_start = system_clock::now();
596 ret =
Skip(reader, to_skip_now, m_loop_frequency.Get(), m_error_margin.Get());
597 auto time_skip_end = system_clock::now();
598 CalculateEstimatedFrequency(to_skip_now, time_skip_end - time_skip_start);
600 to_skip -= to_skip_now;
603 next_state = State::Error;
604 }
else if(to_skip == 0) {
605 if(m_comp_done.TryWait()) {
606 next_state = State::Reading;
608 next_state = State::Waiting;
611 next_state = m_state;
618 if(m_state != prev_state) {
619 time_start = system_clock::now();
623 if(req.GetPayload() == Command::Exit) {
624 next_state = State::Terminating;
626 }
else if (req.GetPayload() == Command::Idle){
627 next_state = State::Idle;
631 if(m_comp_done.TryWait()) {
632 next_state = State::Reading;
636 std::this_thread::sleep_for(1ms);
638 time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
639 if((time_elapsed > timeout_wait) or (
NumFree(reader) == 0)) {
640 next_state = State::Dropping;
642 next_state = m_state;
647 case State::Dropping:
649 if(m_state != prev_state) {
650 time_start = system_clock::now();
654 if(req.GetPayload() == Command::Exit) {
655 next_state = State::Terminating;
657 }
else if (req.GetPayload() == Command::Idle){
658 next_state = State::Idle;
662 if(m_comp_done.TryWait()) {
664 next_state = ret==ok ? State::Reading : State::Error;
668 std::this_thread::sleep_for(1ms);
670 time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
671 if(time_elapsed > timeout_drop) {
673 next_state = State::Error;
675 next_state = m_state;
682 if(m_state != prev_state) {
683 m_comp_allowed.Post(ret);
684 LOG4CPLUS_ERROR(
GetLogger(),
"ReaderThread::Work() - " << ret.message());
688 if(req.GetPayload() == Command::Exit) {
689 next_state = State::Terminating;
690 }
else if(req.GetPayload() == Command::Idle) {
691 next_state = State::Idle;
693 std::this_thread::sleep_for(100ms);
694 next_state = m_state;
699 case State::Terminating:
701 next_state = State::Off;
720 template <
class Rep,
class Period>
721 void CalculateEstimatedFrequency(
size_t count,
722 std::chrono::duration<Rep, Period> time)
725 using namespace std::chrono;
726 if(time.count() != 0)
728 m_estimated_loop_frequency =
static_cast<float>(count) / \
729 duration_cast<seconds>(time).count();
731 if(!std::isinf(m_estimated_loop_frequency))
734 if(m_estimated_loop_frequency < \
735 static_cast<float>(m_loop_frequency.Get())/m_error_margin.Get() || \
736 m_estimated_loop_frequency > \
737 static_cast<float>(m_loop_frequency.Get())*m_error_margin.Get())
739 LOG4CPLUS_WARN(
GetLogger(),
"ReaderThread: Measured SHM frequency: " <<\
740 m_estimated_loop_frequency <<
" expected: " << m_loop_frequency.Get());
746 std::thread m_thread;
748 Parameter<ReaderMode> m_mode;
749 Parameter<std::string> m_thread_name;
750 Parameter<std::string> m_queue_name;
751 Parameter<size_t> m_thread_affinity;
752 Parameter<size_t> m_samples_to_read;
753 Parameter<size_t> m_samples_to_skip;
754 Parameter<size_t> m_loop_frequency;
755 Parameter<float> m_error_margin;
757 float m_estimated_loop_frequency;
759 MessageQueue<Request<Command>> m_request_q;
760 MessageQueue<std::error_code> m_comp_allowed;
762 Semaphore m_comp_done;
766 std::function<void(
const TopicType& sample)> m_callback_on_data;
767 std::function<void()> m_callback_init_thread;