RTC Toolkit  1.0.0
readerThread.hpp
Go to the documentation of this file.
1 
12 #ifndef RTCTK_DATATASK_READERTHREAD_HPP
13 #define RTCTK_DATATASK_READERTHREAD_HPP
14 
23 
24 #include <numapp/numapolicies.hpp>
25 #include <numapp/thread.hpp>
26 #include <ipcq/reader.hpp>
27 
28 #include <thread>
29 #include <chrono>
30 #include <ctime>
31 #include <cmath>
32 #include <map>
33 #include <functional>
34 #include <stdexcept>
35 
36 
37 namespace rtctk::dataTask {
41 // TODO: this one is more high level, different file
43 {
44  public:
45  RequestTimedOut(const std::string& req_name)
46  : rtctk::componentFramework::RtctkException("Request '" + req_name + "' timed out!") {}
47 };
48 
50 {
51  public:
53  : rtctk::componentFramework::RtctkException("An asynchronous error occured!") {}
54 
55  AsynchronousError(const std::string& text)
56  : rtctk::componentFramework::RtctkException("An asynchronous error occured: '" + text + "'!") {}
57 };
58 
59 enum class ReaderMode : unsigned {
60  Single,
62 };
63 
75 template<typename TopicType, template<typename> typename ReaderType=ipcq::Reader>
76 class ReaderThread {
77  public:
78 
80  : m_mode("reader_mode", ReaderMode::Continuous)
81  , m_thread_name("thread_name", "readerThread")
82  , m_queue_name("queue_name")
83  , m_thread_affinity("thread_affinity")
84  , m_samples_to_read("samples_to_read")
85  , m_samples_to_skip("samples_to_skip", 0)
86  , m_loop_frequency("loop_frequency")
87  , m_error_margin("error_margin")
88  , m_state(State::Off)
89  , m_callback_on_data()
90  , m_callback_init_thread()
91  {
92  }
93 
95  {
96  if(m_thread.joinable()) {
97  Join();
98  }
99  }
100 
112  void Spawn()
113  {
115  using namespace std::chrono_literals;
116 
117  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Spawn()");
118 
119  // check if callback to OnDataAvailable has been registered
120  assert(m_callback_on_data);
121 
122  m_mode.CheckSet();
123  m_queue_name.CheckSet();
124  m_thread_name.CheckSet();
125  m_samples_to_read.CheckSet();
126  m_samples_to_skip.CheckSet();
127  m_loop_frequency.CheckSet();
128 
129  m_mode.Lock();
130  m_queue_name.Lock();
131  m_thread_name.Lock();
132  m_thread_affinity.Lock();
133  m_loop_frequency.Lock();
134 
135  if(!m_error_margin.IsSet())
136  {
137  m_error_margin.Set(1.2);
138  }
139  m_error_margin.Lock();
140 
141  auto policies = numapp::NumaPolicies();
142  if (m_thread_affinity.IsSet()) {
143  auto cpu_mask = numapp::Cpumask::MakeFromCpuStringAll(
144  std::to_string(m_thread_affinity.Get()).c_str());
145  policies.SetCpuAffinity(numapp::CpuAffinity(cpu_mask));
146  }
147 
148  auto f = SendRequestAsync(Command::Idle);
149 
150  m_thread = numapp::MakeThread(
151  m_thread_name.Get(),
152  policies,
153  &ReaderThread::Work,
154  this);
155 
156  auto status = f.wait_for(1200ms);
157  if (status != std::future_status::ready) {
158  CII_THROW(RequestTimedOut, m_command_text.at(Command::Idle));
159  }
160  }
161 
171  void Join()
172  {
174  using namespace std::chrono_literals;
175 
176  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Join()");
177 
178  auto f = SendRequestAsync(Command::Exit);
179 
180  auto status = f.wait_for(1200ms);
181  m_thread.join();
182 
183  m_mode.Unlock();
184  m_queue_name.Unlock();
185  m_thread_name.Unlock();
186  m_thread_affinity.Unlock();
187  m_loop_frequency.Unlock();
188 
189  if (status != std::future_status::ready) {
190  CII_THROW(RequestTimedOut, m_command_text.at(Command::Exit));
191  }
192  }
193 
197  void Run()
198  {
200  using namespace std::chrono_literals;
201 
202  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Run()");
203 
204  SendRequestSync(Command::Run);
205  }
206 
207 
211  void Idle()
212  {
214  using namespace std::chrono_literals;
215 
216  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Idle()");
217 
218  SendRequestSync(Command::Idle);
219  }
220 
228  {
229  using namespace std::chrono_literals;
231  std::error_code const ok{};
232  auto timeout = 1000*detail::CalcDuration(m_samples_to_read.Get(), \
233  m_loop_frequency.Get(), \
234  3.0);
235 
236  auto ret = m_comp_allowed.Pend(timeout);
237  if(ret != ok) {
238  CII_THROW(AsynchronousError,ret.message());
239  }
240  }
241 
243  {
245  using namespace std::chrono_literals;
246  using namespace std::chrono;
247  auto timeout = 1000*detail::CalcDuration(m_samples_to_read.Get(), \
248  m_loop_frequency.Get(), \
249  3.0);
250 
251  auto time_start = system_clock::now();
252  while(true) {
253  std::this_thread::sleep_for(5ms);
254 
255  if(st.StopRequested()) {
256  LOG4CPLUS_INFO(GetLogger(),
257  "ReaderThread::WaitUntilComputationAllowed() aborted via StopToken!");
258  break;
259  }
260 
261  auto ret = m_comp_allowed.TryPend();
262  if(ret) {
263  std::error_code const ok{};
264  if(ret.value() != ok) {
265  CII_THROW(AsynchronousError, ret.value().message());
266  }
267  break;
268  }
269 
270  if(system_clock::now() - time_start > timeout) {
271  CII_THROW(AsynchronousError, "ReaderThread::WaitUntilComputationAllowed() timeout");
272  }
273  }
274  }
275 
281  {
282  m_comp_done.Post();
283  }
284 
289  void SetQueueName(std::string const& name)
290  {
291  // TODO is there a max lenght of the name?
292  m_queue_name.Set(name);
293  }
294 
299  void SetMode(ReaderMode mode)
300  {
301  m_mode.Set(mode);
302  }
303 
308  void SetThreadName(std::string const& name)
309  {
310  // TODO check for max 16 characters length
311  m_thread_name.Set(name);
312  }
313 
318  void SetCpuAffinity(int affinity)
319  {
320  m_thread_affinity.Set(affinity);
321  }
322 
327  void SetSamplesToRead(size_t value)
328  {
329  m_samples_to_read.Set(value);
330  }
331 
336  void SetSamplesToSkip(size_t value)
337  {
338  m_samples_to_skip.Set(value);
339  }
340 
346  void SetLoopFrequency(size_t value)
347  {
348  m_loop_frequency.Set(value);
349  }
350 
356  void SetErrorMargin(float value)
357  {
358  m_error_margin.Set(value);
359  }
360 
361  // call back registration
362  // TODO: add checks
369  void RegisterOnDataCallback(std::function<void(const TopicType& sample)> callback)
370  {
371  m_callback_on_data = callback;
372  }
373 
381  void RegisterInitThreadCallback(std::function<void()> callback)
382  {
383  m_callback_init_thread = std::move(callback);
384  }
385 
386  private:
387 
388  enum class State : unsigned {
389  Error,
390  Off,
391  Starting,
392  Terminating,
393  Idle,
394  Reading,
395  Skipping,
396  Waiting,
397  Dropping,
398  }; //< known states of the readerThread
399 
400  const std::map<State, std::string> m_state_text = {
401  {State::Error, "Error"},
402  {State::Off, "Off"},
403  {State::Starting, "Starting"},
404  {State::Terminating, "Terminating"},
405  {State::Idle, "Idle"},
406  {State::Reading, "Reading"},
407  {State::Skipping, "Skipping"},
408  {State::Waiting, "Waiting"},
409  {State::Dropping, "Dropping"},
410  }; //< States names
411 
412  enum class Command: unsigned {
413  None,
414  Run,
415  Idle,
416  Exit,
417  }; //< expected commands
418 
419  const std::map<Command, std::string> m_command_text = {
420  {Command::None, "-"},
421  {Command::Run, "Off"},
422  {Command::Idle, "Starting"},
423  {Command::Exit, "Terminating"},
424  }; //< expected commands names
425 
432  std::future<void> SendRequestAsync(Command cmd) {
433  Request<Command> req(cmd);
434  auto f = req.GetReplyFuture();
435  m_request_q.Post(std::move(req));
436  return f;
437  }
438 
445  void SendRequestSync(Command cmd) {
446  using namespace std::chrono_literals;
447 
448  auto f = SendRequestAsync(cmd);
449  auto status = f.wait_for(1200ms);
450  if (status != std::future_status::ready) {
451  CII_THROW(RequestTimedOut, m_command_text.at(cmd));
452  }
453  }
454 
455  Request<Command> GetRequest() {
456  return m_request_q.TryPend().value_or(Request(Command::None));
457  }
458 
463  void Work()
464  {
466  using namespace rtctk::dataTask::detail;
467  using namespace std::chrono;
468  using namespace std::chrono_literals;
469 
470  auto req = GetRequest();
471 
472  m_state = State::Off;
473  State next_state = State::Starting;
474  State prev_state = State::Off;
475 
476  auto time_start = system_clock::now();
477  milliseconds time_elapsed {0};
478  auto duration_skip = CalcDuration(m_samples_to_skip.Get(), m_loop_frequency.Get());
479  auto timeout_wait {duration_skip / 2};
480  auto timeout_drop {duration_skip / 2};
481 
482  size_t to_read = 0;
483  size_t to_skip = 0;
484 
485  std::error_code const ok{};
486  std::error_code ret = ok;
487 
488  auto reader = ReaderType<TopicType>::MakeReader(m_queue_name.Get().c_str(), 30s);
489 
490  while(1) {
491 
492  prev_state = m_state;
493  if(m_state != next_state) {
494  m_state = next_state;
495  LOG4CPLUS_INFO(GetLogger(), "ReaderThread::Work() - changed state to '" << m_state_text.at(m_state) << "'");
496  }
497 
498  switch(m_state)
499  {
500  case State::Starting:
501  {
502  try {
503  if(m_callback_init_thread) {
504  m_callback_init_thread();
505  }
506  next_state = State::Idle;
507  }catch(...) {
508  ret = std::make_error_code(std::errc::timed_out); // TODO: other code
509  next_state = State::Error;
510  }
511  break;
512  }
513 
514  case State::Idle:
515  {
516  if(m_state != prev_state) {
517  req.SetReply();
518  }
519 
520  req = GetRequest();
521 
522  if(req.GetPayload() == Command::Exit) {
523  next_state = State::Terminating;
524  break;
525  }else if(req.GetPayload() == Command::Run) {
526  m_comp_done.Clear();
527  ret = Reset(reader);
528  next_state = ret==ok ? State::Reading : State::Error;
529  break;
530  }
531 
532  std::this_thread::sleep_for(10ms);
533  next_state = m_state;
534  break;
535  }
536 
537  case State::Reading:
538  {
539  if(m_state != prev_state) {
540  if(prev_state == State::Idle) {
541  req.SetReply();
542  }
543  to_read = m_samples_to_read.Get();
544  to_skip = m_samples_to_skip.Get();
545  }
546 
547  req = GetRequest();
548  if(req.GetPayload() == Command::Exit) {
549  next_state = State::Terminating;
550  break;
551  }else if (req.GetPayload() == Command::Idle){
552  next_state = State::Idle;
553  break;
554  }
555 
556  size_t to_read_now = std::min(to_read, m_loop_frequency.Get());
557  auto time_read_start = system_clock::now();
558  ret = Read(reader, \
559  m_callback_on_data, \
560  to_read_now, \
561  m_loop_frequency.Get(), \
562  m_error_margin.Get());
563  auto time_read_end = system_clock::now();
564  CalculateEstimatedFrequency(to_read_now, time_read_end - time_read_start);
565  to_read -= to_read_now;
566 
567  if(ret != ok) {
568  next_state = State::Error;
569  }else if(to_read == 0) {
570  if(m_mode.Get()==ReaderMode::Continuous) {
571  next_state = State::Skipping;
572  }else {
573  next_state = State::Idle;
574  }
575  m_comp_allowed.Post(ok);
576 
577  }else{
578  next_state = m_state;
579  }
580  break;
581  }
582 
583  case State::Skipping:
584  {
585  req = GetRequest();
586  if(req.GetPayload() == Command::Exit) {
587  next_state = State::Terminating;
588  break;
589  }else if (req.GetPayload() == Command::Idle){
590  next_state = State::Idle;
591  break;
592  }
593 
594  size_t to_skip_now = std::min(to_skip, m_loop_frequency.Get());
595  auto time_skip_start = system_clock::now();
596  ret = Skip(reader, to_skip_now, m_loop_frequency.Get(), m_error_margin.Get());
597  auto time_skip_end = system_clock::now();
598  CalculateEstimatedFrequency(to_skip_now, time_skip_end - time_skip_start);
599 
600  to_skip -= to_skip_now;
601 
602  if(ret != ok) {
603  next_state = State::Error;
604  }else if(to_skip == 0) {
605  if(m_comp_done.TryWait()) {
606  next_state = State::Reading;
607  }else {
608  next_state = State::Waiting;
609  }
610  }else{
611  next_state = m_state;
612  }
613  break;
614  }
615 
616  case State::Waiting:
617  {
618  if(m_state != prev_state) {
619  time_start = system_clock::now();
620  }
621 
622  req = GetRequest();
623  if(req.GetPayload() == Command::Exit) {
624  next_state = State::Terminating;
625  break;
626  }else if (req.GetPayload() == Command::Idle){
627  next_state = State::Idle;
628  break;
629  }
630 
631  if(m_comp_done.TryWait()) {
632  next_state = State::Reading;
633  break;
634  }
635 
636  std::this_thread::sleep_for(1ms);
637 
638  time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
639  if((time_elapsed > timeout_wait) or (NumFree(reader) == 0)) { // TODO: revisit
640  next_state = State::Dropping;
641  }else {
642  next_state = m_state;
643  }
644  break;
645  }
646 
647  case State::Dropping:
648  {
649  if(m_state != prev_state) {
650  time_start = system_clock::now();
651  }
652 
653  req = GetRequest();
654  if(req.GetPayload() == Command::Exit) {
655  next_state = State::Terminating;
656  break;
657  }else if (req.GetPayload() == Command::Idle){
658  next_state = State::Idle;
659  break;
660  }
661 
662  if(m_comp_done.TryWait()) {
663  ret = Reset(reader);
664  next_state = ret==ok ? State::Reading : State::Error;
665  break;
666  }
667 
668  std::this_thread::sleep_for(1ms);
669 
670  time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
671  if(time_elapsed > timeout_drop) {
672  ret = std::make_error_code(std::errc::timed_out);
673  next_state = State::Error;
674  }else {
675  next_state = m_state;
676  }
677  break;
678  }
679 
680  case State::Error:
681  {
682  if(m_state != prev_state) {
683  m_comp_allowed.Post(ret);
684  LOG4CPLUS_ERROR(GetLogger(), "ReaderThread::Work() - " << ret.message());
685  }
686 
687  req = GetRequest();
688  if(req.GetPayload() == Command::Exit) {
689  next_state = State::Terminating;
690  }else if(req.GetPayload() == Command::Idle) {
691  next_state = State::Idle;
692  }else{
693  std::this_thread::sleep_for(100ms);
694  next_state = m_state;
695  }
696  break;
697  }
698 
699  case State::Terminating:
700  {
701  next_state = State::Off;
702  break;
703  }
704 
705  case State::Off:
706  {
707  req.SetReply();
708  return;
709  }
710  }
711  }
712  }
713 
720  template <class Rep, class Period>
721  void CalculateEstimatedFrequency(size_t count,
722  std::chrono::duration<Rep, Period> time)
723  {
725  using namespace std::chrono;
726  if(time.count() != 0)
727  {
728  m_estimated_loop_frequency = static_cast<float>(count) / \
729  duration_cast<seconds>(time).count();
730 
731  if(!std::isinf(m_estimated_loop_frequency))
732  {
733  //check if outside of error margin
734  if(m_estimated_loop_frequency < \
735  static_cast<float>(m_loop_frequency.Get())/m_error_margin.Get() || \
736  m_estimated_loop_frequency > \
737  static_cast<float>(m_loop_frequency.Get())*m_error_margin.Get())
738  {
739  LOG4CPLUS_WARN(GetLogger(), "ReaderThread: Measured SHM frequency: " <<\
740  m_estimated_loop_frequency << " expected: " << m_loop_frequency.Get());
741  }
742  }
743  }
744  }
745 
746  std::thread m_thread; //< the readerThead member
747 
748  Parameter<ReaderMode> m_mode; //< parameters mode
749  Parameter<std::string> m_thread_name; //< parameters thread name
750  Parameter<std::string> m_queue_name; //< parameter queue name
751  Parameter<size_t> m_thread_affinity; //< parameter cpu affinity
752  Parameter<size_t> m_samples_to_read; //< parameter samples to be read
753  Parameter<size_t> m_samples_to_skip; //< parameter samples to skip
754  Parameter<size_t> m_loop_frequency; //< parameter loop frequency
755  Parameter<float> m_error_margin; //< parameter timeout tollerance
756 
757  float m_estimated_loop_frequency;
758 
759  MessageQueue<Request<Command>> m_request_q;
760  MessageQueue<std::error_code> m_comp_allowed;
761 
762  Semaphore m_comp_done;
763 
764  State m_state;
765 
766  std::function<void(const TopicType& sample)> m_callback_on_data;
767  std::function<void()> m_callback_init_thread;
768 };
769 
770 } // namespace
771 
772 #endif
rtctk::dataTask::detail::CalcDuration
std::chrono::milliseconds CalcDuration(size_t count, size_t loop_frequency, float error_margin=1.0)
free helper function to calulate the estimated time to read the a number of samples at a given freque...
Definition: readerHelpers.hpp:37
rtctk::dataTask::ReaderThread::SetLoopFrequency
void SetLoopFrequency(size_t value)
required setter sets the loop frequency, if unlocked.
Definition: readerThread.hpp:346
exceptions.hpp
Provides macros and utilities for exception handling.
wscript.name
name
Definition: wscript:15
rtctk::dataTask::ReaderMode
ReaderMode
Definition: readerThread.hpp:59
rtctk::dataTask::ReaderThread::SetCpuAffinity
void SetCpuAffinity(int affinity)
optional setter sets the cpu affinity name, if unlocked.
Definition: readerThread.hpp:318
rtctk::dataTask::ReaderThread::SetErrorMargin
void SetErrorMargin(float value)
optional setter sets the timeout tolerance.
Definition: readerThread.hpp:356
rtctk::dataTask::detail::Reset
std::error_code Reset(ReaderType &reader)
helper function to reset the ipcq.reader to latest sample
Definition: readerHelpers.hpp:176
rtctk::dataTask::ReaderThread::SetQueueName
void SetQueueName(std::string const &name)
required setter sets the queue name to be used if unlocked.
Definition: readerThread.hpp:289
rtctk::dataTask::RequestTimedOut
Definition: readerThread.hpp:43
rtctk::componentFramework::RepositoryIf::Request
A request object to pass information about datapoints that should be read (written) from (to) the rep...
Definition: repositoryIf.hpp:45
rtctk::dataTask::detail::Read
std::error_code Read(ReaderType &reader, Operation &&op, size_t count, size_t loop_frequency, float error_margin=2.0)
helper function to wrap the ipcq.read with handling of timeouts and count values
Definition: readerHelpers.hpp:78
rtctk::componentFramework::RtctkException::RtctkException
RtctkException() noexcept
Definition: exceptions.cpp:101
rtctk::dataTask::ReaderMode::Single
@ Single
messageQueue.hpp
A simple message queue implementation.
rtctk::dataTask::ReaderThread::RegisterInitThreadCallback
void RegisterInitThreadCallback(std::function< void()> callback)
optional callback
Definition: readerThread.hpp:381
rtctk::componentFramework::RtctkException
The RtctkException class is the base class for all Rtctk exceptions.
Definition: exceptions.hpp:207
readerHelpers.hpp
Helper methods to read data from shared memory queue.
rtctk::dataTask::ReaderThread::SetSamplesToRead
void SetSamplesToRead(size_t value)
required setter sets the number of samples to be read, if unlocked.
Definition: readerThread.hpp:327
rtctk::componentFramework::StopToken
rad::StopToken StopToken
Definition: stopToken.hpp:19
rtctk::dataTask::detail
Definition: readerHelpers.hpp:25
semaphore.hpp
A simple semaphore implementation.
rtctk::dataTask::detail::Skip
std::error_code Skip(ReaderType &reader, size_t count, size_t loop_frequency, float error_margin=2.0)
helper function to wrap the ipcq.skip with handling of timeouts and count values
Definition: readerHelpers.hpp:129
rtctk::dataTask::ReaderThread::WaitUntilComputationAllowed
void WaitUntilComputationAllowed(rtctk::componentFramework::StopToken st)
Definition: readerThread.hpp:242
rtctk::dataTask::RequestTimedOut::RequestTimedOut
RequestTimedOut(const std::string &req_name)
Definition: readerThread.hpp:45
rtctk::dataTask::ReaderThread::SetSamplesToSkip
void SetSamplesToSkip(size_t value)
required setter sets the number of samples to skip, if unlocked.
Definition: readerThread.hpp:336
rtctk::componentFramework::GetLogger
log4cplus::Logger & GetLogger(const std::string &name="")
Get handle to a specific logger (used with logging macros)
rtctk::dataTask::ReaderThread::SetThreadName
void SetThreadName(std::string const &name)
optional setter sets the thread name to be used, if unlocked.
Definition: readerThread.hpp:308
rtctk::dataTask::AsynchronousError::AsynchronousError
AsynchronousError(const std::string &text)
Definition: readerThread.hpp:55
rtctk::dataTask::detail::NumFree
size_t NumFree(ReaderType &reader)
helper function to get the free space in the shm.
Definition: readerHelpers.hpp:193
rtctk::dataTask::ReaderThread::Spawn
void Spawn()
Spawn the reader thread.
Definition: readerThread.hpp:112
rtctk::dataTask::ReaderThread::Run
void Run()
send synchronous run request to readerThread.
Definition: readerThread.hpp:197
rtctk::dataTask::ReaderThread::~ReaderThread
~ReaderThread()
Definition: readerThread.hpp:94
rtctk::dataTask::ReaderThread
ReaderThread for the Data Class.
Definition: readerThread.hpp:76
rtctk::dataTask::ReaderThread::RegisterOnDataCallback
void RegisterOnDataCallback(std::function< void(const TopicType &sample)> callback)
required callback
Definition: readerThread.hpp:369
Request
rtctk::componentFramework::RepositoryIf::Request Request
Definition: oldbAdapter.cpp:36
rtctk::dataTask
Definition: messageQueue.hpp:20
parameter.hpp
A Parameter class used for Data Tasks.
stopToken.hpp
A simple Stop Token.
rtctk::telRepub::make_error_code
std::error_code make_error_code(MudpiProcessingError e)
Create std::error_code from ProcessingError.
Definition: mudpiProcessingError.hpp:113
logger.hpp
Logging Support Library based on log4cplus.
rtctk::dataTask::AsynchronousError
Definition: readerThread.hpp:50
rtctk::dataTask::ReaderThread::Idle
void Idle()
send Idle request to readerThread.
Definition: readerThread.hpp:211
rtctk::dataTask::ReaderThread::Join
void Join()
waits for thread to complete and rejoins
Definition: readerThread.hpp:171
rtctk::dataTask::ReaderThread::WaitUntilComputationAllowed
void WaitUntilComputationAllowed()
wait until the computation can be launched.
Definition: readerThread.hpp:227
rtctk::dataTask::ReaderThread::SetMode
void SetMode(ReaderMode mode)
optional setter sets the mode of the reader.
Definition: readerThread.hpp:299
rtctk::dataTask::ReaderThread::SignalComputationDone
void SignalComputationDone()
returns from the buisness logic that te calculation has completed so that the readerThread is safe to...
Definition: readerThread.hpp:280
rtctk::dataTask::ReaderThread::ReaderThread
ReaderThread()
Definition: readerThread.hpp:79
rtctk
Definition: agnostictopicifPubSubTypes.cpp:31
rtctk::dataTask::AsynchronousError::AsynchronousError
AsynchronousError()
Definition: readerThread.hpp:52
request.hpp
A Request class used in Data Tasks.