11 #ifndef RTCTK_COMPONENTFRAMEWORK_DATAPOINTRECORDINGUNIT_HPP
12 #define RTCTK_COMPONENTFRAMEWORK_DATAPOINTRECORDINGUNIT_HPP
14 #include "ecs/taiClock.hpp"
21 #include <numapp/numapolicies.hpp>
22 #include <numapp/thread.hpp>
28 #include <string_view>
33 using namespace std::string_view_literals;
/// Template parameters of the recording unit (the class head itself is not visible in this excerpt).
///
/// - DpType: type of the data point value being recorded.
/// - OutputStage: type of the output writer. Defaults to a FitsRecorder instantiated with
///   ecs::TaiClock::time_point::rep as first argument and, as second argument, AsSpan<DpType>
///   when IsSpanConvertibleV<DpType> is true, otherwise DpType itself.
40 template <
typename DpType,
41 typename OutputStage =
42 FitsRecorder<ecs::TaiClock::time_point::rep,
43 std::conditional_t<IsSpanConvertibleV<DpType>, AsSpan<DpType>, DpType>>>
/// Capture-mask flag (value 1). When set, the processing loop reads the data point and writes
/// it once just before the transition to RUNNING.
48 static constexpr
uint32_t CAPTURE_ON_START = 1;
/// Capture-mask flag (value 4). When set, the processing code subscribes to the data point and
/// writes each delivered value while the unit is RUNNING; the subscription is removed afterwards.
// NOTE(review): CAPTURE_ON_STOP is used elsewhere in this file but its declaration is not
// visible in this excerpt.
50 static constexpr
uint32_t CAPTURE_ON_CHANGE = 4;
// NOTE(review): fragment of what appears to be the constructor; the rest of the signature
// and the initialiser list are not visible in this excerpt.
63 std::string
const& unit_id,
// Default capture mask: all three capture flags enabled. The flags are combined with '+'
// rather than '|'; this gives the same result only as long as every flag is a distinct bit
// (1 and 4 are visible here — TODO confirm the value of CAPTURE_ON_STOP).
78 m_capture_mask = CAPTURE_ON_START + CAPTURE_ON_CHANGE + CAPTURE_ON_STOP;
// Wait for the processing thread to finish if one is currently joinable.
// NOTE(review): presumably the destructor body — the enclosing signature is not visible here.
86 if (m_process_thread.joinable()) {
87 m_process_thread.join();
/// Prepares the unit for recording: switches to PREPARING, derives the output file path,
/// checks preconditions and starts the thread that runs Process().
///
/// @param file_path Base path; the output file becomes file_path / (GetId() + ".fits").
95 void Prepare(
const std::filesystem::path& file_path)
override {
// Request the PREPARING state; the string is the error text passed along with the request
// (presumably reported when the unit is not in STOPPED state — SetState is defined elsewhere).
96 SetState(State::PREPARING,
98 "Error in transition to PREPARING, DataPointRecordingUnit not in STOPPED State");
// Output file name is the unit id with a ".fits" extension, placed under file_path.
100 m_file_path = file_path / (GetId() +
".fits");
// Precondition: the data point to be recorded must exist in the runtime repository.
// (The handling of this case is not visible in this excerpt.)
102 if (not m_rtr.DataPointExists(m_dp_path)) {
// Precondition: at least one capture flag must be set.
// (The handling of this case is not visible in this excerpt.)
107 if (m_capture_mask == 0) {
// Start the processing thread with default-constructed NUMA policies. m_unit_id is passed as
// the first argument to MakeThread (presumably the thread name — confirm against numapp).
// The lambda captures by reference and calls Process() on this object.
112 auto policies = numapp::NumaPolicies();
113 m_process_thread = numapp::MakeThread(m_unit_id, policies, [&]() {
return Process(); });
/// Stops the unit: waits for the processing thread to end and takes a copy of the stored
/// output file path.
///
/// @return An optional filesystem path.
// NOTE(review): the return statement and any stop-flag handling are not visible in this
// excerpt; presumably the copied path is what gets returned — confirm.
125 std::optional<std::filesystem::path>
Stop()
override {
// Block until the processing thread has finished, if it is joinable.
127 if (m_process_thread.joinable()) {
128 m_process_thread.join();
// Copy the path into a local (m_file_path is a std::optional<std::filesystem::path>).
130 auto file = m_file_path;
// Body of the processing routine that Prepare() runs on m_process_thread (the signature is not
// visible in this excerpt). It opens the output, optionally subscribes to value changes, then
// polls the unit state until m_stop is set, and finally performs the stop-time capture.
// Brings the 'ms' duration literal into scope for the polling sleeps below.
138 using namespace std::chrono_literals;
// m_file_path is dereferenced without a has_value() check, so it must have been assigned
// (Prepare() does this) before this code runs.
140 m_output.Open(*m_file_path);
// With CAPTURE_ON_CHANGE set, register a callback on the data point. The callback writes a
// (timestamp, value) tuple for each delivered value, but only while the unit is RUNNING.
// NOTE(review): if the repository invokes this callback on another thread, m_output.Write may
// run concurrently with the Write calls further down — confirm the threading guarantees.
142 if (m_capture_mask & CAPTURE_ON_CHANGE) {
143 m_rtr.Subscribe(m_dp_path, m_dp_buffer, [
this](
auto& path,
auto& value) {
144 if (GetState() == State::RUNNING) {
146 m_output.Write(AsTuple(value));
// The current exception is handed to SetFailed (presumably from a catch handler whose
// try/catch lines are not visible in this excerpt).
148 SetFailed(std::current_exception());
// Setup is complete: request the IDLE state.
155 SetState(State::IDLE,
157 "Error in transition to IDLE, DataPointRecordingUnit not in PREPARING State");
// Main polling loop: dispatch on the current state until m_stop becomes true.
159 while (m_stop ==
false) {
160 switch (GetState()) {
// (Error text of the transition to WAITING; the surrounding case lines are not visible.)
166 "Error in transition to WAITING, DataPointRecordingUnit not in IDLE State");
169 std::this_thread::sleep_for(1ms);
// WAITING: proceed when there are no leaders, or when the first leader has started.
// NOTE(review): the second HasLeaders() test is logically redundant provided HasLeaders()
// returns the same value on both calls.
172 case State::WAITING: {
173 if (not HasLeaders() or (HasLeaders() and HasFirstLeaderStarted())) {
// Start-time capture: read the current value into the buffer and write it once.
174 if (m_capture_mask & CAPTURE_ON_START) {
175 m_rtr.ReadDataPoint(m_dp_path, m_dp_buffer);
176 m_output.Write(AsTuple(m_dp_buffer));
179 SetState(State::RUNNING,
181 "Error in transition to RUNNING, DataPointRecordingUnit not in "
185 std::this_thread::sleep_for(1ms);
// RUNNING: request FINISHED once leaders exist and the last one has finished.
188 case State::RUNNING: {
189 if (HasLeaders() and HasLastLeaderFinished()) {
190 SetState(State::FINISHED,
192 "Error in transition to FINISHED, DataPointRecordingUnit not in "
196 std::this_thread::sleep_for(1ms);
199 std::this_thread::sleep_for(1ms);
// Remove the change subscription registered above.
204 if (m_capture_mask & CAPTURE_ON_CHANGE) {
205 m_rtr.Unsubscribe(m_dp_path);
// Stop-time capture: read the current value once more and write it.
208 if (m_capture_mask & CAPTURE_ON_STOP) {
209 m_rtr.ReadDataPoint(m_dp_path, m_dp_buffer);
210 m_output.Write(AsTuple(m_dp_buffer));
/// Packages a data point value together with a timestamp for the output stage.
///
/// The timestamp is the tick count of ecs::TaiClock::now() since its epoch, taken at the moment
/// this function is called (not when the value was produced).
///
/// @param data Value to package.
/// @return Tuple of (timestamp count, payload). The payload is ToSpan(data) when
///         IsSpanConvertibleV<DpType> is true, otherwise data itself.
220 static std::tuple<ecs::TaiClock::time_point::rep, OutputDpType> AsTuple(DpType
const& data) {
221 auto ts = ecs::TaiClock::now().time_since_epoch().count();
// Compile-time choice of payload representation.
222 if constexpr (IsSpanConvertibleV<DpType>) {
223 return std::make_tuple(ts,
ToSpan(data));
225 return std::make_tuple(ts, data);
/// Compile-time column description for the output stage with two entries, "timestamp" and
/// "payload". The second string of each entry is empty.
// NOTE(review): the second string is presumably a unit field — confirm against
// OutputStage::ColumnDescription.
229 static constexpr
typename OutputStage::ColumnDescription COLUMNS =
230 typename OutputStage::ColumnDescription{{{
"timestamp"sv,
""sv}, {
"payload"sv,
""sv}}};
// Runtime repository used to check for, read, subscribe to and unsubscribe from the data point.
// Held by reference, so the repository must outlive this object.
232 RuntimeRepoIf& m_rtr;
// Output file path; assigned in Prepare() and dereferenced when the output is opened.
234 std::optional<std::filesystem::path> m_file_path;
// Path of the data point being recorded.
236 DataPointPath m_dp_path;
// Combination of CAPTURE_ON_* flags selecting when values are captured.
240 CaptureMask m_capture_mask;
// Output writer; receives Open() with the file path and Write() with (timestamp, payload) tuples.
242 OutputStage m_output;
// Atomic flag, initially false. Its use is not visible in this excerpt.
244 std::atomic<bool> m_start =
false;
// Atomic flag, initially false. The processing loop keeps running while it is false.
245 std::atomic<bool> m_stop =
false;
// Thread that runs Process(); created in Prepare() and joined in Stop().
246 std::thread m_process_thread;
251 #endif // RTCTK_COMPONENTFRAMEWORK_DATAPOINTRECORDINGUNIT_HPP