12 #ifndef RTCTK_DATATASK_READERHELPERS_HPP
13 #define RTCTK_DATATASK_READERHELPERS_HPP
#include <chrono>
#include <cmath>
#include <cstddef>
#include <system_error>
#include <utility>
/**
 * Estimates how long it takes to receive @p count samples at @p loop_frequency.
 *
 * The nominal duration, 1000 * count / loop_frequency milliseconds, is rounded
 * up to a whole millisecond and then multiplied by @p error_margin; a
 * fractional millisecond left after that multiplication is truncated.
 *
 * Declared inline because the definition lives in a header and would otherwise
 * be emitted once per including translation unit.
 *
 * @param count           number of samples.
 * @param loop_frequency  samples per second; must be non-zero (division by it).
 * @param error_margin    multiplicative factor applied to the rounded duration
 *                        (the default of 1.0 leaves it unchanged).
 * @return the estimated duration in milliseconds.
 */
inline std::chrono::milliseconds
CalcDuration(size_t count, size_t loop_frequency, float error_margin = 1.0)
{
    using std::chrono::milliseconds;

    // Double precision keeps 1000 * count exact for any realistic count.
    const double nominal_ms =
        std::ceil(1000.0 * static_cast<double>(count) / static_cast<double>(loop_frequency));

    // Convert to the duration's own representation rather than int so long
    // durations do not overflow.
    return milliseconds{static_cast<milliseconds::rep>(nominal_ms * error_margin)};
}
/**
 * Computes the two timeouts used when waiting for @p count samples.
 *
 * Each timeout is the nominal time for the given number of samples
 * (1000 * samples / loop_frequency milliseconds), rounded up to a whole
 * millisecond and then multiplied by @p error_margin; a fractional millisecond
 * left after that multiplication is truncated.
 *
 * Declared inline because the definition lives in a header and would otherwise
 * be emitted once per including translation unit.
 *
 * @param count           number of samples covered by the total timeout.
 * @param loop_frequency  samples per second; must be non-zero (division by it).
 * @param error_margin    multiplicative factor applied to both timeouts.
 * @return a pair {short timeout for a single sample, total timeout for
 *         @p count samples}, both in milliseconds.
 */
inline std::pair<std::chrono::milliseconds, std::chrono::milliseconds>
CalcTimeout(size_t count, size_t loop_frequency, float error_margin = 2.0)
{
    using std::chrono::milliseconds;

    // Double precision (instead of float) keeps the sample count exact beyond
    // 2^24, and the duration's own representation (instead of int) avoids
    // overflow for long timeouts.
    const auto to_timeout = [loop_frequency, error_margin](size_t samples) {
        const double nominal_ms = std::ceil(
            1000.0 * static_cast<double>(samples) / static_cast<double>(loop_frequency));
        return milliseconds{static_cast<milliseconds::rep>(nominal_ms * error_margin)};
    };

    return {to_timeout(1), to_timeout(count)};
}
// Reads `count` elements from `reader` through `op`, using the per-call and
// overall timeouts that CalcTimeout() derives from `count`, `loop_frequency`
// and `error_margin`.
//
// Parameters:
//   reader          object providing Read(op, count, timeout); its result is
//                   stored in a std::pair<std::error_code, size_t>.
//   op              callable forwarded to reader.Read().
//   count           number of elements expected.
//   loop_frequency  rate handed to CalcTimeout(), which divides by it.
//   error_margin    timeout scaling factor handed to CalcTimeout().
// Returns a std::error_code.
//
// NOTE(review): several statements of this function (loop header, braces,
// the error check after reader.Read() and the return statements) are not
// visible in this listing, so the exact returned values are not described
// here - confirm against the complete source.
77 template <
typename ReaderType,
typename Operation>
78 std::error_code
Read(ReaderType& reader, Operation&& op,
size_t count,
size_t loop_frequency, \
79 float error_margin = 2.0)
82 using namespace std::chrono;
// timeout_short is passed to every reader.Read() call; timeout_total bounds
// the elapsed time measured from time_start.
84 auto [timeout_short, timeout_total] =
CalcTimeout(count, loop_frequency, error_margin);
// Default-constructed error_code used as the "no error" reference value.
87 std::error_code
const ok;
88 std::pair<std::error_code, size_t> ret;
89 milliseconds time_elapsed {0};
// steady_clock is monotonic, so the elapsed-time check below cannot be
// distorted by adjustments of the wall clock.
90 auto time_start = steady_clock::now();
// NOTE(review): the full `count` is requested on every call; if a call can
// succeed with fewer elements, a later call may need `count` minus the
// elements already read - confirm against the reader's contract.
93 ret = reader.Read(std::forward<Operation>(op), count, timeout_short);
// Diagnostics for a single reader.Read() call: elements delivered by that
// call versus the number requested.
95 LOG4CPLUS_ERROR(
GetLogger(),
"Reading from shm timed out: check queue is being filled");
96 LOG4CPLUS_ERROR(
GetLogger(),
"Read: " << ret.second <<
" in " << timeout_short.count() <<
" ms");
97 LOG4CPLUS_ERROR(
GetLogger(),
"Expected: " << count);
105 time_elapsed = duration_cast<milliseconds>(steady_clock::now() - time_start);
// Overall deadline exceeded: report what was read so far and what was
// expected.
106 if (time_elapsed > timeout_total) {
107 LOG4CPLUS_ERROR(
GetLogger(),
"Reading from shm timed out: check queue is being filled");
108 LOG4CPLUS_ERROR(
GetLogger(),
"Read: " << read <<
" in " << time_elapsed.count() <<
" ms");
// The expected amount is `count`; logging `read` here would merely repeat
// the "Read:" value above.
109 LOG4CPLUS_ERROR(
GetLogger(),
"Expected: " << count);
// Skips `count` elements of `reader`, using the per-call and overall timeouts
// that CalcTimeout() derives from `count`, `loop_frequency` and
// `error_margin`.
//
// Parameters:
//   reader          object providing Skip(count, timeout); its result is
//                   stored in a std::pair<std::error_code, size_t>.
//   count           number of elements to skip.
//   loop_frequency  rate handed to CalcTimeout(), which divides by it.
//   error_margin    timeout scaling factor handed to CalcTimeout().
// Returns a std::error_code.
//
// NOTE(review): several statements of this function (loop header, braces,
// the declaration of `skipped` and the return statements) are not visible in
// this listing, so the exact returned values are not described here - confirm
// against the complete source.
128 template <
typename ReaderType>
129 std::error_code
Skip(ReaderType& reader,
size_t count,
size_t loop_frequency, \
130 float error_margin = 2.0)
133 using namespace std::chrono;
// timeout_short is passed to every reader.Skip() call; timeout_total bounds
// the elapsed time measured from time_start.
135 auto [timeout_short, timeout_total] =
CalcTimeout(count, loop_frequency, error_margin);
// Default-constructed error_code used as the "no error" reference value.
138 std::error_code
const ok;
139 std::pair<std::error_code, size_t> ret;
140 milliseconds time_elapsed {0};
// steady_clock is monotonic, so the elapsed-time check below cannot be
// distorted by adjustments of the wall clock.
141 auto time_start = steady_clock::now();
// NOTE(review): the full `count` is requested on every call; if a call can
// succeed with fewer elements, a later call may need `count - skipped`
// instead - confirm against the reader's contract.
144 ret = reader.Skip(count, timeout_short);
// The reader reported an error for this call: log how many elements the call
// handled versus the number requested.
145 if(ret.first != ok) {
146 LOG4CPLUS_ERROR(
GetLogger(),
"Reading from shm timed out: check queue is being filled");
147 LOG4CPLUS_ERROR(
GetLogger(),
"Read: " << ret.second <<
" in " << timeout_short.count() <<
" ms");
148 LOG4CPLUS_ERROR(
GetLogger(),
"Expected: " << count);
// Accumulate the elements skipped by this call and compare with the target.
151 skipped += ret.second;
152 if (skipped == count) {
156 time_elapsed = duration_cast<milliseconds>(steady_clock::now() - time_start);
// Overall deadline exceeded: report what was skipped so far and what was
// expected.
157 if (time_elapsed > timeout_total) {
158 LOG4CPLUS_ERROR(
GetLogger(),
"Reading from shm timed out: check queue is being filled");
159 LOG4CPLUS_ERROR(
GetLogger(),
"Read: " << skipped <<
" in " << time_elapsed.count() <<
" ms");
// The expected amount is `count`; logging `skipped` here would merely repeat
// the "Read:" value above.
160 LOG4CPLUS_ERROR(
GetLogger(),
"Expected: " << count);
// Resets `reader` when reader.Size() is non-zero and returns the
// std::error_code produced by reader.Reset().
//
// NOTE(review): the statement executed when reader.Size() == 0 (and the
// closing braces) are not visible in this listing; presumably a
// default-constructed error_code is returned in that case - confirm against
// the complete source.
175 template <
typename ReaderType>
176 std::error_code
Reset(ReaderType& reader)
// Only call reader.Reset() when Size() reports a non-zero value.
178 if (reader.Size() != 0) {
179 return reader.Reset();
192 template <
typename ReaderType>
195 return reader.Size() - reader.NumAvailable();