RTC Toolkit  2.0.0
ipcqRecordingUnit.hpp
Go to the documentation of this file.
1 
11 #ifndef RTCTK_COMPONENTFRAMEWORK_IPCQRECORDINGUNIT_HPP
12 #define RTCTK_COMPONENTFRAMEWORK_IPCQRECORDINGUNIT_HPP
13 
18 #include <cstdint>
19 #include <string>
20 #include <exception>
21 #include <mutex>
22 #include <thread>
23 
24 namespace rtctk::componentFramework {
25 
/// Exception type for errors reported by the IPC queue (IPCQ).
///
/// Derives from RtctkException. The text handed to the constructor is passed on
/// to the base class with the fixed prefix "Error in IPCQ: " prepended.
29 class IpcqError : public RtctkException {
30  public:
/// Construct the exception from a description of the failure.
///
/// @param message Detail text; it is appended after the "Error in IPCQ: " prefix.
32  explicit IpcqError(const std::string& message) :
33  RtctkException("Error in IPCQ: " + message) {}
34 };
35 
/// Recording unit that can record from a shared memory queue.
///
/// @tparam Topic       Type used to look up the default output stage through
///                     RecordingInfo<Topic>.
/// @tparam OutputStage Type of the m_output member; defaults to
///                     RecordingInfo<Topic>::Recorder. It has to provide a nested
///                     DisabledFields type, which GetDisabled() returns.
///
/// NOTE(review): listing line 42 is absent from this extract. Presumably it is the
/// class head deriving from RecordingUnit (the constructor below initialises a
/// RecordingUnit base and several methods are marked override) -- confirm against
/// the original header.
41 template <typename Topic, typename OutputStage = typename RecordingInfo<Topic>::Recorder>
43 public:
/// Create a new IPCQ recorder reading from a given queue and outputting to the
/// given output stage.
///
/// All configuration is read from @p rtr below
/// `<comp_id>/static/rec_units/<unit_id>/`: `shm_queue_name` is always fetched,
/// `cpu_affinity` only when that data point exists. Update() is called as the
/// last step of construction.
///
/// @param comp_id Component identifier; first element of every data point path built here.
/// @param unit_id Recording unit identifier; path element directly below `rec_units`.
/// @param rtr     Repository the configuration data points are read from.
/// @param oldb    Second repository; in this header it is only forwarded to the
///                RecordingUnit constructor.
52  IpcqRecordingUnit(std::string const& comp_id,
53  std::string const& unit_id,
54  RepositoryIf& rtr,
55  RepositoryIf& oldb)
56  : RecordingUnit(comp_id, unit_id, rtr, oldb),
// Name of the queue: <comp_id>/static/rec_units/<unit_id>/shm_queue_name
57  m_queue_name{rtr.GetDataPoint<std::string>(DataPointPath(comp_id) /
58  DataPointPath("static") /
59  DataPointPath("rec_units") /
60  DataPointPath(unit_id) /
61  SHM_QUEUE_NAME_PATH)},
// NOTE(review): listing line 62 is missing from this extract. The lines below are
// the tail of an initializer that builds <comp_id>/static/rec_units/<unit_id>/telemetry_subset
// and closes two parentheses -- presumably m_output(GetDisabled(rtr, ...)); confirm
// against the original header.
63  DataPointPath("static") /
64  DataPointPath("rec_units") /
65  DataPointPath(unit_id) /
66  TELEMETRY_SUBSET_PATH))
67  {
68 
// Optional setting: <comp_id>/static/rec_units/<unit_id>/cpu_affinity
69  DataPointPath path = DataPointPath(comp_id) /
70  DataPointPath("static") /
71  DataPointPath("rec_units") /
72  DataPointPath(unit_id) /
73  CPU_AFFINITY_PATH;
// m_cpu_affinity stays empty when the data point does not exist.
74  if(rtr.DataPointExists(path)) {
// NOTE(review): the value is fetched as int32_t but stored in std::optional<size_t>;
// a negative value would be converted to a very large unsigned number. Confirm that
// the value is validated elsewhere or add a range check.
75  m_cpu_affinity = rtr.GetDataPoint<int32_t>(path);
76  }
77 
// Update() is virtual; when called from a constructor it resolves to this class's
// implementation, never to an override in a more-derived class.
78  Update();
79  }
80 
/// Sets the stop flag and joins the processing thread if it is joinable.
///
/// NOTE(review): presumably Process() polls m_stop so that the join returns --
/// the implementation is in the .ipp file and not visible here; confirm.
81  virtual ~IpcqRecordingUnit() {
82  m_stop = true;
83  if(m_process_thread.joinable()) {
84  m_process_thread.join();
85  }
86  }
87 
/// Prepare the recording thread and start recording.
///
/// @param file_path Filesystem path handed to the recording unit
///                  (presumably the output location -- TODO confirm in the .ipp).
92  void Prepare(const std::filesystem::path& file_path) override;
/// Start the recording.
96  void Start() override;
/// Stop the recording thread and wait for its termination.
///
/// @return An optional filesystem path; which path is returned, and when it is
///         empty, is defined by the implementation in the .ipp.
101  std::optional<std::filesystem::path> Stop() override;
/// Update settings from the runtime repository. Also called by the constructor.
105  void Update() override;
106 
107 protected:
/// Get disabled fields from a data point in the runtime repository.
///
/// @param rtr      Repository to read from.
/// @param sub_path Path of the data point to read (the constructor appears to pass
///                 the `telemetry_subset` path -- see the note on listing line 62).
/// @return The OutputStage::DisabledFields value derived from that data point.
115  static typename OutputStage::DisabledFields GetDisabled(RepositoryIf& rtr, const DataPointPath& sub_path);
116 
117 private:
/// Worker routine; presumably the body executed by m_process_thread -- TODO confirm in the .ipp.
121  void Process();
/// Processing thread; joined in the destructor if joinable.
125  std::thread m_process_thread;
/// Atomic start flag, initially false. Not written in this header; presumably set by Start().
129  std::atomic<bool> m_start = false;
/// Atomic stop flag, initially false; set to true by the destructor before joining.
130  std::atomic<bool> m_stop = false;
/// Queue name read from the `shm_queue_name` data point at construction.
134  std::string m_queue_name;
/// CPU affinity; only engaged when the `cpu_affinity` data point exists at construction.
138  std::optional<size_t> m_cpu_affinity;
/// Output stage instance.
142  OutputStage m_output;
// NOTE(review): the five members below have no default member initializer and are
// not assigned in the visible part of the constructor. Presumably Update() and/or
// Prepare() set them before they are read -- confirm in the .ipp.
/// Subsample factor (presumably read from `subsample_factor`).
146  int64_t m_subsample_factor;
/// Sample id at which to start (presumably read from `start_at_sample_id`).
150  int64_t m_start_sample_id;
/// Number of samples after which to stop (presumably read from `stop_after_num_samples`).
154  int64_t m_stop_after_num_samples;
/// Sample id seen most recently.
158  int64_t m_last_observed_sample_id;
/// Counter used for sampling.
162  uint64_t m_sampling_counter;
// Data point names relative to <comp_id>/static/rec_units/<unit_id>/. Only the first
// three are used in this header; the rest are presumably read by Update().
/// Relative path of the queue name data point.
166  inline static const DataPointPath SHM_QUEUE_NAME_PATH = DataPointPath("shm_queue_name");
/// Relative path of the optional CPU affinity data point.
170  inline static const DataPointPath CPU_AFFINITY_PATH = DataPointPath("cpu_affinity");
/// Relative path of the telemetry subset data point.
174  inline static const DataPointPath TELEMETRY_SUBSET_PATH = DataPointPath("telemetry_subset");
/// Relative path of the subsample factor data point.
178  inline static const DataPointPath SUBSAMPLE_FACTOR_PATH = DataPointPath("subsample_factor");
/// Relative path of the start sample id data point.
182  inline static const DataPointPath START_SAMPLE_ID_PATH = DataPointPath("start_at_sample_id");
/// Relative path of the stop-after-N-samples data point.
/// NOTE(review): unlike its siblings this constant has no `_PATH` suffix.
186  inline static const DataPointPath STOP_AFTER_NUM_SAMPLES = DataPointPath("stop_after_num_samples");
187 };
188 
189 } // namespace rtctk::componentFramework
190 
191 #include "rtctk/componentFramework/ipcqRecordingUnit.ipp"
192 
193 #endif // RTCTK_COMPONENTFRAMEWORK_IPCQRECORDINGUNIT_HPP
rtctk::componentFramework::IpcqRecordingUnit::Prepare
void Prepare(const std::filesystem::path &file_path) override
Prepare the recording thread and start recording.
rtctk::componentFramework::IpcqRecordingUnit::GetDisabled
static OutputStage::DisabledFields GetDisabled(RepositoryIf &rtr, const DataPointPath &sub_path)
Get disabled fields from a DataPoint in the runtime repo.
rtctk::componentFramework::IpcqRecordingUnit::Stop
std::optional< std::filesystem::path > Stop() override
Stop the recording thread and wait for its termination.
rtctk::componentFramework::RepositoryIf::GetDataPoint
T GetDataPoint(const DataPointPath &path) const
Fetches a datapoint from the repository.
Definition: repositoryIf.hpp:552
exceptions.hpp
Provides macros and utilities for exception handling.
recordingInfo.hpp
FitsRecorder allows writing ColumnData into FITS files in a specified directory.
rtctk::componentFramework::RtctkException::RtctkException
RtctkException() noexcept
Definition: exceptions.cpp:99
rtctk::componentFramework
Definition: commandReplier.cpp:20
dataRecorder.hpp
FitsRecorder allows writing ColumnData into FITS files in a specified directory.
rtctk::componentFramework::IpcqRecordingUnit::Update
void Update() override
Update settings from RuntimeRepo.
rtctk::componentFramework::IpcqError
This Exception is raised when the ipc queue returns an error that cannot be handled by the RecordingT...
Definition: ipcqRecordingUnit.hpp:29
rtctk::componentFramework::IpcqRecordingUnit::IpcqRecordingUnit
IpcqRecordingUnit(std::string const &comp_id, std::string const &unit_id, RepositoryIf &rtr, RepositoryIf &oldb)
Create a new Ipcq Recorder reading from a given queue and outputting to the given output stage.
Definition: ipcqRecordingUnit.hpp:52
rtctk::componentFramework::RtctkException
The RtctkException class is the base class for all Rtctk exceptions.
Definition: exceptions.hpp:207
rtctk::componentFramework::RecordingInfo
Definition: recordingInfo.hpp:21
rtctk::componentFramework::IpcqRecordingUnit
Recording Unit that can record from shared memory queue.
Definition: ipcqRecordingUnit.hpp:42
rtctk::componentFramework::IpcqRecordingUnit::Start
void Start() override
Start the recording.
rtctk::componentFramework::RecordingUnit
Abstract base class for all sources that can be recorded by the MetadataCollector and TelemetryRecord...
Definition: recordingUnit.hpp:65
rtctk_config_tool.message
message
Definition: rtctk_config_tool.py:42
rtctk::componentFramework::RepositoryIf
Abstract interface providing basic read and write facilities to a repository.
Definition: repositoryIf.hpp:34
rtctk::componentFramework::IpcqError::IpcqError
IpcqError(const std::string &message)
Definition: ipcqRecordingUnit.hpp:32
recordingUnit.hpp
Abstract base class defining functionality common to all recording units.
mudpi::int32_t
int int32_t
Definition: mudpi.h:17
rtctk::componentFramework::DataPointPath
This class provides a wrapper for a data point path.
Definition: dataPointPath.hpp:65
rtctk::componentFramework::IpcqRecordingUnit::~IpcqRecordingUnit
virtual ~IpcqRecordingUnit()
Definition: ipcqRecordingUnit.hpp:81