RTC Toolkit  1.0.0
recordingThreadDetail.hpp
Go to the documentation of this file.
1 
12 #ifndef RTCTK_REUSABLECOMPONENT_TELEMETRYRECORDER_RECORDINGTHREAD_DETAIL_HPP
13 #define RTCTK_REUSABLECOMPONENT_TELEMETRYRECORDER_RECORDINGTHREAD_DETAIL_HPP
14 
16 #include <exception>
17 // include the numapp for threading
18 #include <numapp/mempolicy.hpp>
19 #include <numapp/numapolicies.hpp>
20 #include <numapp/thread.hpp>
21 
22 namespace rtctk::telemetryRecorder {
23 
24 template<typename Writer, typename Topic, typename Reader>
26  m_state(STOPPED),
27  m_reader(nullptr),
28  m_writer(std::make_unique<Writer>()) {}
29 
30 template<typename Writer, typename Topic, typename Reader>
31 RecordingThread<Writer,Topic,Reader>::RecordingThread(std::unique_ptr<Writer>&& writer, const std::string& topic) :
32  m_state(STOPPED),
33  m_reader(std::make_unique<Reader>(topic.c_str())),
34  m_writer(std::move(writer)) {}
35 
36 template<typename Writer, typename Topic, typename Reader>
37 RecordingThread<Writer,Topic,Reader>::RecordingThread(std::unique_ptr<Writer>&& writer, std::unique_ptr<Reader>&& reader) :
38  m_state(STOPPED),
39  m_reader(std::move(reader)),
40  m_writer(std::move(writer)) {}
41 
42 template<typename Writer, typename Topic, typename Reader>
44 {
45  {
46  std::lock_guard<std::mutex> state_guard(m_state_lock);
47  m_state = STOPPED;
48  }
49  if(m_thread.joinable()) {
50  m_thread.join();
51  }
52 }
53 
54 template<typename Writer, typename Topic, typename Reader>
56  std::lock_guard<std::mutex> state_guard(m_state_lock);
57  if (m_state != STOPPED)
58  CII_THROW(InvalidRecordingThreadOperation, "Tried to Start() on running ReaderThread");
59  m_state = PAUSED;
60  m_thread = std::thread(&RecordingThread<Writer,Topic,Reader>::Process, this);
61 }
62 
63 template<typename Writer, typename Topic, typename Reader>
65  {
66  std::lock_guard<std::mutex> state_guard(m_state_lock);
67  if (m_state == STOPPED)
68  CII_THROW(InvalidRecordingThreadOperation, "Tried to Stop() already stopped ReaderThread");
69  m_state = STOPPED;
70  }
71  if(m_thread.joinable()) {
72  m_thread.join();
73  }
74 }
75 
76 template<typename Writer, typename Topic, typename Reader>
78  std::lock_guard<std::mutex> state_guard(m_state_lock);
79  if (m_state != RUNNING)
80  CII_THROW(InvalidRecordingThreadOperation, "Tried to Pause() on non-running ReaderThread");
81  m_state = PAUSED;
82 }
83 
84 template<typename Writer, typename Topic, typename Reader>
86  std::lock_guard<std::mutex> state_guard(m_state_lock);
87  if (m_state != PAUSED)
88  CII_THROW(InvalidRecordingThreadOperation, "Tried to Unpause() on not paused ReaderThread");
89  m_state = RUNNING;
90 }
91 
92 template<typename Writer, typename Topic, typename Reader>
94  std::error_code const ok{};
95  while(true) {
96  enum ThreadState state_copy;
97  {
98  std::lock_guard<std::mutex> state_guard(m_state_lock);
99  state_copy = m_state;
100  }
101  while (state_copy == PAUSED) {
102  std::this_thread::sleep_for(std::chrono::milliseconds(100));
103  m_state_lock.lock();
104  state_copy = m_state;
105  m_state_lock.unlock();
106  }
107  if (state_copy == STOPPED) {
108  break;
109  }
110  size_t skip = 0;
111  size_t number = m_reader->NumAvailable();
112  if (number == 0) { // try to always read at least one
113  number = 1;
114  }
115  if (m_subsample_rate > 1) {
116  m_sampling_counter = m_sampling_counter % m_subsample_rate;
117  if (m_sampling_counter == 0) {
118  number = 1;
119  } else {
120  skip = std::min(number, m_subsample_rate-m_sampling_counter);
121  }
122  }
123  std::pair<std::error_code, size_t> result;
124  if(skip == 0) {
125  result = m_reader->Read([&] (const Topic& data) {this->ReadDataWrapper(data);}, number, std::chrono::milliseconds(100));
126  } else {
127  // skip
128  result = m_reader->Read([] (const Topic& data) {}, skip, std::chrono::milliseconds(100));
129  }
130  m_sampling_counter += result.second;
131  // check for error
132  if (not (result.first == ok or result.first == ipcq::Error::Timeout)) {
133  // TODO better error handling?
134  // We stop in case of error
135  m_state_lock.lock();
136  m_state = STOPPED;
137  m_state_lock.unlock();
138  std::string error = "Error reading from ipcq: " + result.first.message();
139  CII_THROW(RecordingThreadIPCQError, error);
140  }
141  }
142 }
143 
144 template<typename Writer, typename Topic, typename Reader>
146  std::lock_guard<std::mutex> state_guard(m_state_lock);
147  if (m_state != PAUSED && m_state != STOPPED) {
148  CII_THROW(InvalidRecordingThreadOperation, "Tried to open new file, while RecorderThread was running");
149  }
150  auto time = std::chrono::system_clock::now();
151  time_t now = std::chrono::system_clock::to_time_t(time);
152  std::stringstream format;
153  format << m_output_dir << "/" << std::put_time(localtime(&now), "%F %T %Z") << ".fits";
154  m_writer->Open(format.str());
155  if (not m_reader) {
156  m_reader = std::make_unique<Reader>(m_topic_name.c_str());
157  }
158 }
159 
160 template<typename Writer, typename Topic, typename Reader>
162  std::lock_guard<std::mutex> state_guard(m_state_lock);
163  if (m_state != PAUSED && m_state != STOPPED) {
164  CII_THROW(InvalidRecordingThreadOperation, "Tried to close file, while RecorderThread was running");
165  }
166  m_writer->Close();
167 }
168 
169 template<typename Writer, typename Topic, typename Reader>
170 void RecordingThread<Writer,Topic,Reader>::SetOutputUri(const ::elt::mal::Uri& storage_uri) {
171  if(storage_uri.scheme() != network::uri::string_view("file")) {
172  CII_THROW(InvalidStorageURLException, "Invalid scheme for TelemetryRecorder");
173  }
174  m_output_dir = storage_uri.path().to_string();
175 }
176 
177 template<typename Writer, typename Topic, typename Reader>
179  m_subsample_rate = rate;
180 }
181 
182 template<typename Writer, typename Topic, typename Reader>
184  std::lock_guard<std::mutex> state_guard(m_state_lock);
185  if (m_state != PAUSED && m_state != STOPPED) {
186  CII_THROW(InvalidRecordingThreadOperation, "Tried to change TelemetrySubset, while RecorderThread was running");
187  }
188  m_writer->SetTelemetrySubSet(subset);
189 }
190 
191 template<typename Writer, typename Topic, typename Reader>
192 void RecordingThread<Writer,Topic,Reader>::SetTopic(const std::string& topic) {
193  m_topic_name = topic;
194 }
195 
196 template<typename Writer, typename Topic, typename Reader>
198  if(m_last_observed_sample_id != 0 and sample_id <= m_last_observed_sample_id) {
199  CII_THROW(RecordingThreadInvalidStartSample, sample_id);
200  }
201  m_start_sample = sample_id;
202 }
203 
204 template<typename Writer, typename Topic, typename Reader>
205 void RecordingThread<Writer,Topic,Reader>::ReadDataWrapper(const Topic& telemetry_data) {
206  m_last_observed_sample_id = telemetry_data.sample_id;
207  if(m_last_observed_sample_id < m_start_sample) {
208  return;
209  }
210  m_writer->WriteData(telemetry_data);
211 }
212 
213 } // namespace rtctk::telemetryRecorder
214 
215 #endif //RTCTK_REUSABLECOMPONENT_TELEMETRYRECORDER_RECORDINGTHREAD_DETAIL_HPP
rtctk::telemetryRecorder::RecordingThread::RecordingThread
RecordingThread()
Definition: recordingThreadDetail.hpp:25
rtctk::telemetryRecorder::RecordingThread::Open
void Open()
Open the reader and writer.
Definition: recordingThreadDetail.hpp:145
rtctk::telemetryRecorder::RecordingThread::SetTopic
void SetTopic(const std::string &topic)
Set the topic for the ipc queue.
Definition: recordingThreadDetail.hpp:192
rtctk::telemetryRecorder::RecordingThread::Pause
void Pause()
Pause recording.
Definition: recordingThreadDetail.hpp:77
rtctk::telemetryRecorder::PAUSED
@ PAUSED
Definition: recordingThread.hpp:29
rtctk::telemetryRecorder::RecordingThread
The RecordingThread is responsible for reading from the ipc queue and writing to the provided Writer.
Definition: recordingThread.hpp:37
rtctk::telemetryRecorder::RUNNING
@ RUNNING
Definition: recordingThread.hpp:28
rtctk::telemetryRecorder::RecordingThread::Stop
void Stop()
Stop the recording thread and wait for it's termination.
Definition: recordingThreadDetail.hpp:64
rtctk::telemetryRecorder::RecordingThread::SetSubSampleFactor
void SetSubSampleFactor(uint64_t rate)
Set the Subsample Factor.
Definition: recordingThreadDetail.hpp:178
rtctk::telemetryRecorder::RecordingThread::SetOutputUri
void SetOutputUri(const ::elt::mal::Uri &storage_uri)
Sets this URI where the output files should be written to.
Definition: recordingThreadDetail.hpp:170
rtctk::telemetryRecorder::RecordingThread::SetTelemetrySubSet
void SetTelemetrySubSet(const std::string &mask)
Set the TelemetrySubset for the Writer.
Definition: recordingThreadDetail.hpp:183
rtctk::telemetryRecorder::RecordingThread::SetStartSampleId
void SetStartSampleId(const uint64_t sample_id)
Set the first sample_id that should be recorded.
Definition: recordingThreadDetail.hpp:197
rtctk::telemetryRecorder::RecordingThread::Close
void Close()
Close the writer file.
Definition: recordingThreadDetail.hpp:161
rtctk::telemetryRecorder::RecordingThread::~RecordingThread
~RecordingThread()
Definition: recordingThreadDetail.hpp:43
rtctk::telemetryRecorder::RecordingThread::Unpause
void Unpause()
Unpause the recording or begin recording after having started the recording thread.
Definition: recordingThreadDetail.hpp:85
rtctk::telemetryRecorder
Definition: businessLogic.hpp:20
recordingThread.hpp
Recording Thread of the Telemetry Recorder.
rtctk::telemetryRecorder::RecordingThread::Start
void Start()
Start the recording thread.
Definition: recordingThreadDetail.hpp:55
std
Definition: mudpiProcessingError.hpp:119
error
void error(const char *msg)
Definition: main.cpp:38
rtctk::telemetryRecorder::STOPPED
@ STOPPED
Definition: recordingThread.hpp:27
rtctk::telemetryRecorder::ThreadState
ThreadState
State of the RecordingThread.
Definition: recordingThread.hpp:26