RTC Toolkit  2.0.0
eventRecordingUnit.hpp
Go to the documentation of this file.
1 
11 #ifndef RTCTK_COMPONENTFRAMEWORK_EVENTRECORDINGUNIT_HPP
12 #define RTCTK_COMPONENTFRAMEWORK_EVENTRECORDINGUNIT_HPP
13 
22 #include "ecs/taiClock.hpp"
23 
24 #include <numapp/numapolicies.hpp>
25 #include <numapp/thread.hpp>
26 
27 #include <cstdint>
28 #include <string>
29 #include <exception>
30 #include <mutex>
31 #include <thread>
32 #include <string_view>
33 
34 namespace rtctk::componentFramework {
35 
36 using namespace std::string_view_literals;
37 
43 template <>
46  using Output = std::tuple<ecs::TaiClock::time_point::rep, std::string, std::string, std::string, std::string>;
48 
50  { {"timestamp"sv, ""sv},
51  {"source_timestamp"sv, ""sv},
52  {"origin"sv, ""sv},
53  {"item"sv, ""sv},
54  {"result"sv, ""sv} }
55  };
56 
57  static Output AsTuple(const DataType& data) {
58  auto ts = ecs::TaiClock::now().time_since_epoch().count();
59  return std::make_tuple(ts, std::to_string(data.time), data.origin, data.item, data.result.dump());
60  }
61 };
62 
68 template <>
71  using Output = std::tuple<ecs::TaiClock::time_point::rep, std::string, std::string, std::string>;
73 
75  { {"timestamp"sv, ""sv},
76  {"source_timestamp"sv, ""sv},
77  {"origin"sv, ""sv},
78  {"item"sv, ""sv} }
79  };
80 
81  static Output AsTuple(const DataType& data) {
82  auto ts = ecs::TaiClock::now().time_since_epoch().count();
83  return std::make_tuple(ts, std::to_string(data.time), data.origin, data.item);
84  }
85 };
86 
92 template <>
95  using Output = std::tuple<ecs::TaiClock::time_point::rep, std::string, std::string, std::string, std::string>;
97 
99  { {"timestamp"sv, ""sv},
100  {"source_timestamp"sv, ""sv},
101  {"origin"sv, ""sv},
102  {"item"sv, ""sv},
103  {"state"sv, ""sv} }
104  };
105 
106  static Output AsTuple(const DataType& data) {
107  auto ts = ecs::TaiClock::now().time_since_epoch().count();
108  return std::make_tuple(ts, std::to_string(data.time), data.origin, data.item, data.state);
109  }
110 };
111 
117 template <>
120  using Output = std::tuple<ecs::TaiClock::time_point::rep, std::string, std::string, std::string, std::string>;
122 
124  { {"timestamp"sv, ""sv},
125  {"source_timestamp"sv, ""sv},
126  {"origin"sv, ""sv},
127  {"items"sv, ""sv},
128  {"state"sv, ""sv} }
129  };
130 
131  static Output AsTuple(const DataType& data) {
132  auto ts = ecs::TaiClock::now().time_since_epoch().count();
133  std::string items;
134  bool is_first = true;
135  for(const auto& item : data.items) {
136  if(is_first==false) {
137  items += ";";
138  }
139  items += item;
140  is_first = false;
141  }
142  return std::make_tuple(ts, std::to_string(data.time), data.origin, items, data.state);
143  }
144 };
145 
146 // TODO: add trait classes for other event types
147 
153 template <typename EventType, typename OutputStage = typename RecordingInfo<EventType>::Recorder>
155 public:
156  using FilterMethod = std::function<bool(EventType const&)>;
157 
167  TypedEventRecordingUnit(std::string const& comp_id,
168  std::string const& unit_id,
169  RepositoryIf& rtr,
170  RepositoryIf& oldb,
171  EventServiceIf& es,
172  FilterMethod input_filter = nullptr)
173  : RecordingUnit(comp_id, unit_id, rtr, oldb)
174  , m_event_service(es)
175  , m_subscriber(m_event_service.MakeSubscriber<EventType>())
176  , m_input_filter(input_filter)
177  , m_output(RecordingInfo<EventType>::COLUMNS)
178  {
179  Update();
180  }
181 
183  m_stop = true;
184  if(m_process_thread.joinable()) {
185  m_process_thread.join();
186  }
187  SetStopped();
188  }
189 
194  void Prepare(const std::filesystem::path& file_path) override {
195 
196  SetState(State::PREPARING, State::STOPPED, "tried to prepare a non-stopped EventRecordingUnit");
197 
198  m_file_path = file_path / (GetId() + ".fits");
199 
200  auto policies = numapp::NumaPolicies();
201  m_process_thread = numapp::MakeThread(m_unit_id, policies, [&]() { return Process(); });
202  }
206  void Start() override {
207  m_start = true;
208  }
213  std::optional<std::filesystem::path> Stop() override {
214  m_stop = true;
215  if(m_process_thread.joinable()) {
216  m_process_thread.join();
217  }
218 
219  auto file = m_file_path;
220  m_file_path.reset();
221  SetStopped();
222  return file;
223  }
224 
225 private:
226 
227  void Process() {
228 
229  using namespace std::chrono_literals;
230 
231  m_output.Open(*m_file_path);
232 
233  m_subscriber->Subscribe([&](EventType const& ev) {
234  if(GetState() == State::RUNNING) {
235  try{
236  if(m_input_filter) {
237  if(m_input_filter(ev)) {
238  m_output.Write(RecordingInfo<EventType>::AsTuple(ev));
239  }
240  }else{
241  m_output.Write(RecordingInfo<EventType>::AsTuple(ev));
242  }
243  }catch (...) {
244  SetFailed(std::current_exception());
245  return;
246  }
247  }
248  });
249 
250  SetState(State::IDLE, State::PREPARING, "EventRecordingUnit tried to go to IDLE from another state than PREPARING");
251 
252  while(m_stop==false) {
253 
254  switch(GetState()) {
255  case State::IDLE: {
256  if(m_start) {
257  SetState(State::WAITING, State::IDLE, "EventRecorder illegal state transition, tried going to WAITING from another state than IDLE");
258  break;
259  }
260  std::this_thread::sleep_for(1ms);
261  break;
262  }
263  case State::WAITING: {
264  if(not HasLeaders() or (HasLeaders() and HasFirstLeaderStarted())) {
265  SetState(State::RUNNING, State::WAITING, "EventRecorder illegal state transition, tried going to RUNNING from another state than WAITING");
266  break;
267  }
268  std::this_thread::sleep_for(1ms);
269  break;
270  }
271  case State::RUNNING: {
272  if(HasLeaders() and HasLastLeaderFinished()) {
273  SetState(State::FINISHED, State::RUNNING, "EventRecorder illegal state transition, tried going to FINISHED from another state than RUNNING");
274  break;
275  }
276  std::this_thread::sleep_for(1ms);
277  }
278  default: {
279  std::this_thread::sleep_for(1ms);
280  }
281  }
282  }
283 
284  m_subscriber->Unsubscribe();
285  m_output.Close();
286  // Stopped state will be set by Stop function after join
287  m_start = false;
288  m_stop = false;
289  ResetLeaderStates();
290  }
291 
292  TypedEventService m_event_service;
293  std::unique_ptr<TypedEventSubscriber<EventType>> m_subscriber;
294 
295  FilterMethod m_input_filter;
296 
297  OutputStage m_output;
298 
299  std::atomic<bool> m_start = false;
300  std::atomic<bool> m_stop = false;
301  std::thread m_process_thread;
302 };
303 
304 
310 template <typename OutputStage = FitsRecorder<ecs::TaiClock::time_point::rep, std::string>>
312 public:
313  using FilterMethod = std::function<bool(JsonPayload const&)>;
314 
325  JsonEventRecordingUnit(std::string const& comp_id,
326  std::string const& unit_id,
327  RepositoryIf& rtr,
328  RepositoryIf& oldb,
329  EventServiceIf& es,
330  std::string const& topic_name,
331  FilterMethod input_filter = nullptr)
332  : RecordingUnit(comp_id, unit_id, rtr, oldb)
333  , m_event_service(es)
334  , m_subscriber(m_event_service.MakeSubscriber(topic_name))
335  , m_input_filter(input_filter)
336  , m_output(COLUMNS)
337  {
338  Update();
339  }
340 
342  m_stop = true;
343  if(m_process_thread.joinable()) {
344  m_process_thread.join();
345  }
346  SetStopped();
347  }
348 
353  void Prepare(const std::filesystem::path& file_path) override {
354 
355  SetState(State::PREPARING, State::STOPPED, "EventRecorder tried to go from non-STOPPED State to PREPARING");
356 
357  m_file_path = file_path / (GetId() + ".fits");
358 
359  auto policies = numapp::NumaPolicies();
360  m_process_thread = numapp::MakeThread(m_unit_id, policies, [&]() { return Process(); });
361  }
365  void Start() override {
366  m_start = true;
367  }
372  std::optional<std::filesystem::path> Stop() override {
373  m_stop = true;
374  if(m_process_thread.joinable()) {
375  m_process_thread.join();
376  }
377  SetStopped();
378  return m_file_path;
379  }
380 
381 private:
382 
383  void Process() {
384 
385  using namespace std::chrono_literals;
386 
387  m_output.Open(*m_file_path);
388 
389  m_subscriber->Subscribe([&](JsonPayload const& ev) {
390 
391  if(GetState() == State::RUNNING) {
392  try{
393  if(m_input_filter) {
394  if(m_input_filter(ev)) {
395  m_output.Write(AsTuple(ev));
396  }
397  }else{
398  m_output.Write(AsTuple(ev));
399  }
400  }catch (...) {
401  SetFailed(std::current_exception());
402  return;
403  }
404  }
405  });
406 
407  SetState(State::IDLE, State::PREPARING, "Event recorder was not in PREPARING when trying to go to IDLE");
408 
409  while(m_stop==false) {
410 
411  switch(GetState()) {
412  case State::IDLE: {
413  if(m_start) {
414  SetState(State::WAITING, State::IDLE, "Event recorder was not in IDLE when trying to go to WAITING");
415  break;
416  }
417  std::this_thread::sleep_for(1ms);
418  break;
419  }
420  case State::WAITING: {
421  if(not HasLeaders() or (HasLeaders() and HasFirstLeaderStarted())) {
422  SetState(State::RUNNING, State::WAITING, "Event recorder was not in WAITING when trying to go to RUNNING");
423  break;
424  }
425  std::this_thread::sleep_for(1ms);
426  break;
427  }
428  case State::RUNNING: {
429  if(HasLeaders() and HasLastLeaderFinished()) {
430  SetState(State::FINISHED, State::RUNNING, "Event recorder was not running when trying to go to finish");
431  break;
432  }
433  std::this_thread::sleep_for(1ms);
434  }
435  default: {
436  std::this_thread::sleep_for(1ms);
437  }
438  }
439  }
440 
441  m_subscriber->Unsubscribe();
442  m_output.Close();
443  // the actual STOPPED state will be set by the Stop function
444  m_start = false;
445  m_stop = false;
446  ResetLeaderStates();
447  }
448 
449  static std::tuple<ecs::TaiClock::time_point::rep, std::string> AsTuple(JsonPayload const& data) {
450  auto ts = ecs::TaiClock::now().time_since_epoch().count();
451  return std::make_tuple(ts, data.dump());
452  }
453 
454  static constexpr typename OutputStage::ColumnDescription COLUMNS = typename OutputStage::ColumnDescription {
455  { {"timestamp"sv, ""sv},
456  {"payload"sv, ""sv} }
457  };
458 
459  EventServiceIf& m_event_service;
460  std::unique_ptr<EventSubscriberIf> m_subscriber;
461 
462  FilterMethod m_input_filter;
463 
464  OutputStage m_output;
465 
466  std::atomic<bool> m_start = false;
467  std::atomic<bool> m_stop = false;
468  std::thread m_process_thread;
469 };
470 
471 } // namespace rtctk::componentFramework
472 
473 #endif // RTCTK_COMPONENTFRAMEWORK_EVENTRECORDINGUNIT_HPP
rtctk::componentFramework::RecordingInfo< ComputationEvent >::AsTuple
static Output AsTuple(const DataType &data)
Definition: eventRecordingUnit.hpp:106
rtctk::componentFramework::RecordingInfo< ComputationFinishedEvent >::Output
std::tuple< ecs::TaiClock::time_point::rep, std::string, std::string, std::string, std::string > Output
Definition: eventRecordingUnit.hpp:46
rtctk::componentFramework::ComputationEvent
Abstract Event Type that is used as a base for computation events.
Definition: eventDefinitions.hpp:65
rtctk::componentFramework::TypedEventRecordingUnit::Stop
std::optional< std::filesystem::path > Stop() override
Stop the recording and wait for it's termination.
Definition: eventRecordingUnit.hpp:213
exceptions.hpp
Provides macros and utilities for exception handling.
recordingInfo.hpp
FitsRecorder allows to write ColumnData to into fits files in a specified directory.
rtctk::componentFramework::AbstractEvent::origin
std::string origin
Definition: eventDefinitions.hpp:56
rtctk::componentFramework
Definition: commandReplier.cpp:20
dataRecorder.hpp
FitsRecorder allows to write ColumnData to into fits files in a specified directory.
rtctk::componentFramework::JsonEventRecordingUnit::JsonEventRecordingUnit
JsonEventRecordingUnit(std::string const &comp_id, std::string const &unit_id, RepositoryIf &rtr, RepositoryIf &oldb, EventServiceIf &es, std::string const &topic_name, FilterMethod input_filter=nullptr)
Create a new recording unit.
Definition: eventRecordingUnit.hpp:325
typedEventService.hpp
High-level API of the event service.
rtctk::componentFramework::ConfigurationEvent
Abstract Event Type that is used as a base for configuration events.
Definition: eventDefinitions.hpp:143
rtctk::componentFramework::JsonEventRecordingUnit
Recording Unit that can record JSON events.
Definition: eventRecordingUnit.hpp:311
rtctk::componentFramework::ComputationStartedEvent
Event Type used to signal that a computation has started.
Definition: eventDefinitions.hpp:95
rtctk::componentFramework::RecordingInfo< ComputationEvent >::Output
std::tuple< ecs::TaiClock::time_point::rep, std::string, std::string, std::string, std::string > Output
Definition: eventRecordingUnit.hpp:95
rtctk::componentFramework::RecordingInfo
Definition: recordingInfo.hpp:21
rtctk::componentFramework::FitsRecorder
Definition: fitsDataRecorder.hpp:38
rtctk::componentFramework::JsonEventRecordingUnit::FilterMethod
std::function< bool(JsonPayload const &)> FilterMethod
Definition: eventRecordingUnit.hpp:313
rtctk::componentFramework::RecordingUnit
Abstract base class for all sources that can be recorded by the MetadataCollector and TelemetryRecord...
Definition: recordingUnit.hpp:65
rtctk::componentFramework::RecordingInfo< ComputationStartedEvent >::Output
std::tuple< ecs::TaiClock::time_point::rep, std::string, std::string, std::string > Output
Definition: eventRecordingUnit.hpp:71
rtctk::componentFramework::TypedEventRecordingUnit
Recording Unit that can record typed events.
Definition: eventRecordingUnit.hpp:154
fitsDataRecorder.hpp
FitsRecorder allows to write ColumnData to into fits files in a specified directory.
eventDefinitions.hpp
Framework-provided event definitions.
rtctk::componentFramework::RepositoryIf
Abstract interface providing basic read and write facilities to a repository.
Definition: repositoryIf.hpp:34
rtctk::componentFramework::JsonEventRecordingUnit::Start
void Start() override
Start the recording.
Definition: eventRecordingUnit.hpp:365
recordingUnit.hpp
Abstract base class defining functionality common to all recording units.
rtctk::componentFramework::TypedEventRecordingUnit::Start
void Start() override
Start the recording.
Definition: eventRecordingUnit.hpp:206
rtctk::componentFramework::ConfigurationEvent::state
std::string state
Definition: eventDefinitions.hpp:169
rtctk::componentFramework::ConfigurationEvent::items
std::vector< std::string > items
Definition: eventDefinitions.hpp:168
rtctk::componentFramework::EventServiceIf
Interface class for providing pub/sub facilities for JSON events.
Definition: eventServiceIf.hpp:26
rtctk::componentFramework::RecordingInfo< ComputationFinishedEvent >::AsTuple
static Output AsTuple(const DataType &data)
Definition: eventRecordingUnit.hpp:57
rtctk::componentFramework::JsonEventRecordingUnit::Stop
std::optional< std::filesystem::path > Stop() override
Stop the recording and wait for it's termination.
Definition: eventRecordingUnit.hpp:372
rtctk::componentFramework::AbstractEvent::time
uint64_t time
Definition: eventDefinitions.hpp:57
rtctk::componentFramework::RecordingInfo< ComputationStartedEvent >::AsTuple
static Output AsTuple(const DataType &data)
Definition: eventRecordingUnit.hpp:81
rtctk::componentFramework::ComputationFinishedEvent
Event Type used to signal that a computation has finished.
Definition: eventDefinitions.hpp:113
rtctk::componentFramework::ComputationFinishedEvent::result
JsonPayload result
Definition: eventDefinitions.hpp:135
rtctk::componentFramework::RecordingInfo< ConfigurationEvent >::Output
std::tuple< ecs::TaiClock::time_point::rep, std::string, std::string, std::string, std::string > Output
Definition: eventRecordingUnit.hpp:120
rtctk::componentFramework::ComputationEvent::item
std::string item
Definition: eventDefinitions.hpp:88
rtctk::componentFramework::FitsRecorder::ColumnDescription
std::array< FitsColumn, sizeof...(T)> ColumnDescription
Definition: fitsDataRecorder.hpp:49
eventServiceIf.hpp
Low-level interface of the event service.
rtctk::componentFramework::TypedEventRecordingUnit::TypedEventRecordingUnit
TypedEventRecordingUnit(std::string const &comp_id, std::string const &unit_id, RepositoryIf &rtr, RepositoryIf &oldb, EventServiceIf &es, FilterMethod input_filter=nullptr)
Create a new recording unit.
Definition: eventRecordingUnit.hpp:167
rtctk::componentFramework::JsonEventRecordingUnit::Prepare
void Prepare(const std::filesystem::path &file_path) override
Prepare the recording.
Definition: eventRecordingUnit.hpp:353
rtctk::componentFramework::JsonEventRecordingUnit::~JsonEventRecordingUnit
virtual ~JsonEventRecordingUnit()
Definition: eventRecordingUnit.hpp:341
rtctk::componentFramework::RecordingInfo< ConfigurationEvent >::AsTuple
static Output AsTuple(const DataType &data)
Definition: eventRecordingUnit.hpp:131
rtctk::componentFramework::TypedEventRecordingUnit::~TypedEventRecordingUnit
virtual ~TypedEventRecordingUnit()
Definition: eventRecordingUnit.hpp:182
rtctk::componentFramework::TypedEventRecordingUnit::FilterMethod
std::function< bool(EventType const &)> FilterMethod
Definition: eventRecordingUnit.hpp:156
rtctk::componentFramework::JsonPayload
nlohmann::json JsonPayload
Type requirements:
Definition: jsonPayload.hpp:24
rtctk::componentFramework::ComputationEvent::state
std::string state
Definition: eventDefinitions.hpp:89
rtctk::componentFramework::TypedEventRecordingUnit::Prepare
void Prepare(const std::filesystem::path &file_path) override
Prepare the recording.
Definition: eventRecordingUnit.hpp:194