12 #ifndef RTCTK_DATATASK_READERHELPERS_HPP
13 #define RTCTK_DATATASK_READERHELPERS_HPP
17 #include <system_error>
37 inline std::chrono::milliseconds
CalcTimeout(
size_t count,
float loop_frequency,
float error_margin)
39 using namespace std::chrono;
40 return milliseconds{
static_cast<int>(ceil(1000.0 * count / loop_frequency) * error_margin)};
56 template <
typename ReaderType,
typename Operation>
57 std::error_code
Read(ReaderType& reader, Operation&& op,
size_t count,
float loop_frequency, \
61 using namespace std::chrono;
63 auto timeout_short =
CalcTimeout(1, loop_frequency, error_margin);
64 auto timeout_total =
CalcTimeout(count, loop_frequency, error_margin);
67 std::error_code
const ok;
68 std::pair<std::error_code, size_t> ret;
69 milliseconds time_elapsed {0};
70 auto time_start = system_clock::now();
73 ret = reader.Read(std::forward<Operation>(op), count, timeout_short);
75 LOG4CPLUS_ERROR(
GetLogger(),
"!Reading from shm timed out: check if queue is being filled");
76 LOG4CPLUS_ERROR(
GetLogger(),
"Read: " << ret.second <<
" in " << timeout_short.count() <<
" ms");
77 LOG4CPLUS_ERROR(
GetLogger(),
"Expected: " << count);
85 time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
86 if (time_elapsed > timeout_total) {
87 LOG4CPLUS_ERROR(
GetLogger(),
"Reading from shm timed out: check if queue is being filled");
88 LOG4CPLUS_ERROR(
GetLogger(),
"Read: " <<
read <<
" in " << time_elapsed.count() <<
" ms");
108 template <
typename ReaderType>
109 std::error_code
Skip(ReaderType& reader,
size_t count,
float loop_frequency, \
113 using namespace std::chrono;
115 auto timeout_short =
CalcTimeout(1, loop_frequency, error_margin);
116 auto timeout_total =
CalcTimeout(count, loop_frequency, error_margin);
119 std::error_code
const ok;
120 std::pair<std::error_code, size_t> ret;
121 milliseconds time_elapsed {0};
122 auto time_start = system_clock::now();
125 ret = reader.Skip(count, timeout_short);
126 if(ret.first != ok) {
127 LOG4CPLUS_ERROR(
GetLogger(),
"!Skipping from shm timed out: check if queue is being filled");
128 LOG4CPLUS_ERROR(
GetLogger(),
"Read: " << ret.second <<
" in " << timeout_short.count() <<
" ms");
129 LOG4CPLUS_ERROR(
GetLogger(),
"Expected: " << count);
132 skipped += ret.second;
133 if (skipped == count) {
137 time_elapsed = duration_cast<milliseconds>(system_clock::now() - time_start);
138 if (time_elapsed > timeout_total) {
139 LOG4CPLUS_ERROR(
GetLogger(),
"Skipping from shm timed out: check if queue is being filled");
140 LOG4CPLUS_ERROR(
GetLogger(),
"Read: " << skipped <<
" in " << time_elapsed.count() <<
" ms");
141 LOG4CPLUS_ERROR(
GetLogger(),
"Expected: " << skipped);
156 template <
typename ReaderType>
157 std::error_code
Reset(ReaderType& reader)
159 if (reader.Size() != 0) {
160 return reader.Reset();
173 template <
typename ReaderType>
176 return reader.Size() - reader.NumAvailable();