RTC Toolkit  2.0.0
dataPointRecordingUnit.hpp
Go to the documentation of this file.
1 
11 #ifndef RTCTK_COMPONENTFRAMEWORK_DATAPOINTRECORDINGUNIT_HPP
12 #define RTCTK_COMPONENTFRAMEWORK_DATAPOINTRECORDINGUNIT_HPP
13 
14 #include "ecs/taiClock.hpp"
20 
21 #include <numapp/numapolicies.hpp>
22 #include <numapp/thread.hpp>
23 
24 #include <cstdint>
25 #include <exception>
26 #include <mutex>
27 #include <string>
28 #include <string_view>
29 #include <thread>
30 
31 namespace rtctk::componentFramework {
32 
33 using namespace std::string_view_literals;
34 
40 template <typename DpType,
41  typename OutputStage =
42  FitsRecorder<ecs::TaiClock::time_point::rep,
43  std::conditional_t<IsSpanConvertibleV<DpType>, AsSpan<DpType>, DpType>>>
45 public:
46  // we are using a bit-mask to configure when to capture data
48  static constexpr uint32_t CAPTURE_ON_START = 1;
49  static constexpr uint32_t CAPTURE_ON_STOP = 2;
50  static constexpr uint32_t CAPTURE_ON_CHANGE = 4;
51 
52  using OutputDpType = std::conditional_t<IsSpanConvertibleV<DpType>, AsSpan<DpType>, DpType>;
53 
62  DataPointRecordingUnit(std::string const& comp_id,
63  std::string const& unit_id,
64  RuntimeRepoIf& rtr,
65  OldbIf& oldb,
66  DataPointPath const& dp_path)
67  : RecordingUnit(comp_id, unit_id, rtr, oldb)
68  , m_rtr(rtr)
69  , m_dp_path(dp_path)
70  , m_dp_buffer()
71  , m_output(COLUMNS) {
72  DataPointPath path = DataPointPath(comp_id) / DataPointPath("static") /
73  DataPointPath("rec_units") / DataPointPath(unit_id) /
74  DataPointPath("capture_mask");
75  if (rtr.DataPointExists(path)) {
76  m_capture_mask = rtr.GetDataPoint<int32_t>(path);
77  } else {
78  m_capture_mask = CAPTURE_ON_START + CAPTURE_ON_CHANGE + CAPTURE_ON_STOP;
79  }
80 
81  Update();
82  }
83 
85  m_stop = true;
86  if (m_process_thread.joinable()) {
87  m_process_thread.join();
88  }
89  SetStopped();
90  }
95  void Prepare(const std::filesystem::path& file_path) override {
96  SetState(State::PREPARING,
97  State::STOPPED,
98  "Error in transition to PREPARING, DataPointRecordingUnit not in STOPPED State");
99 
100  m_file_path = file_path / (GetId() + ".fits");
101 
102  if (not m_rtr.DataPointExists(m_dp_path)) {
103  SetFailed(nullptr);
104  CII_THROW(InvalidSetting, "DataPoint does not exist");
105  }
106 
107  if (m_capture_mask == 0) {
108  SetFailed(nullptr);
109  CII_THROW(InvalidSetting, "Invalid capture mask 0");
110  }
111 
112  auto policies = numapp::NumaPolicies();
113  m_process_thread = numapp::MakeThread(m_unit_id, policies, [&]() { return Process(); });
114  }
118  void Start() override {
119  m_start = true;
120  }
125  std::optional<std::filesystem::path> Stop() override {
126  m_stop = true;
127  if (m_process_thread.joinable()) {
128  m_process_thread.join();
129  }
130  auto file = m_file_path;
131  m_file_path.reset();
132  SetStopped();
133  return file;
134  }
135 
136 private:
137  void Process() {
138  using namespace std::chrono_literals;
139 
140  m_output.Open(*m_file_path);
141 
142  if (m_capture_mask & CAPTURE_ON_CHANGE) {
143  m_rtr.Subscribe(m_dp_path, m_dp_buffer, [this](auto& path, auto& value) {
144  if (GetState() == State::RUNNING) {
145  try {
146  m_output.Write(AsTuple(value));
147  } catch (...) {
148  SetFailed(std::current_exception());
149  return;
150  }
151  }
152  });
153  }
154 
155  SetState(State::IDLE,
156  State::PREPARING,
157  "Error in transition to IDLE, DataPointRecordingUnit not in PREPARING State");
158 
159  while (m_stop == false) {
160  switch (GetState()) {
161  case State::IDLE: {
162  if (m_start) {
163  SetState(
164  State::WAITING,
165  State::IDLE,
166  "Error in transition to WAITING, DataPointRecordingUnit not in IDLE State");
167  break;
168  }
169  std::this_thread::sleep_for(1ms);
170  break;
171  }
172  case State::WAITING: {
173  if (not HasLeaders() or (HasLeaders() and HasFirstLeaderStarted())) {
174  if (m_capture_mask & CAPTURE_ON_START) {
175  m_rtr.ReadDataPoint(m_dp_path, m_dp_buffer);
176  m_output.Write(AsTuple(m_dp_buffer));
177  }
178 
179  SetState(State::RUNNING,
180  State::WAITING,
181  "Error in transition to RUNNING, DataPointRecordingUnit not in "
182  "WAITING State");
183  break;
184  }
185  std::this_thread::sleep_for(1ms);
186  break;
187  }
188  case State::RUNNING: {
189  if (HasLeaders() and HasLastLeaderFinished()) {
190  SetState(State::FINISHED,
191  State::RUNNING,
192  "Error in transition to FINISHED, DataPointRecordingUnit not in "
193  "RUNNING State");
194  break;
195  }
196  std::this_thread::sleep_for(1ms);
197  }
198  default: {
199  std::this_thread::sleep_for(1ms);
200  }
201  }
202  }
203 
204  if (m_capture_mask & CAPTURE_ON_CHANGE) {
205  m_rtr.Unsubscribe(m_dp_path);
206  }
207 
208  if (m_capture_mask & CAPTURE_ON_STOP) {
209  m_rtr.ReadDataPoint(m_dp_path, m_dp_buffer);
210  m_output.Write(AsTuple(m_dp_buffer));
211  }
212 
213  m_output.Close();
214  // State is set to stopped after join in Stop function
215  m_start = false;
216  m_stop = false;
217  ResetLeaderStates();
218  }
219 
220  static std::tuple<ecs::TaiClock::time_point::rep, OutputDpType> AsTuple(DpType const& data) {
221  auto ts = ecs::TaiClock::now().time_since_epoch().count();
222  if constexpr (IsSpanConvertibleV<DpType>) {
223  return std::make_tuple(ts, ToSpan(data));
224  } else {
225  return std::make_tuple(ts, data);
226  }
227  }
228 
229  static constexpr typename OutputStage::ColumnDescription COLUMNS =
230  typename OutputStage::ColumnDescription{{{"timestamp"sv, ""sv}, {"payload"sv, ""sv}}};
231 
232  RuntimeRepoIf& m_rtr;
233 
234  std::optional<std::filesystem::path> m_file_path;
235 
236  DataPointPath m_dp_path;
237 
238  DpType m_dp_buffer;
239 
240  CaptureMask m_capture_mask;
241 
242  OutputStage m_output;
243 
244  std::atomic<bool> m_start = false;
245  std::atomic<bool> m_stop = false;
246  std::thread m_process_thread;
247 };
248 
249 } // namespace rtctk::componentFramework
250 
251 #endif // RTCTK_COMPONENTFRAMEWORK_DATAPOINTRECORDINGUNIT_HPP
rtctk::componentFramework::RepositoryIf::GetDataPoint
T GetDataPoint(const DataPointPath &path) const
Fetches a datapoint from the repository.
Definition: repositoryIf.hpp:552
exceptions.hpp
Provides macros and utilities for exception handling.
recordingInfo.hpp
FitsRecorder allows writing ColumnData into FITS files in a specified directory.
rtctk::componentFramework::DataPointRecordingUnit::DataPointRecordingUnit
DataPointRecordingUnit(std::string const &comp_id, std::string const &unit_id, RuntimeRepoIf &rtr, OldbIf &oldb, DataPointPath const &dp_path)
Create a new recording unit.
Definition: dataPointRecordingUnit.hpp:62
rtctk::componentFramework::DataPointRecordingUnit::Stop
std::optional< std::filesystem::path > Stop() override
Stop the recording and wait for its termination.
Definition: dataPointRecordingUnit.hpp:125
rtctk::componentFramework::DataPointRecordingUnit::CaptureMask
uint32_t CaptureMask
Definition: dataPointRecordingUnit.hpp:47
rtctk::componentFramework
Definition: commandReplier.cpp:20
dataRecorder.hpp
FitsRecorder allows writing ColumnData into FITS files in a specified directory.
rtctk::componentFramework::RepositoryIf::DataPointExists
virtual bool DataPointExists(const DataPointPath &path) const =0
Checks for the existence of a datapoint in the repository.
rtctk::componentFramework::DataPointRecordingUnit::Prepare
void Prepare(const std::filesystem::path &file_path) override
Prepare the recording.
Definition: dataPointRecordingUnit.hpp:95
rtctk::componentFramework::RecordingUnit
Abstract base class for all sources that can be recorded by the MetadataCollector and TelemetryRecord...
Definition: recordingUnit.hpp:65
fitsDataRecorder.hpp
FitsRecorder allows writing ColumnData into FITS files in a specified directory.
rtctk::componentFramework::ToSpan
AsSpanT< T > ToSpan(T &data)
Simple function that converts types that are convertible to spans to a span.
Definition: recordingUtils.hpp:24
rtctk::componentFramework::DataPointRecordingUnit
Recording Unit that can record RTR datapoints.
Definition: dataPointRecordingUnit.hpp:44
recordingUnit.hpp
Abstract base class defining functionality common to all recording units.
rtctk::componentFramework::InvalidSetting
This exception is raised when an invalid setting was used in the runtime repo.
Definition: recordingUnit.hpp:52
rtctk::componentFramework::OldbIf
Definition: oldbIf.hpp:20
rtctk::componentFramework::AsSpan
Gets the span type for converting type T to a span.
Definition: recordingTypeTraits.hpp:107
rtctk::componentFramework::DataPointRecordingUnit::OutputDpType
std::conditional_t< IsSpanConvertibleV< DpType >, AsSpan< DpType >, DpType > OutputDpType
Definition: dataPointRecordingUnit.hpp:52
mudpi::uint32_t
unsigned int uint32_t
Definition: mudpi.h:16
rtctk::componentFramework::RuntimeRepoIf
Definition: runtimeRepoIf.hpp:20
mudpi::int32_t
int int32_t
Definition: mudpi.h:17
rtctk::componentFramework::DataPointRecordingUnit::Start
void Start() override
Start the recording.
Definition: dataPointRecordingUnit.hpp:118
rtctk::componentFramework::DataPointPath
This class provides a wrapper for a data point path.
Definition: dataPointPath.hpp:65
rtctk::componentFramework::DataPointRecordingUnit::~DataPointRecordingUnit
virtual ~DataPointRecordingUnit()
Definition: dataPointRecordingUnit.hpp:84