ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
manager.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdaq
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Declaration of `daq::Manager`
7  */
8 #ifndef OCM_DAQ_MANAGER_HPP_
9 #define OCM_DAQ_MANAGER_HPP_
10 #include "config.hpp"
11 
12 #include <chrono>
13 #include <string_view>
14 #include <vector>
15 
16 #include <boost/asio/deadline_timer.hpp>
17 #include <boost/thread/future.hpp>
18 #include <log4cplus/logger.h>
19 #include <rad/ioExecutor.hpp>
20 
21 #include <daq/daqContext.hpp>
22 #include <daq/error.hpp>
23 #include <daq/eventLog.hpp>
24 #include <daq/state.hpp>
25 #include <daq/status.hpp>
26 #include <daq/utility.hpp>
27 
28 namespace daq {
29 
30 class Workspace;
31 
32 /**
33  * Configurations parameters directly related to manager.
34  */
35 struct ManagerParams {
36  /**
37  * Instrument identifier.
38  */
39  std::string instrument_id;
40  std::string origin = "ESO-PARANAL";
41 
42  /**
43  * Age of DAQ in acquiring state after which it is automatically considered abandoned and will
44  * be archived without further action at startup.
45  */
46  std::chrono::hours acquiring_stale_age = std::chrono::hours(14);
47 
48  /**
49  * Age of DAQ in merging state, after which it is automatically considered abandoned and will be
50  * archived without further action at startup.
51  */
52  std::chrono::hours merging_stale_age = std::chrono::hours(2 * 24);
53 };
54 
55 /**
56  * Creates a DAQ id candidate that may or may not be unique.
57  *
58  * @param instrument_id The instrument ID to use for the id. Only the 5 first characters will be
59  * used if the name is longer than 5.
60  * @param jitter Jitter is a millisecond component added to the current time. This is meant to be
61  * used to find a unique id by adding jitter until a unique id is found.
62  *
63  * @note The function cannot guarantee unique ID and is the responsibility of the caller.
64  * @ingroup daq_common_libdaq
65  */
66 std::string MakeIdCandidate(char const* instrument_id,
67  unsigned jitter = 0,
68  std::chrono::system_clock::time_point* out = nullptr);
69 
70 class DaqController;
72 class FitsController;
73 
74 /**
75  * Observes any status.
76  */
77 class StatusSignal {
78 public:
79  using SignalType = boost::signals2::signal<void(ObservableStatus const&)>;
80 
81  template <class Observer>
82  boost::signals2::connection ConnectObserver(Observer o) {
83  return m_signal.connect(std::move(o));
84  }
85 
86  void Signal(ObservableStatus const& status) {
87  m_signal(status);
88  }
89 
90 private:
91  SignalType m_signal;
92 };
93 
94 /**
95  * Manager owns DaqController and FitsController (active data acquisitions) instances and
96  * multiplexes requests to them.
97  *
98  *
99  * Important responsibilities:
100  *
101  * - Provide interface that allows multiplexing of multiple DaqControllers and FitsControllers
102  * (identified by unique id). This interface is close to what the MAL ICD interface looks like to
103  * control DaqControllers and FitsControllers.
104  * - Creation of `DaqController` instance as part of a new data acquisition (using factory)
105  * - Replace `DaqController` with another implementation to perform the merging (triggered when DAQ
106  * reaches Stopped state).
107  * - Monitor primary data sources to stop DaqController if a primary source completes (TODO).
108  * - (TBD): On startup query previous execution for incomplete DAQ (only consider DAQs with a
109  * maximum age to discard abandoned DAQs)
110  * - Load from persistant storage.
111  * - Query DPM for in-progress merges.
112  *
113  * FitsController also act as a data source.
114  *
115  * Out of scope:
116  *
117  * - Creation of `FitsController` instances.
118  *
119  * @note: It's a template only to facilitate mocking of DaqController, without requiring it to be
120  * an interface.
121  *
122  * @ingroup daq_common_libdaq
123  */
124 class Manager {
125 public:
126  virtual ~Manager() {
127  }
128  using Signal = boost::signals2::signal<void(ObservableStatus const&)>;
129 
130  /**
131  * Restore from state stored in workspace.
132  *
133  * This should typically only be done after construction and not when running.
134  */
135  virtual void RestoreFromWorkspace() = 0;
136 
137  /**
138  * Creates a new unique identifier based on the instrument id and current time.
139  *
140  * If there is a id collision when using current time a millisecond jitter component
141  * is added until a unique id is found.
142  *
143  * The format is the same as ARCFILE - the file extension: "<OLAS_ID>-2020-08-19T09:33:11.951"
144  *
145  * @note OLAS_ID is the first 5 characters of instrument id.
146  * @note The returned ID is guaranteed to be unique for all IDs known by Manager (there is
147  * currently no global repository of previous IDs) and is suitable for both DAQ id and OLAS
148  * FileId.
149  *
150  * @param [out] time Optional time used to produce the ID.
151  * @throws std::system_error on failure.
152  */
153  virtual std::string MakeDaqId(std::chrono::system_clock::time_point* time = nullptr) const = 0;
154 
155  /**
156  * Query existing data acquisition by @c id and optional @c file_id.
157  *
158  * @param id DAQ id to look up.
159  * @param file_id Optional file_id to look up.
160  *
161  * @returns true if there is a already a DAQ with the same ID (or same file_id if argument is
162  * provided).
163  * @returns false otherwise.
164  */
165  virtual bool HaveDaq(std::string_view id, std::string_view file_id = {}) const DAQ_NOEXCEPT = 0;
166 
167  /**
168  * Get status
169  *
170  * @throw std::invalid_argument if no data acquisition exist with provided `id`.
171  */
172  virtual Status GetStatus(std::string_view id) const = 0;
173 
174  /**
175  * Start DaqController identified by `id`.
176  *
177  * @param id Data acquisition id.
178  *
179  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
180  * does not exist.
181  * @returns Future that will eventually be ready when data acquisition has started, or failed to
182  * start.
183  */
184  virtual boost::future<State> StartDaqAsync(DaqContext ctx) = 0;
185 
186  /**
187  * Stop DaqController identified by `id`.
188  *
189  * @param id Data acquisition id.
190  * @param policy Error policy determining if errors are tolerated or not.
191  *
192  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
193  * does not exist.
194  * @returns Future that will eventually be ready when data acquisition has stopped, or failed to
195  * stop.
196  */
197  virtual boost::future<Status> StopDaqAsync(std::string_view id, ErrorPolicy policy) = 0;
198 
199  /**
200  * Abort DaqController identified by `id`.
201  *
202  * @param id Data acquisition id.
203  * @param policy Error policy determining if errors are tolerated or not.
204  *
205  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
206  * does not exist.
207  * @returns Future that will eventually be ready when data acquisition has aborted, or failed to
208  * abort.
209  */
210  virtual boost::future<Status> AbortDaqAsync(std::string_view id, ErrorPolicy policy) = 0;
211 
212  /**
213  * Await DAQ state
214  *
215  * @param id Data acquisition id.
216  * @param state target state to await.
217  * @param timeout How long to wait for state to be reached.
218  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
219  * does not exist or timeout is negative.
220  * @returns Future with a value set when condition is fulfilled or times out.
221  */
222  virtual boost::future<Result<Status>>
223  AwaitDaqStateAsync(std::string_view id, State state, std::chrono::milliseconds timeout) = 0;
224  /**
225  * Update FITS keywords for DaqController identified by `id`.
226  * @param id Data acquisition id.
227  * @param keywords FITS keywords to update.
228  *
229  * @throw std::invalid_argument if no data acquisition exist with provided `id`.
230  * @throw std::runtime_error If DaqController state does not allow updating of keywords (it's
231  * e.g. already been submitted for merging).
232  */
233  virtual void UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) = 0;
234 
235  /**
236  * @returns status observer object.
237  *
238  * The status observer observes any DAQ status changes.
239  */
241 
242  /**
243  * @returns current Daq Controllers.
244  */
245  virtual std::vector<std::shared_ptr<DaqController const>> GetDaqControllers() = 0;
246 };
247 
248 /**
249  * Implements `daq::Manager`.
250  *
251  * @ingroup daq_common_libdaq
252  */
253 class ManagerImpl : public Manager {
254 public:
255  /**
256  * @param instrument_id Instrument id.
257  */
258  explicit ManagerImpl(rad::IoExecutor& executor,
259  ManagerParams params,
260  Workspace& workspace,
261  std::shared_ptr<ObservableEventLog> event_log,
262  DaqControllerFactory& daq_factory,
263  log4cplus::Logger const& logger);
264  ~ManagerImpl() noexcept;
265  /**
266  * Loads status and constructs DaqControllers corresponding to stored state.
267  */
268  void RestoreFromWorkspace() override;
269 
270  std::string MakeDaqId(std::chrono::system_clock::time_point* time = nullptr) const override;
271 
272  bool HaveDaq(std::string_view id, std::string_view file_id = {}) const noexcept override;
273 
274  Status GetStatus(std::string_view id) const override;
275 
276  boost::future<State> StartDaqAsync(DaqContext ctx) override;
277 
278  boost::future<Status> StopDaqAsync(std::string_view id, ErrorPolicy policy) override;
279 
280  boost::future<Status> AbortDaqAsync(std::string_view id, ErrorPolicy policy) override;
281 
282  boost::future<Result<Status>>
283  AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override;
284 
285  void UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) override;
286 
287  StatusSignal& GetStatusSignal() override;
288 
289  std::vector<std::shared_ptr<DaqController const>> GetDaqControllers() override;
290 
291 private:
292  struct Daq {
293  Daq(std::string id_arg,
294  std::shared_ptr<DaqController> controller_arg,
295  boost::signals2::connection conn_status_arg,
296  boost::signals2::connection conn_context_arg) noexcept;
297 
298  std::string id;
299  std::shared_ptr<DaqController> controller;
300  // connection for observer connected to controller.
301  boost::signals2::scoped_connection conn_status;
302  boost::signals2::scoped_connection conn_context;
303  };
304 
305  /// Adds unique identifier (unrelated to DAQ id) to op abort functions
306  class OpAbortFunc {
307  public:
308  using Func = std::function<bool()>;
309  OpAbortFunc(Func&& f);
310  OpAbortFunc(OpAbortFunc&&) = default;
311  OpAbortFunc& operator=(OpAbortFunc&&) = default;
312 
313  uint64_t GetId() const noexcept;
314  bool Abort() noexcept;
315 
316  private:
317  static uint64_t NextId();
318  uint64_t m_id;
319  std::function<bool()> m_func;
320  };
321 
322  enum class Store { Yes, No };
323  /**
324  * Adds initial keywords that can be provided by manager.
325  *
326  * - `ORIGIN`
327  * - `INSTRUME`
328  * - `ARCFILE`
329  */
330  void AddInitialKeywords(DaqContext& ctx);
331 
332  /**
333  * Adds DaqController to active set and starts monitoring status changes.
334  *
335  * @param daq Data acquisition to add.
336  * @param store Determines if workspace should be updated or not (normally yes but if DAQ is
337  * added from workspace it does not have to be updated immediately again).
338  */
339  void AddDaq(std::shared_ptr<DaqController> const& daq, Store store = Store::Yes);
340 
341  /** @name State persistence functions */
342  /// @{
343  /**
344  * Remove daq
345  * @param id DAQ to remove.
346  */
347  void RemoveDaq(std::string_view id);
348  void ArchiveDaq(std::string const& id);
349  /**
350  * Store list of active daqs in workspace.
351  */
352  void StoreActiveDaqs() const;
353  /// @}
354 
355  void RemoveAbortFunc(uint64_t id) noexcept;
356  DaqController const* FindDaq(std::string_view id) const noexcept;
357  DaqController* FindDaq(std::string_view id) noexcept;
358  DaqController& FindDaqOrThrow(std::string_view id);
359  DaqController const& FindDaqOrThrow(std::string_view id) const;
360  /**
361  * Invoked by Manager when a DAQ has entered Stopped state and DaqController implementation is
362  * changed to the DPM phase version.
363  */
364  void MoveToMergePhase(std::string_view id);
365 
366  /**
367  * Iterates data acquisitions and for those in State::NotStarted
368  *
369  * It sends pending requests and returns immediately. Upon failing the completion handler will
370  * schedule a 60s deadline timer to retry.
371  */
372  void ScheduleDaqsAsync();
373 
374  std::shared_ptr<bool> m_alive_token;
375  rad::IoExecutor& m_executor;
376  ManagerParams m_params;
377  Workspace& m_workspace;
378  std::shared_ptr<ObservableEventLog> m_event_log;
379  DaqControllerFactory& m_daq_factory;
380 
381  StatusSignal m_status_signal;
382  /// Data acquisitions
383  std::vector<Daq> m_daq_controllers;
384 
385  /// Pair of DAQ-id and abort function
386  std::vector<OpAbortFunc> m_abort_funcs;
387  log4cplus::Logger m_logger;
388 
389  // Used by ScheduleDaqsAsync to retry schedule merges when DPM is offline
390  std::optional<boost::asio::deadline_timer> m_schedule_retry;
391 };
392 
393 } // namespace daq
394 #endif // #ifndef OCM_DAQ_MANAGER_HPP_
Abstract factory for DaqControllers.
Controls the execution of single data acquisition that ultimately result in a set of FITS keywords an...
Implements daq::Manager.
Definition: manager.hpp:253
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
Definition: manager.cpp:125
Status GetStatus(std::string_view id) const override
Get status.
Definition: manager.cpp:367
boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override
Await DAQ state.
Definition: manager.cpp:485
bool HaveDaq(std::string_view id, std::string_view file_id={}) const noexcept override
Query existing data acquisition by id and optional file_id.
Definition: manager.cpp:206
ManagerImpl(rad::IoExecutor &executor, ManagerParams params, Workspace &workspace, std::shared_ptr< ObservableEventLog > event_log, DaqControllerFactory &daq_factory, log4cplus::Logger const &logger)
Definition: manager.cpp:95
std::vector< std::shared_ptr< DaqController const > > GetDaqControllers() override
Definition: manager.cpp:533
~ManagerImpl() noexcept
Definition: manager.cpp:110
std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const override
Creates a new unique identifier based on the instrument id and current time.
Definition: manager.cpp:197
void UpdateKeywords(std::string_view id, fits::KeywordVector const &keywords) override
Update FITS keywords for DaqController identified by id.
Definition: manager.cpp:525
boost::future< State > StartDaqAsync(DaqContext ctx) override
Start DaqController identified by id.
Definition: manager.cpp:433
boost::future< Status > StopDaqAsync(std::string_view id, ErrorPolicy policy) override
Stop DaqController identified by id.
Definition: manager.cpp:469
boost::future< Status > AbortDaqAsync(std::string_view id, ErrorPolicy policy) override
Abort DaqController identified by id.
Definition: manager.cpp:477
StatusSignal & GetStatusSignal() override
Definition: manager.cpp:529
Manager owns DaqController and FitsController (active data acquisitions) instances and multiplexes re...
Definition: manager.hpp:124
virtual boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State state, std::chrono::milliseconds timeout)=0
Await DAQ state.
boost::signals2::signal< void(ObservableStatus const &)> Signal
Definition: manager.hpp:128
virtual boost::future< State > StartDaqAsync(DaqContext ctx)=0
Start DaqController identified by id.
virtual StatusSignal & GetStatusSignal()=0
virtual boost::future< Status > StopDaqAsync(std::string_view id, ErrorPolicy policy)=0
Stop DaqController identified by id.
virtual Status GetStatus(std::string_view id) const =0
Get status.
virtual void UpdateKeywords(std::string_view id, fits::KeywordVector const &keywords)=0
Update FITS keywords for DaqController identified by id.
virtual std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const =0
Creates a new unique identifier based on the instrument id and current time.
virtual void RestoreFromWorkspace()=0
Restore from state stored in workspace.
virtual ~Manager()
Definition: manager.hpp:126
virtual bool HaveDaq(std::string_view id, std::string_view file_id={}) const DAQ_NOEXCEPT=0
Query existing data acquisition by id and optional file_id.
virtual boost::future< Status > AbortDaqAsync(std::string_view id, ErrorPolicy policy)=0
Abort DaqController identified by id.
virtual std::vector< std::shared_ptr< DaqController const > > GetDaqControllers()=0
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:165
Observes any status.
Definition: manager.hpp:77
boost::signals2::connection ConnectObserver(Observer o)
Definition: manager.hpp:82
boost::signals2::signal< void(ObservableStatus const &)> SignalType
Definition: manager.hpp:79
void Signal(ObservableStatus const &status)
Definition: manager.hpp:86
Interface to interact with DPM workspace.
Definition: workspace.hpp:31
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
#define DAQ_NOEXCEPT
Definition: config.hpp:16
Contains declaration of daq::Context.
Contains error related declarations for DAQ.
Contains declaration for EventLog, ObservableEventLog and related events.
Declares daq::State and related functions.
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
std::string origin
Definition: manager.hpp:40
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
Definition: manager.cpp:46
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
Definition: manager.hpp:52
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
std::string instrument_id
Instrument identifier.
Definition: manager.hpp:39
State
Observable states of the data acquisition process.
Definition: state.hpp:39
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Definition: manager.hpp:46
Configurations parameters directly related to manager.
Definition: manager.hpp:35
Config class header file.
Contains declaration for Status and ObservableStatus.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:124
Declaration of utilities.