12 #ifndef RTCTK_REUSABLECOMPONENT_TELEMETRYRECORDER_RECORDINGTHREAD_DETAIL_HPP
13 #define RTCTK_REUSABLECOMPONENT_TELEMETRYRECORDER_RECORDINGTHREAD_DETAIL_HPP
18 #include <numapp/mempolicy.hpp>
19 #include <numapp/numapolicies.hpp>
20 #include <numapp/thread.hpp>
// Constructor fragment (the signature is not visible in this excerpt).
// The visible member initializer creates a Writer via std::make_unique with
// no arguments and stores it in m_writer; the constructor body is empty.
24 template<
typename Writer,
typename Topic,
typename Reader>
28 m_writer(
std::make_unique<Writer>()) {}
// Constructor fragment (the signature is not visible in this excerpt).
// Visible initializers:
//   - m_reader: a new Reader constructed from topic.c_str()
//   - m_writer: takes ownership of the caller-supplied `writer` via std::move
// The constructor body is empty.
30 template<
typename Writer,
typename Topic,
typename Reader>
33 m_reader(
std::make_unique<Reader>(topic.c_str())),
34 m_writer(
std::move(writer)) {}
// Constructor fragment (the signature is not visible in this excerpt).
// Both collaborators are injected: m_reader and m_writer are move-initialized
// from the caller-supplied `reader` and `writer`. The constructor body is empty.
36 template<
typename Writer,
typename Topic,
typename Reader>
39 m_reader(
std::move(reader)),
40 m_writer(
std::move(writer)) {}
// Fragment of a member whose signature is not visible in this excerpt
// (presumably the destructor, given its position after the constructors —
// TODO confirm).
42 template<
typename Writer,
typename Topic,
typename Reader>
// Holds m_state_lock for the remainder of the enclosing scope.
46 std::lock_guard<std::mutex> state_guard(m_state_lock);
// Acts only when m_thread is joinable; the guarded statements are not
// visible in this excerpt.
49 if(m_thread.joinable()) {
// Fragment of the start operation (signature not visible; the name Start() is
// taken from the error message below).
// Throws InvalidRecordingThreadOperation via CII_THROW; the condition that
// guards the throw is not visible in this excerpt.
54 template<
typename Writer,
typename Topic,
typename Reader>
// Holds m_state_lock for the remainder of the enclosing scope.
56 std::lock_guard<std::mutex> state_guard(m_state_lock);
58 CII_THROW(InvalidRecordingThreadOperation,
"Tried to Start() on running ReaderThread");
// Fragment of the stop operation (signature not visible; the name Stop() is
// taken from the error message below).
// Throws InvalidRecordingThreadOperation via CII_THROW; the condition that
// guards the throw is not visible in this excerpt.
63 template<
typename Writer,
typename Topic,
typename Reader>
// Holds m_state_lock for the remainder of the enclosing scope.
66 std::lock_guard<std::mutex> state_guard(m_state_lock);
68 CII_THROW(InvalidRecordingThreadOperation,
"Tried to Stop() already stopped ReaderThread");
// Acts only when m_thread is joinable; the guarded statements are not
// visible in this excerpt.
71 if(m_thread.joinable()) {
// Fragment of the pause operation (signature not visible; the name Pause() is
// taken from the error message below).
// Throws InvalidRecordingThreadOperation via CII_THROW; the condition that
// guards the throw is not visible in this excerpt.
76 template<
typename Writer,
typename Topic,
typename Reader>
// Holds m_state_lock for the remainder of the enclosing scope.
78 std::lock_guard<std::mutex> state_guard(m_state_lock);
80 CII_THROW(InvalidRecordingThreadOperation,
"Tried to Pause() on non-running ReaderThread");
// Fragment of the unpause operation (signature not visible; the name Unpause()
// is taken from the error message below).
// Throws InvalidRecordingThreadOperation via CII_THROW; the condition that
// guards the throw is not visible in this excerpt.
84 template<
typename Writer,
typename Topic,
typename Reader>
// Holds m_state_lock for the remainder of the enclosing scope.
86 std::lock_guard<std::mutex> state_guard(m_state_lock);
88 CII_THROW(InvalidRecordingThreadOperation,
"Tried to Unpause() on not paused ReaderThread");
// Fragment of the reading loop (signature and loop header not visible in this
// excerpt; presumably the function executed by m_thread — TODO confirm).
// Visible behavior: waits while the state is PAUSED, queries the reader for
// available samples, reads or skips samples depending on the subsample rate,
// and throws RecordingThreadIPCQError on any read error other than a timeout.
92 template<
typename Writer,
typename Topic,
typename Reader>
// Default-constructed error_code; used below as the "no error" value that
// the result of Read() is compared against.
94 std::error_code
const ok{};
98 std::lock_guard<std::mutex> state_guard(m_state_lock);
// While paused: sleep in 100 ms steps and refresh the local copy of m_state.
101 while (state_copy ==
PAUSED) {
102 std::this_thread::sleep_for(std::chrono::milliseconds(100));
104 state_copy = m_state;
// NOTE(review): m_state_lock is unlocked explicitly here while a lock_guard
// on the same mutex is visible above; the lines in between are missing from
// this excerpt, so confirm the guard's scope has ended (or that a matching
// lock() call exists) before this point.
105 m_state_lock.unlock();
// Number of samples the reader currently reports as available.
111 size_t number = m_reader->NumAvailable();
// Subsampling is only applied for rates greater than 1.
115 if (m_subsample_rate > 1) {
// Keep the counter within [0, m_subsample_rate).
116 m_sampling_counter = m_sampling_counter % m_subsample_rate;
117 if (m_sampling_counter == 0) {
// Samples to discard: at most what is available, at most what remains until
// the counter reaches m_subsample_rate.
120 skip = std::min(number, m_subsample_rate-m_sampling_counter);
// Read() result: first = error code, second = a count that is added to
// m_sampling_counter below (presumably the number of samples consumed —
// TODO confirm against the Reader API).
123 std::pair<std::error_code, size_t> result;
// Read up to `number` samples, forwarding each one to ReadDataWrapper();
// 100 ms timeout. Which branch executes this call is not visible here.
125 result = m_reader->Read([&] (
const Topic& data) {this->ReadDataWrapper(data);}, number, std::chrono::milliseconds(100));
// Consume up to `skip` samples with a no-op callback, i.e. discard them;
// 100 ms timeout.
128 result = m_reader->Read([] (
const Topic& data) {}, skip, std::chrono::milliseconds(100));
130 m_sampling_counter += result.second;
// A timeout is tolerated; any other non-ok error code is reported.
132 if (not (result.first == ok or result.first == ipcq::Error::Timeout)) {
// NOTE(review): explicit unlock before throwing — confirm it is paired with
// a manual lock() and not with a lock_guard that is still in scope.
137 m_state_lock.unlock();
138 std::string
error =
"Error reading from ipcq: " + result.first.message();
139 CII_THROW(RecordingThreadIPCQError,
error);
// Fragment of the "open new file" operation (signature not visible in this
// excerpt). Builds an output path from m_output_dir and the current local
// time, opens it with the writer and re-creates the reader for m_topic_name.
// Throws InvalidRecordingThreadOperation via CII_THROW; the condition that
// guards the throw is not visible in this excerpt.
144 template<
typename Writer,
typename Topic,
typename Reader>
// Holds m_state_lock for the remainder of the enclosing scope.
146 std::lock_guard<std::mutex> state_guard(m_state_lock);
148 CII_THROW(InvalidRecordingThreadOperation,
"Tried to open new file, while RecorderThread was running");
// Current wall-clock time, converted to time_t for formatting.
150 auto time = std::chrono::system_clock::now();
151 time_t now = std::chrono::system_clock::to_time_t(time);
152 std::stringstream format;
// Path layout: <m_output_dir>/<date> <time> <timezone>.fits
// ("%F" = YYYY-MM-DD, "%T" = HH:MM:SS, "%Z" = time zone name).
// NOTE(review): the resulting file name contains spaces and colons — confirm
// this is acceptable for consumers of these files.
// NOTE(review): localtime() returns a pointer to shared static storage and
// is not guaranteed to be thread-safe; consider localtime_r if other threads
// may call it concurrently.
153 format << m_output_dir <<
"/" << std::put_time(localtime(&now),
"%F %T %Z") <<
".fits";
154 m_writer->Open(format.str());
// Replace the reader with a fresh instance attached to m_topic_name.
156 m_reader = std::make_unique<Reader>(m_topic_name.c_str());
// Fragment of the "close file" operation (signature not visible in this
// excerpt).
// Throws InvalidRecordingThreadOperation via CII_THROW; the condition that
// guards the throw and the actual close call are not visible in this excerpt.
160 template<
typename Writer,
typename Topic,
typename Reader>
// Holds m_state_lock for the remainder of the enclosing scope.
162 std::lock_guard<std::mutex> state_guard(m_state_lock);
164 CII_THROW(InvalidRecordingThreadOperation,
"Tried to close file, while RecorderThread was running");
// Fragment of the storage-location setter (signature not visible in this
// excerpt). Accepts only URIs whose scheme is "file" and stores the URI path
// in m_output_dir.
// Throws InvalidStorageURLException for any other scheme.
169 template<
typename Writer,
typename Topic,
typename Reader>
171 if(storage_uri.scheme() != network::uri::string_view(
"file")) {
172 CII_THROW(InvalidStorageURLException,
"Invalid scheme for TelemetryRecorder");
// Only the path component of the URI is kept.
174 m_output_dir = storage_uri.path().to_string();
// Fragment of the subsample-rate setter (signature not visible in this
// excerpt). Stores `rate` in m_subsample_rate.
// NOTE(review): no acquisition of m_state_lock is visible in this excerpt,
// while the reading loop also uses m_subsample_rate — confirm whether
// synchronization is needed or handled elsewhere.
177 template<
typename Writer,
typename Topic,
typename Reader>
179 m_subsample_rate = rate;
// Fragment of the telemetry-subset setter (signature not visible in this
// excerpt). Forwards `subset` to the writer.
// Throws InvalidRecordingThreadOperation via CII_THROW; the condition that
// guards the throw is not visible in this excerpt.
182 template<
typename Writer,
typename Topic,
typename Reader>
// Holds m_state_lock for the remainder of the enclosing scope.
184 std::lock_guard<std::mutex> state_guard(m_state_lock);
186 CII_THROW(InvalidRecordingThreadOperation,
"Tried to change TelemetrySubset, while RecorderThread was running");
188 m_writer->SetTelemetrySubSet(subset);
// Fragment of the topic-name setter (signature not visible in this excerpt).
// Stores `topic` in m_topic_name; elsewhere in this file that member is used
// to construct a new Reader when a file is opened.
191 template<
typename Writer,
typename Topic,
typename Reader>
193 m_topic_name = topic;
// Fragment of the start-sample setter (signature not visible in this
// excerpt). Stores `sample_id` in m_start_sample.
// Throws RecordingThreadInvalidStartSample when a sample has already been
// observed (m_last_observed_sample_id != 0) and the requested sample_id is
// not greater than it.
196 template<
typename Writer,
typename Topic,
typename Reader>
// A last-observed id of 0 is treated as "nothing observed yet" and disables
// this check.
198 if(m_last_observed_sample_id != 0 and sample_id <= m_last_observed_sample_id) {
199 CII_THROW(RecordingThreadInvalidStartSample, sample_id);
201 m_start_sample = sample_id;
// Fragment of the per-sample callback (signature not visible in this excerpt;
// presumably ReadDataWrapper, which the reading loop passes to Reader::Read —
// TODO confirm). Records the sample id and hands the sample to the writer.
204 template<
typename Writer,
typename Topic,
typename Reader>
// Remember the id of the most recently seen sample; the start-sample setter
// validates new start ids against this value.
206 m_last_observed_sample_id = telemetry_data.sample_id;
// Samples with an id below m_start_sample take this branch; its body is not
// visible here (presumably they are dropped without being written —
// TODO confirm).
207 if(m_last_observed_sample_id < m_start_sample) {
210 m_writer->WriteData(telemetry_data);
215 #endif //RTCTK_REUSABLECOMPONENT_TELEMETRYRECORDER_RECORDINGTHREAD_DETAIL_HPP