12 #ifndef RTCTK_DATATASK_READERTHREAD_HPP
13 #define RTCTK_DATATASK_READERTHREAD_HPP
24 #include <numapp/numapolicies.hpp>
25 #include <numapp/thread.hpp>
26 #include <ipcq/reader.hpp>
// Tail of an exception constructor: it forwards a formatted message to the
// rtctk::componentFramework::RtctkException base class, wrapping `text` in
// single quotes.
// NOTE(review): the constructor/class header is not visible in this excerpt;
// the throw sites further down suggest this is AsynchronousError - confirm.
// NOTE(review): "occured" in the message is a misspelling of "occurred". It is
// a runtime string, so it is deliberately left untouched here.
56 :
rtctk::componentFramework::
RtctkException(
"An asynchronous error occured: '" + text +
"'!") {}
// Template parameters of the class that follows (the log messages in this file
// call it ReaderThread):
//  - TopicType:  type of the samples handed to the data callback
//                (std::function<void(const TopicType&)>).
//  - ReaderType: single-argument class template that is instantiated as
//                ReaderType<TopicType> to create the queue reader; defaults to
//                ipcq::Reader.
75 template<
typename TopicType,
template<
typename>
typename ReaderType=ipcq::Reader>
// Fragment of a constructor's member-initializer list (the constructor header
// and the first initializer are not visible in this excerpt).
// Every Parameter member is constructed with its name string as first argument.
// m_thread_name and m_samples_to_skip receive a second argument
// ("readerThread" and 0) - presumably a default value; confirm against the
// Parameter class.
81 , m_thread_name(
"thread_name",
"readerThread")
82 , m_queue_name(
"queue_name")
83 , m_thread_affinity(
"thread_affinity")
84 , m_samples_to_read(
"samples_to_read")
85 , m_samples_to_skip(
"samples_to_skip", 0)
86 , m_loop_frequency(
"loop_frequency")
87 , m_error_margin(
"error_margin")
// Both callbacks are value-initialized, i.e. they start out as empty
// std::function objects.
89 , m_callback_on_data()
90 , m_callback_init_thread()
// Guard that only acts when the worker thread object is joinable
// (std::thread::joinable()). The guarded statements are not visible in this
// excerpt.
// NOTE(review): the position right after the constructor suggests this belongs
// to the destructor - confirm.
96 if(m_thread.joinable()) {
// Fragment of the thread start-up routine (its log message names it
// ReaderThread::Spawn()). The function header, several closing braces and the
// arguments of numapp::MakeThread are not visible in this excerpt.
115 using namespace std::chrono_literals;
117 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Spawn()");
// A data callback must have been registered before the thread is started
// (checked only in builds where assert is active).
120 assert(m_callback_on_data);
// CheckSet() is called on every parameter listed here.
// NOTE(review): presumably it raises an error when the parameter has no value -
// confirm against the Parameter class.
123 m_queue_name.CheckSet();
124 m_thread_name.CheckSet();
125 m_samples_to_read.CheckSet();
126 m_samples_to_skip.CheckSet();
127 m_loop_frequency.CheckSet();
// Lock the thread-related parameters (the matching Unlock() calls are in the
// Join() fragment).
131 m_thread_name.Lock();
132 m_thread_affinity.Lock();
133 m_loop_frequency.Lock();
// error_margin is optional: fall back to 1.5 when the user did not set it,
// then lock it as well.
135 if(!m_error_margin.IsSet()) {
136 m_error_margin.Set(1.5);
138 m_error_margin.Lock();
// Build the NUMA policies; a CPU affinity is only added when thread_affinity
// was set. The numeric value is converted to a string and handed to
// numapp::Cpumask::MakeFromCpuStringAll.
140 auto policies = numapp::NumaPolicies();
141 if (m_thread_affinity.IsSet()) {
142 auto cpu_mask = numapp::Cpumask::MakeFromCpuStringAll(
143 std::to_string(m_thread_affinity.Get()).c_str());
144 policies.SetCpuAffinity(numapp::CpuAffinity(cpu_mask));
// Queue an Idle request before the thread is created and keep the reply future.
147 auto f = SendRequestAsync(Command::Idle);
// Create the worker thread (arguments not visible in this excerpt).
149 m_thread = numapp::MakeThread(
// Wait at most 1200ms for the reply to the Idle request; the branch below
// handles the case where the future did not become ready in time (its body is
// not visible here).
156 auto status = f.wait_for(1200ms);
157 if (status != std::future_status::ready) {
// Fragment of the thread shutdown routine (its log message names it
// ReaderThread::Join()). The function header, the actual join call and the
// body of the final branch are not visible in this excerpt.
174 using namespace std::chrono_literals;
176 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Join()");
// Ask the worker to exit and keep the reply future.
178 auto f = SendRequestAsync(Command::Exit);
// Wait for the reply for at least 1200ms, or longer when
// detail::CalcTimeout(1, loop_frequency, error_margin) yields a larger value.
181 auto timeout = std::max(1200ms,
detail::CalcTimeout(1, m_loop_frequency.Get(), m_error_margin.Get()));
182 auto status = f.wait_for(timeout);
// These parameters are unlocked before the wait result is evaluated, i.e.
// regardless of whether the reply arrived in time.
186 m_queue_name.Unlock();
187 m_thread_name.Unlock();
188 m_thread_affinity.Unlock();
189 m_loop_frequency.Unlock();
// Handles the case where the Exit request was not answered in time (body not
// visible here).
191 if (status != std::future_status::ready) {
// Fragment of the routine that switches the worker to running (its log message
// names it ReaderThread::Run()); the function header is not visible here.
202 using namespace std::chrono_literals;
204 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Run()");
// Send the Run command and wait up to 1200ms for the reply. SendRequestSync
// throws RequestTimedOut when the reply does not arrive in time.
207 SendRequestSync(Command::Run, 1200ms);
// Fragment of the routine that switches the worker back to idle (its log
// message names it ReaderThread::Idle()); the function header is not visible.
217 using namespace std::chrono_literals;
219 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Idle()");
// Reply timeout: at least 1200ms, or longer when
// detail::CalcTimeout(1, loop_frequency, error_margin) yields a larger value.
222 auto timeout = std::max(1200ms,
detail::CalcTimeout(1, m_loop_frequency.Get(), m_error_margin.Get()));
// Send the Idle command and wait for the reply. SendRequestSync throws
// RequestTimedOut when the reply does not arrive in time.
223 SendRequestSync(Command::Idle, timeout);
// Fragment of a wait routine that pends on m_comp_allowed with a timeout.
// NOTE(review): presumably the overload of WaitUntilComputationAllowed()
// without a stop token - the function header is not visible; confirm.
234 using namespace std::chrono_literals;
// Default-constructed error_code, i.e. the "no error" value to compare against.
236 std::error_code
const ok{};
// The next two lines are the tail of a call whose head is not visible; per the
// following statement its result is presumably `timeout`. They pass the loop
// frequency and the error margin as trailing arguments.
238 m_loop_frequency.Get(), \
239 m_error_margin.Get());
// Pend on the m_comp_allowed queue, passing `timeout`; the handling of `ret`
// is not visible in this excerpt.
241 auto ret = m_comp_allowed.Pend(timeout);
// Fragment of the stop-token-aware wait (the messages below name it
// ReaderThread::WaitUntilComputationAllowed()). The function header, the loop
// scaffolding and several closing braces are not visible in this excerpt.
// The visible statements poll m_comp_allowed in 5ms steps and can leave via
// three paths: stop requested, a non-ok error_code received, or timeout.
250 using namespace std::chrono_literals;
251 using namespace std::chrono;
// Tail of the call that computes `timeout` (its head is not visible); the loop
// frequency and the error margin are its trailing arguments.
253 m_loop_frequency.Get(), \
254 m_error_margin.Get());
// Reference point for the timeout check below. steady_clock is used because it
// is monotonic: a wall-clock adjustment (NTP step, manual change) cannot cause
// a spurious or missed timeout, which was possible with system_clock.
256 auto time_start = steady_clock::now();
// Polling interval.
258 std::this_thread::sleep_for(5ms);
// Abort path: the caller's stop token was triggered.
260 if(st.StopRequested()) {
262 "ReaderThread::WaitUntilComputationAllowed() aborted via StopToken!");
// Non-blocking check for a result posted by the worker thread.
266 auto ret = m_comp_allowed.TryPend();
// Default-constructed error_code, i.e. the "no error" value.
268 std::error_code
const ok{};
// A value different from `ok` means the worker reported an error (the handling
// itself is not visible here).
269 if(ret.value() != ok) {
// Timeout path: nothing arrived within `timeout` since time_start.
275 if(duration_cast<milliseconds>(steady_clock::now() - time_start) > timeout) {
276 CII_THROW(
AsynchronousError,
"ReaderThread::WaitUntilComputationAllowed() timeout");
// Bodies of the parameter/callback setters. Each statement below belongs to a
// different setter whose header is not visible in this excerpt; each one just
// stores the passed value in the corresponding member.
// Queue name parameter.
297 m_queue_name.Set(
name);
// Thread name parameter.
316 m_thread_name.Set(
name);
// Thread affinity parameter (declared as Parameter<size_t>).
325 m_thread_affinity.Set(affinity);
// Number of samples to read.
334 m_samples_to_read.Set(value);
// Number of samples to skip.
343 m_samples_to_skip.Set(value);
// Loop frequency.
353 m_loop_frequency.Set(value);
// Error margin.
363 m_error_margin.Set(value);
// Data callback: stored by copy.
// NOTE(review): the init-thread callback below is moved instead; if `callback`
// is a by-value parameter here too, std::move would avoid a copy - confirm.
376 m_callback_on_data = callback;
// Thread-init callback: moved into the member.
388 m_callback_init_thread = std::move(callback);
// States of the worker-thread state machine. The enumerator list is not
// visible in this excerpt; the names used elsewhere in this file are Off,
// Error, Starting, Terminating, Idle, Reading, Skipping, Waiting and Dropping.
393 enum class State : unsigned {
// Human-readable state names, used when logging state changes. Not every entry
// is visible in this excerpt.
405 const std::map<State, std::string> m_state_text = {
406 {State::Error,
"Error"},
408 {State::Starting,
"Starting"},
409 {State::Terminating,
"Terminating"},
410 {State::Idle,
"Idle"},
411 {State::Reading,
"Reading"},
412 {State::Skipping,
"Skipping"},
413 {State::Waiting,
"Waiting"},
414 {State::Dropping,
"Dropping"},
// Commands that can be sent to the worker thread through the request queue.
// The enumerator list is not visible in this excerpt; the map below names
// None, Run, Idle and Exit.
417 enum class Command: unsigned {
// Human-readable command names; SendRequestSync uses them as the text of the
// RequestTimedOut exception.
424 const std::map<Command, std::string> m_command_text = {
425 {Command::None,
"-"},
426 {Command::Run,
"Run"},
427 {Command::Idle,
"Idle"},
428 {Command::Exit,
"Exit"},
// Posts a command to the worker thread without waiting for the reply.
// @param cmd  command to send.
// @return per the signature a std::future<void>; the return statement is not
//         visible in this excerpt (presumably it returns `f`).
435 std::future<void> SendRequestAsync(Command cmd) {
436 Request<Command> req(cmd);
// The reply future is obtained before the request is moved into the queue,
// because `req` must not be used after the std::move below.
437 auto f = req.GetReplyFuture();
438 m_request_q.Post(std::move(req));
// Posts a command to the worker thread and waits for its reply.
// @param cmd      command to send.
// @param timeout  maximum time to wait for the reply future to become ready.
// @throws RequestTimedOut (via CII_THROW, carrying the command's text from
//         m_command_text) when the future is not ready after `timeout`.
448 template <
class Rep,
class Period>
449 void SendRequestSync(Command cmd, std::chrono::duration<Rep, Period> timeout) {
450 auto f = SendRequestAsync(cmd);
451 auto status = f.wait_for(timeout);
452 if (status != std::future_status::ready) {
453 CII_THROW(RequestTimedOut, m_command_text.at(cmd));
// Fetches the next request from the request queue via TryPend().
// @return the pending request, or a request carrying Command::None when
//         TryPend() yields no value.
457 Request<Command> GetRequest() {
458 return m_request_q.TryPend().value_or(Request(Command::None));
// Fragment of the worker-thread state machine (its log messages name it
// ReaderThread::Work()). Many lines are not visible in this excerpt: the loop
// and switch scaffolding, some case labels, closing braces and a few
// declarations. The comments below only describe the statements that are shown.
469 using namespace std::chrono;
470 using namespace std::chrono_literals;
// Fetch a pending request; GetRequest() yields Command::None when nothing is
// queued.
472 auto req = GetRequest();
// State bookkeeping: m_state is the current state, next_state the one to switch
// to, prev_state the one seen on the previous pass.
474 m_state = State::Off;
475 State next_state = State::Starting;
476 State prev_state = State::Off;
// Reference point and accumulator for the elapsed-time checks further down.
// NOTE(review): system_clock is not monotonic, so a wall-clock adjustment would
// distort these checks; std::chrono::steady_clock looks like the better fit -
// confirm before changing.
478 auto time_start = system_clock::now();
479 milliseconds time_elapsed {0};
// Duration derived from samples_to_skip and loop_frequency with a margin of
// 1.0 (see CalcTimeout for the exact formula).
480 auto duration_skip =
CalcTimeout(m_samples_to_skip.Get(), m_loop_frequency.Get(), 1.0);
// The two timeouts used by the later handlers are each half of that duration.
481 auto timeout_wait {duration_skip / 2};
482 auto timeout_drop {duration_skip / 2};
// `ok` is a default-constructed (no-error) error_code used for comparisons;
// `ret` holds the most recent result and starts out as ok.
487 std::error_code
const ok{};
488 std::error_code ret = ok;
// Create the queue reader for the configured queue name. 30s is passed as the
// second argument - presumably a timeout; confirm against
// ReaderType::MakeReader.
490 auto reader = ReaderType<TopicType>::MakeReader(m_queue_name.Get().c_str(), 30s);
// State transition: remember the old state, switch to next_state and log the
// change.
494 prev_state = m_state;
495 if(m_state != next_state) {
496 m_state = next_state;
497 LOG4CPLUS_INFO(
GetLogger(),
"ReaderThread::Work() - changed state to '" << m_state_text.at(m_state) <<
"'");
// --- State::Starting: run the optional thread-init callback. ---
502 case State::Starting:
505 if(m_callback_init_thread) {
506 m_callback_init_thread();
// next_state becomes Idle here and Error on the next visible line; the control
// flow choosing between the two is not visible in this excerpt.
508 next_state = State::Idle;
511 next_state = State::Error;
// --- Next handler (case label not visible; presumably State::Idle). ---
// State-entry hook: true only on the first pass after a state change (body not
// visible).
518 if(m_state != prev_state) {
// Exit -> Terminating; Run -> Reading when `ret` is ok, otherwise Error.
524 if(req.GetPayload() == Command::Exit) {
525 next_state = State::Terminating;
527 }
else if(req.GetPayload() == Command::Run) {
530 next_state = ret==ok ? State::Reading : State::Error;
// Sleep 10ms and remain in the current state (presumably the branch taken when
// neither command is pending).
534 std::this_thread::sleep_for(10ms);
535 next_state = m_state;
// --- Next handler (case label not visible; presumably State::Reading). ---
// On state entry: m_comp_allowed is cleared when the previous state was Idle,
// and the to_read/to_skip counters are loaded from the parameters (the exact
// brace structure is not visible here).
541 if(m_state != prev_state) {
542 if(prev_state == State::Idle) {
543 m_comp_allowed.Clear();
546 to_read = m_samples_to_read.Get();
547 to_skip = m_samples_to_skip.Get();
// Exit -> Terminating; Idle -> Idle.
551 if(req.GetPayload() == Command::Exit) {
552 next_state = State::Terminating;
554 }
else if (req.GetPayload() == Command::Idle){
555 next_state = State::Idle;
// Chunk size for this pass, taken from the loop frequency.
// NOTE(review): m_loop_frequency is declared Parameter<float>, so this looks
// like an implicit float -> size_t conversion - confirm it is intended.
560 size_t to_read_now = m_loop_frequency.Get();
// Special case: chunk size 0 while samples remain (body not visible).
562 if(to_read_now == 0 and to_read > 0) {
// Never read more than what is still outstanding.
566 if(to_read_now > to_read) {
567 to_read_now = to_read;
// Tail of the read call (its head is not visible): passes the data callback,
// the loop frequency and the error margin.
571 m_callback_on_data, \
573 m_loop_frequency.Get(), \
574 m_error_margin.Get());
576 to_read -= to_read_now;
// Visible outcomes: Error; or, once to_read is 0, Skipping or Idle (the
// condition choosing between them is not visible) together with posting `ok`
// to m_comp_allowed; otherwise stay in this state.
579 next_state = State::Error;
580 }
else if(to_read == 0) {
582 next_state = State::Skipping;
584 next_state = State::Idle;
586 m_comp_allowed.Post(ok);
589 next_state = m_state;
// --- State::Skipping: work off to_skip in chunks via Skip(). ---
594 case State::Skipping:
// Exit -> Terminating; Idle -> Idle.
597 if(req.GetPayload() == Command::Exit) {
598 next_state = State::Terminating;
600 }
else if (req.GetPayload() == Command::Idle){
601 next_state = State::Idle;
// Chunk size again taken from the loop frequency (same float -> size_t
// conversion as for reading).
606 size_t to_skip_now = m_loop_frequency.Get();
608 if(to_skip_now == 0 and to_skip > 0) {
// Never skip more than what is still outstanding.
612 if(to_skip_now > to_skip) {
613 to_skip_now = to_skip;
// Skip the chunk; `ret` receives the resulting error_code.
616 ret =
Skip(reader, to_skip_now, m_loop_frequency.Get(), m_error_margin.Get());
618 to_skip -= to_skip_now;
// Visible outcomes: Error; or, once to_skip is 0, Reading when
// m_comp_done.TryWait() succeeds and Waiting otherwise; else stay in this
// state.
621 next_state = State::Error;
622 }
else if(to_skip == 0) {
623 if(m_comp_done.TryWait()) {
624 next_state = State::Reading;
626 next_state = State::Waiting;
629 next_state = m_state;
// --- Next handler (case label not visible; presumably State::Waiting). ---
// On state entry restart the elapsed-time reference.
636 if(m_state != prev_state) {
637 time_start = system_clock::now();
// Exit -> Terminating; Idle -> Idle.
641 if(req.GetPayload() == Command::Exit) {
642 next_state = State::Terminating;
644 }
else if (req.GetPayload() == Command::Idle){
645 next_state = State::Idle;
// m_comp_done could be taken -> back to Reading.
649 if(m_comp_done.TryWait()) {
650 next_state = State::Reading;
// Otherwise poll in 1ms steps.
654 std::this_thread::sleep_for(1ms);
// Switch to Dropping when timeout_wait is exceeded or NumFree(reader) reports 0
// (presumably: no free slots left in the queue - confirm); else stay.
656 time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
657 if((time_elapsed > timeout_wait) or (
NumFree(reader) == 0)) {
658 next_state = State::Dropping;
660 next_state = m_state;
// --- State::Dropping ---
665 case State::Dropping:
// On state entry restart the elapsed-time reference.
667 if(m_state != prev_state) {
668 time_start = system_clock::now();
// Exit -> Terminating; Idle -> Idle.
672 if(req.GetPayload() == Command::Exit) {
673 next_state = State::Terminating;
675 }
else if (req.GetPayload() == Command::Idle){
676 next_state = State::Idle;
// m_comp_done could be taken -> Reading when `ret` is ok, otherwise Error (the
// statement that updates `ret` in between is not visible).
680 if(m_comp_done.TryWait()) {
682 next_state = ret==ok ? State::Reading : State::Error;
// Otherwise poll in 1ms steps; exceeding timeout_drop leads to Error.
686 std::this_thread::sleep_for(1ms);
688 time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
689 if(time_elapsed > timeout_drop) {
691 next_state = State::Error;
693 next_state = m_state;
// --- Next handler (case label not visible; presumably State::Error). ---
// On state entry: post the error code to m_comp_allowed and log its message.
700 if(m_state != prev_state) {
701 m_comp_allowed.Post(ret);
702 LOG4CPLUS_ERROR(
GetLogger(),
"ReaderThread::Work() - " << ret.message());
// Exit -> Terminating; Idle -> Idle.
706 if(req.GetPayload() == Command::Exit) {
707 next_state = State::Terminating;
708 }
else if(req.GetPayload() == Command::Idle) {
709 next_state = State::Idle;
// Sleep 100ms and stay in this state (presumably when no such command is
// pending).
711 std::this_thread::sleep_for(100ms);
712 next_state = m_state;
// --- State::Terminating: the next state is Off. ---
717 case State::Terminating:
719 next_state = State::Off;
// Worker thread object; created via numapp::MakeThread in the Spawn() fragment.
732 std::thread m_thread;
// Configuration parameters.
// NOTE(review): m_mode does not appear in the visible part of the constructor's
// initializer list and is not used in the visible code - confirm how it is
// initialised.
734 Parameter<ReaderMode> m_mode;
735 Parameter<std::string> m_thread_name;
736 Parameter<std::string> m_queue_name;
737 Parameter<size_t> m_thread_affinity;
738 Parameter<size_t> m_samples_to_read;
739 Parameter<size_t> m_samples_to_skip;
740 Parameter<float> m_loop_frequency;
741 Parameter<float> m_error_margin;
// Request queue: filled by SendRequestAsync(), drained by GetRequest().
743 MessageQueue<Request<Command>> m_request_q;
// Result queue: the state machine posts an error_code (ok or the failure) here,
// and the WaitUntilComputationAllowed() fragments pend on it.
744 MessageQueue<std::error_code> m_comp_allowed;
// Semaphore polled with TryWait() by the state machine; where it is signalled
// is not visible in this excerpt.
746 Semaphore m_comp_done;
// Callback taking one sample by const reference; it is passed into the read
// call of the state machine.
750 std::function<void(
const TopicType& sample)> m_callback_on_data;
// Optional callback invoked in State::Starting when it is non-empty.
751 std::function<void()> m_callback_init_thread;