11 #ifndef RTCTK_COMPONENTFRAMEWORK_EVENTRECORDINGUNIT_HPP
12 #define RTCTK_COMPONENTFRAMEWORK_EVENTRECORDINGUNIT_HPP
22 #include "ecs/taiClock.hpp"
24 #include <numapp/numapolicies.hpp>
25 #include <numapp/thread.hpp>
32 #include <string_view>
36 using namespace std::string_view_literals;
46 using Output = std::tuple<ecs::TaiClock::time_point::rep, std::string, std::string, std::string, std::string>;
50 { {
"timestamp"sv,
""sv},
51 {
"source_timestamp"sv,
""sv},
58 auto ts = ecs::TaiClock::now().time_since_epoch().count();
59 return std::make_tuple(ts, std::to_string(data.
time), data.
origin, data.
item, data.
result.dump());
71 using Output = std::tuple<ecs::TaiClock::time_point::rep, std::string, std::string, std::string>;
75 { {
"timestamp"sv,
""sv},
76 {
"source_timestamp"sv,
""sv},
82 auto ts = ecs::TaiClock::now().time_since_epoch().count();
83 return std::make_tuple(ts, std::to_string(data.
time), data.
origin, data.
item);
95 using Output = std::tuple<ecs::TaiClock::time_point::rep, std::string, std::string, std::string, std::string>;
99 { {
"timestamp"sv,
""sv},
100 {
"source_timestamp"sv,
""sv},
107 auto ts = ecs::TaiClock::now().time_since_epoch().count();
120 using Output = std::tuple<ecs::TaiClock::time_point::rep, std::string, std::string, std::string, std::string>;
124 { {
"timestamp"sv,
""sv},
125 {
"source_timestamp"sv,
""sv},
132 auto ts = ecs::TaiClock::now().time_since_epoch().count();
134 bool is_first =
true;
135 for(
const auto& item : data.
items) {
136 if(is_first==
false) {
142 return std::make_tuple(ts, std::to_string(data.
time), data.
origin, items, data.
state);
153 template <typename EventType, typename OutputStage = typename RecordingInfo<EventType>::Recorder>
168 std::string
const& unit_id,
174 , m_event_service(es)
175 , m_subscriber(m_event_service.MakeSubscriber<EventType>())
176 , m_input_filter(input_filter)
184 if(m_process_thread.joinable()) {
185 m_process_thread.join();
194 void Prepare(
const std::filesystem::path& file_path)
override {
196 SetState(State::PREPARING, State::STOPPED,
"tried to prepare a non-stopped EventRecordingUnit");
198 m_file_path = file_path / (GetId() +
".fits");
200 auto policies = numapp::NumaPolicies();
201 m_process_thread = numapp::MakeThread(m_unit_id, policies, [&]() {
return Process(); });
213 std::optional<std::filesystem::path>
Stop()
override {
215 if(m_process_thread.joinable()) {
216 m_process_thread.join();
219 auto file = m_file_path;
229 using namespace std::chrono_literals;
231 m_output.Open(*m_file_path);
233 m_subscriber->Subscribe([&](EventType
const& ev) {
234 if(GetState() == State::RUNNING) {
237 if(m_input_filter(ev)) {
241 m_output.Write(RecordingInfo<EventType>::AsTuple(ev));
244 SetFailed(std::current_exception());
250 SetState(State::IDLE, State::PREPARING,
"EventRecordingUnit tried to go to IDLE from another state than PREPARING");
252 while(m_stop==
false) {
257 SetState(State::WAITING, State::IDLE,
"EventRecorder illegal state transition, tried going to WAITING from another state than IDLE");
260 std::this_thread::sleep_for(1ms);
263 case State::WAITING: {
264 if(not HasLeaders() or (HasLeaders() and HasFirstLeaderStarted())) {
265 SetState(State::RUNNING, State::WAITING,
"EventRecorder illegal state transition, tried going to RUNNING from another state than WAITING");
268 std::this_thread::sleep_for(1ms);
271 case State::RUNNING: {
272 if(HasLeaders() and HasLastLeaderFinished()) {
273 SetState(State::FINISHED, State::RUNNING,
"EventRecorder illegal state transition, tried going to FINISHED from another state than RUNNING");
276 std::this_thread::sleep_for(1ms);
279 std::this_thread::sleep_for(1ms);
284 m_subscriber->Unsubscribe();
292 TypedEventService m_event_service;
293 std::unique_ptr<TypedEventSubscriber<EventType>> m_subscriber;
295 FilterMethod m_input_filter;
297 OutputStage m_output;
299 std::atomic<bool> m_start =
false;
300 std::atomic<bool> m_stop =
false;
301 std::thread m_process_thread;
310 template <
typename OutputStage = FitsRecorder<ecs::TaiClock::time_po
int::rep, std::
string>>
326 std::string
const& unit_id,
330 std::string
const& topic_name,
333 , m_event_service(es)
334 , m_subscriber(m_event_service.MakeSubscriber(topic_name))
335 , m_input_filter(input_filter)
343 if(m_process_thread.joinable()) {
344 m_process_thread.join();
353 void Prepare(
const std::filesystem::path& file_path)
override {
355 SetState(State::PREPARING, State::STOPPED,
"EventRecorder tried to go from non-STOPPED State to PREPARING");
357 m_file_path = file_path / (GetId() +
".fits");
359 auto policies = numapp::NumaPolicies();
360 m_process_thread = numapp::MakeThread(m_unit_id, policies, [&]() {
return Process(); });
372 std::optional<std::filesystem::path>
Stop()
override {
374 if(m_process_thread.joinable()) {
375 m_process_thread.join();
385 using namespace std::chrono_literals;
387 m_output.Open(*m_file_path);
389 m_subscriber->Subscribe([&](
JsonPayload const& ev) {
391 if(GetState() == State::RUNNING) {
394 if(m_input_filter(ev)) {
395 m_output.Write(AsTuple(ev));
398 m_output.Write(AsTuple(ev));
401 SetFailed(std::current_exception());
407 SetState(State::IDLE, State::PREPARING,
"Event recorder was not in PREPARING when trying to go to IDLE");
409 while(m_stop==
false) {
414 SetState(State::WAITING, State::IDLE,
"Event recorder was not in IDLE when trying to go to WAITING");
417 std::this_thread::sleep_for(1ms);
420 case State::WAITING: {
421 if(not HasLeaders() or (HasLeaders() and HasFirstLeaderStarted())) {
422 SetState(State::RUNNING, State::WAITING,
"Event recorder was not in WAITING when trying to go to RUNNING");
425 std::this_thread::sleep_for(1ms);
428 case State::RUNNING: {
429 if(HasLeaders() and HasLastLeaderFinished()) {
430 SetState(State::FINISHED, State::RUNNING,
"Event recorder was not running when trying to go to finish");
433 std::this_thread::sleep_for(1ms);
436 std::this_thread::sleep_for(1ms);
441 m_subscriber->Unsubscribe();
449 static std::tuple<ecs::TaiClock::time_point::rep, std::string> AsTuple(
JsonPayload const& data) {
450 auto ts = ecs::TaiClock::now().time_since_epoch().count();
451 return std::make_tuple(ts, data.dump());
454 static constexpr
typename OutputStage::ColumnDescription COLUMNS =
typename OutputStage::ColumnDescription {
455 { {
"timestamp"sv,
""sv},
456 {
"payload"sv,
""sv} }
459 EventServiceIf& m_event_service;
460 std::unique_ptr<EventSubscriberIf> m_subscriber;
462 FilterMethod m_input_filter;
464 OutputStage m_output;
466 std::atomic<bool> m_start =
false;
467 std::atomic<bool> m_stop =
false;
468 std::thread m_process_thread;
473 #endif // RTCTK_COMPONENTFRAMEWORK_EVENTRECORDINGUNIT_HPP