ifw-daq  1.0.0
IFW Data Acquisition modules
manager.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq
4  * @copyright 2021 ESO - European Southern Observatory
5  *
6  * @brief Declaration of `daq::Manager`
7  */
8 #ifndef OCM_DAQ_MANAGER_HPP_
9 #define OCM_DAQ_MANAGER_HPP_
10 #include "config.hpp"
11 
12 #include <string_view>
13 #include <vector>
14 
15 #include <boost/thread/future.hpp>
16 #include <log4cplus/logger.h>
17 #include <rad/ioExecutor.hpp>
18 
19 #include <daq/state.hpp>
20 #include <daq/status.hpp>
21 #include <daq/error.hpp>
22 #include <daq/utility.hpp>
23 
24 
25 namespace daq {
26 
27 /**
28  * Creates a DAQ id candidate that may or may not be unique.
29  *
30  * @param instrument_id The instrument ID to use for the id. Only the first 5 characters are
31  * used if the name is longer than 5 characters.
32  * @param jitter Jitter is a millisecond component added to the current time. This is meant to be
33  * used to find a unique id by adding jitter until a unique id is found.
34  *
35  * @note The function cannot guarantee a unique ID; ensuring uniqueness is the responsibility of the caller.
36  * @ingroup daq_ocm_libdaq
37  */
38 std::string MakeDaqIdCandidate(char const* instrument_id, unsigned jitter=0);
39 
40 class DaqController;
41 class FitsController;
42 
43 /**
44  * Observes any status.
45  */
46 class StatusSignal {
47 public:
48  using SignalType = boost::signals2::signal<void(ObservableStatus const&)>;
49 
50  template <class Observer>
51  boost::signals2::connection ConnectObserver(Observer o) {
52  return m_signal.connect(std::move(o));
53  }
54 
55  void Signal(ObservableStatus const& status) {
56  m_signal(status);
57  }
58 private:
59  SignalType m_signal;
60 };
61 
62 /**
63  * Manager owns DaqController and FitsController (active data acquisitions) instances and
64  * multiplexes requests to them.
65  *
66  *
67  * Important responsibilities:
68  *
69  * - Provide interface that allows multiplexing of multiple DaqControllers and FitsControllers
70  * (identified by unique id). This interface is close to what the MAL ICD interface looks like to
71  * control DaqControllers and FitsControllers.
72  * - Monitor primary data sources to stop DaqController if a primary source completes (TODO).
73  *
74  * FitsController also act as a data source.
75  *
76  * Out of scope:
77  *
78  * - Creation of `DaqController` instances.
79  * - Creation of `FitsController` instances.
80  *
81  * @note: It's a template only to facilitate mocking of DaqController, without requiring it to be
82  * an interface.
83  *
84  * @ingroup daq_ocm_libdaq
85  */
86 class Manager {
87 public:
88  virtual ~Manager() {}
89  using Signal = boost::signals2::signal<void(ObservableStatus const&)>;
90 
91  /**
92  * Creates a new unique identifier based on the instrumend id and current time.
93  *
94  * If there is a id collision when using current time a millisecond jitter component
95  * is added until a unique id is found.
96  *
97  * The format is the same as ARCFILE - the file extension: "<OLAS_ID>-2020-08-19T09:33:11.951"
98  *
99  * @note OLAS_ID is the first 5 characters of instrument id.
100  * @note The returned ID is guaranteed to be unique for all IDs known by Manager (there is
101  * currently no global repository of previous IDs).
102  *
103  * @throws std::system_error on failure.
104  */
105  virtual std::string MakeDaqId() const = 0;
106 
107  /**
108  * Query existing data acquisition by @c id.
109  *
110  * @returns true if there is a already a DAQ with the same ID.
111  * @returns false otherwise.
112  */
113  virtual bool HaveDaq(std::string_view id) const DAQ_NOEXCEPT = 0;
114 
115  /**
116  * Get status
117  *
118  * @throw std::invalid_argument if no data acquisition exist with provided `id`.
119  */
120  virtual Status GetStatus(std::string_view id) const = 0;
121 
122  /**
123  * Add data acquisition.
124  *
125  * When a DataAcqusition is added the Manager will also start subscribing to events
126  * from primary data sources for monitoring purposes.
127  */
128  virtual void AddDaq(std::shared_ptr<DaqController> daq) = 0;
129 
130  /**
131  * Start DaqController identified by `id`.
132  *
133  * @param id Data acquisition id.
134  *
135  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
136  * does not exist.
137  * @returns Future that will eventually be ready when data acquisition has started, or failed to
138  * start.
139  */
140  virtual boost::future<State> StartDaqAsync(std::string_view id) = 0;
141 
142  /**
143  * Stop DaqController identified by `id`.
144  *
145  * @param id Data acquisition id.
146  * @param policy Error policy determining if errors are tolerated or not.
147  *
148  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
149  * does not exist.
150  * @returns Future that will eventually be ready when data acquisition has stopped, or failed to
151  * stop.
152  */
153  virtual boost::future<Status> StopDaqAsync(std::string_view id, ErrorPolicy policy) = 0;
154 
155  /**
156  * Abort DaqController identified by `id`.
157  *
158  * @param id Data acquisition id.
159  * @param policy Error policy determining if errors are tolerated or not.
160  *
161  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
162  * does not exist.
163  * @returns Future that will eventually be ready when data acquisition has aborted, or failed to
164  * abort.
165  */
166  virtual boost::future<Status> AbortDaqAsync(std::string_view id, ErrorPolicy policy) = 0;
167 
168  /**
169  * Await DAQ state
170  *
171  * @param id Data acquisition id.
172  * @param state target state to await.
173  * @param timeout How long to wait for state to be reached.
174  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
175  * does not exist or timeout is negative.
176  * @returns Future with a value set when condition is fulfilled or times out.
177  */
178  virtual boost::future<Result<Status>> AwaitDaqStateAsync(std::string_view id,
179  State state,
180  std::chrono::milliseconds timeout) = 0;
181  /**
182  * Update FITS keywords for DaqController identified by `id`.
183  * @param id Data acquisition id.
184  * @param keywords FITS keywords to update.
185  *
186  * @throw std::invalid_argument if no data acquisition exist with provided `id`.
187  * @throw std::runtime_error If DaqController state does not allow updating of keywords (it's
188  * e.g. already been submitted for merging).
189  */
190  virtual void UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) = 0;
191 
192  /**
193  * @returns status observer object.
194  *
195  * The status observer observes any DAQ status changes.
196  */
198 
199  /**
200  * @returns current Daq Controllers.
201  */
202  virtual std::vector<std::shared_ptr<DaqController const>> GetDaqControllers() = 0;
203 
204 };
205 
206 /**
207  * Implements `daq::Manager`.
208  *
209  * @ingroup daq_ocm_libdaq
210  */
211 class ManagerImpl : public Manager {
212 public:
213  /**
214  * @param instrument_id Instrument id.
215  */
216  explicit ManagerImpl(rad::IoExecutor& executor, std::string instrument_id);
217  ~ManagerImpl() noexcept;
218 
219  std::string MakeDaqId() const override;
220 
221  bool HaveDaq(std::string_view id) const noexcept override;
222 
223  Status GetStatus(std::string_view id) const override;
224 
225  void AddDaq(std::shared_ptr<DaqController> daq) override;
226 
227  boost::future<State> StartDaqAsync(std::string_view id) override;
228 
229  boost::future<Status> StopDaqAsync(std::string_view id, ErrorPolicy policy) override;
230 
231  boost::future<Status> AbortDaqAsync(std::string_view id, ErrorPolicy policy) override;
232 
233  boost::future<Result<Status>>
234  AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override;
235 
236  void UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) override;
237 
238  StatusSignal& GetStatusSignal() override;
239 
240  std::vector<std::shared_ptr<DaqController const>> GetDaqControllers() override;
241 
242 private:
243  struct Daq {
244  Daq(std::string id_arg,
245  std::shared_ptr<DaqController> controller_arg,
246  boost::signals2::connection connection_arg) noexcept;
247 
248  std::string id;
249  std::shared_ptr<DaqController> controller;
250  // connection for observer connected to controller.
251  boost::signals2::connection connection;
252  };
253 
254  /// Adds unique identifier (unrelated to DAQ id) to op abort functions
255  class OpAbortFunc {
256  public:
257  using Func = std::function<bool()>;
258  OpAbortFunc(Func&& f);
259  OpAbortFunc(OpAbortFunc&&) = default;
260  OpAbortFunc& operator=(OpAbortFunc&&) = default;
261 
262  uint64_t GetId() const noexcept;
263  bool Abort() noexcept;
264  private:
265  static uint64_t NextId();
266  uint64_t m_id;
267  std::function<bool()> m_func;
268  };
269  void RemoveAbortFunc(uint64_t id) noexcept;
270  DaqController const* FindDaq(std::string_view id) const noexcept;
271  DaqController* FindDaq(std::string_view id) noexcept;
272  DaqController& FindDaqOrThrow(std::string_view id);
273  DaqController const& FindDaqOrThrow(std::string_view id) const;
274 
275  std::shared_ptr<bool> m_alive_token;
276  rad::IoExecutor& m_executor;
277  std::string m_instrument_id;
278  StatusSignal m_status_signal;
279  /// Data acquisitions
280  std::vector<Daq> m_daq_controllers;
281  /// Pair of DAQ-id and abort function
282  std::vector<OpAbortFunc> m_abort_funcs;
283  log4cplus::Logger m_logger;
284 };
285 
286 } // namespace daq
287 #endif // #ifndef OCM_DAQ_MANAGER_HPP_
daq::StatusSignal::SignalType
boost::signals2::signal< void(ObservableStatus const &)> SignalType
Definition: manager.hpp:48
DAQ_NOEXCEPT
#define DAQ_NOEXCEPT
Definition: config.hpp:16
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:41
daq::Manager::UpdateKeywords
virtual void UpdateKeywords(std::string_view id, fits::KeywordVector const &keywords)=0
Update FITS keywords for DaqController identified by id.
daq::Manager::GetDaqControllers
virtual std::vector< std::shared_ptr< DaqController const > > GetDaqControllers()=0
ioExecutor.hpp
daq::ManagerImpl::AbortDaqAsync
boost::future< Status > AbortDaqAsync(std::string_view id, ErrorPolicy policy) override
Abort DaqController identified by id.
Definition: manager.cpp:171
daq::ManagerImpl
Implements daq::Manager.
Definition: manager.hpp:211
daq::StatusSignal::Signal
void Signal(ObservableStatus const &status)
Definition: manager.hpp:55
rad::IoExecutor
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
daq::ManagerImpl::MakeDaqId
std::string MakeDaqId() const override
Creates a new unique identifier based on the instrument id and current time.
Definition: manager.cpp:87
daq::Manager::Signal
boost::signals2::signal< void(ObservableStatus const &)> Signal
Definition: manager.hpp:89
daq::ObservableStatus
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:68
daq
Definition: daqController.cpp:18
daq::StatusSignal::ConnectObserver
boost::signals2::connection ConnectObserver(Observer o)
Definition: manager.hpp:51
daq::StatusSignal
Observes any status.
Definition: manager.hpp:46
daq::ManagerImpl::AwaitDaqStateAsync
boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override
Await DAQ state.
Definition: manager.cpp:179
daq::ManagerImpl::ManagerImpl
ManagerImpl(rad::IoExecutor &executor, std::string instrument_id)
Definition: manager.cpp:63
daq::Manager::MakeDaqId
virtual std::string MakeDaqId() const =0
Creates a new unique identifier based on the instrument id and current time.
daq::Manager::AwaitDaqStateAsync
virtual boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State state, std::chrono::milliseconds timeout)=0
Await DAQ state.
config.hpp
Config class header file.
daq::ManagerImpl::StopDaqAsync
boost::future< Status > StopDaqAsync(std::string_view id, ErrorPolicy policy) override
Stop DaqController identified by id.
Definition: manager.cpp:163
daq::ManagerImpl::~ManagerImpl
~ManagerImpl() noexcept
Definition: manager.cpp:71
daq::ManagerImpl::GetStatusSignal
StatusSignal & GetStatusSignal() override
Definition: manager.cpp:223
daq::ManagerImpl::StartDaqAsync
boost::future< State > StartDaqAsync(std::string_view id) override
Start DaqController identified by id.
Definition: manager.cpp:155
status.hpp
Contains declaration for Status and ObservableStatus.
daq::Manager::HaveDaq
virtual bool HaveDaq(std::string_view id) const DAQ_NOEXCEPT=0
Query existing data acquisition by id.
daq::ManagerImpl::AddDaq
void AddDaq(std::shared_ptr< DaqController > daq) override
Add data acquisition.
Definition: manager.cpp:100
daq::Manager::StartDaqAsync
virtual boost::future< State > StartDaqAsync(std::string_view id)=0
Start DaqController identified by id.
daq::Manager::AbortDaqAsync
virtual boost::future< Status > AbortDaqAsync(std::string_view id, ErrorPolicy policy)=0
Abort DaqController identified by id.
daq::Status
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:32
daq::fits::KeywordVector
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:138
daq::Manager::AddDaq
virtual void AddDaq(std::shared_ptr< DaqController > daq)=0
Add data acquisition.
daq::Manager::GetStatusSignal
virtual StatusSignal & GetStatusSignal()=0
daq::Manager
Manager owns DaqController and FitsController (active data acquisitions) instances and multiplexes re...
Definition: manager.hpp:86
daq::ManagerImpl::GetDaqControllers
std::vector< std::shared_ptr< DaqController const > > GetDaqControllers() override
Definition: manager.cpp:227
state.hpp
Declares daq::State and related functions.
daq::ManagerImpl::GetStatus
Status GetStatus(std::string_view id) const override
Get status.
Definition: manager.cpp:114
daq::ErrorPolicy
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
daq::ManagerImpl::UpdateKeywords
void UpdateKeywords(std::string_view id, fits::KeywordVector const &keywords) override
Update FITS keywords for DaqController identified by id.
Definition: manager.cpp:219
daq::DaqController
Controls the execution of single data acquisition that ultimately result in a set of FITS keywords an...
Definition: daqController.hpp:163
daq::MakeDaqIdCandidate
std::string MakeDaqIdCandidate(char const *instrument_id, unsigned jitter=0)
Creates a DAQ id candidate that may or may not be unique.
Definition: manager.cpp:27
utility.hpp
Declaration of utilities.
daq::Manager::GetStatus
virtual Status GetStatus(std::string_view id) const =0
Get status.
daq::Manager::~Manager
virtual ~Manager()
Definition: manager.hpp:88
daq::ManagerImpl::HaveDaq
bool HaveDaq(std::string_view id) const noexcept override
Query existing data acquisition by id.
Definition: manager.cpp:96
daq::Manager::StopDaqAsync
virtual boost::future< Status > StopDaqAsync(std::string_view id, ErrorPolicy policy)=0
Stop DaqController identified by id.
error.hpp
Contains error related declarations for DAQ.