ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
manager.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdaq
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Declaration of `daq::Manager`
7  */
8 #ifndef OCM_DAQ_MANAGER_HPP_
9 #define OCM_DAQ_MANAGER_HPP_
10 #include "config.hpp"
11 
12 #include <chrono>
13 #include <string_view>
14 #include <vector>
15 
16 #include <boost/asio/deadline_timer.hpp>
17 #include <boost/thread/future.hpp>
18 #include <log4cplus/logger.h>
19 #include <rad/ioExecutor.hpp>
20 
21 #include <daq/daqContext.hpp>
22 #include <daq/error.hpp>
23 #include <daq/eventLog.hpp>
24 #include <daq/state.hpp>
25 #include <daq/status.hpp>
26 #include <daq/utility.hpp>
27 
28 namespace daq {
29 
30 class Workspace;
31 
/**
 * Configuration parameters directly related to the manager.
 *
 * Plain aggregate; every member except `instrument_id` carries a default.
 */
struct ManagerParams {
    /// Instrument identifier.
    std::string instrument_id;

    /// Origin string, defaults to "ESO-PARANAL".
    /// NOTE(review): presumably the value used for the `ORIGIN` keyword - confirm against the
    /// implementation.
    std::string origin{"ESO-PARANAL"};

    /// Age of a DAQ in acquiring state after which it is automatically considered abandoned
    /// and will be archived without further action at startup. Defaults to 14 hours.
    std::chrono::hours acquiring_stale_age{14};

    /// Age of a DAQ in merging state after which it is automatically considered abandoned
    /// and will be archived without further action at startup. Defaults to two days.
    std::chrono::hours merging_stale_age{2 * 24};
};
54 
/**
 * Builds a candidate DAQ id; the result is not guaranteed to be unique.
 *
 * @param instrument_id Instrument ID used for the id. If the name is longer than 5 characters
 * only the first 5 are used.
 * @param jitter Millisecond component added to the current time. Callers are meant to increase
 * the jitter until the produced id is unique.
 * @param out Optional output time point; defaults to nullptr.
 * NOTE(review): presumably receives the time used to produce the id when non-null - confirm
 * against the definition.
 *
 * @returns the id candidate.
 *
 * @note Uniqueness cannot be guaranteed by this function; it is the responsibility of the
 * caller.
 * @ingroup daq_common_libdaq
 */
std::string
MakeIdCandidate(char const* instrument_id,
                unsigned jitter = 0,
                std::chrono::system_clock::time_point* out = nullptr);
69 
// Forward declarations: this header only refers to these types through references and smart
// pointers. `DaqControllerFactory` is needed by the `ManagerImpl` constructor and its
// `m_daq_factory` member.
class DaqController;
class DaqControllerFactory;
class FitsController;
73 
74 /**
75  * Observes any status.
76  */
77 class StatusSignal {
78 public:
79  using SignalType = boost::signals2::signal<void(ObservableStatus const&)>;
80 
81  template <class Observer>
82  boost::signals2::connection ConnectObserver(Observer o) {
83  return m_signal.connect(std::move(o));
84  }
85 
86  void Signal(ObservableStatus const& status) {
87  m_signal(status);
88  }
89 
90 private:
91  SignalType m_signal;
92 };
93 
94 /**
95  * Manager owns DaqController and FitsController (active data acquisitions) instances and
96  * multiplexes requests to them.
97  *
98  *
99  * Important responsibilities:
100  *
101  * - Provide interface that allows multiplexing of multiple DaqControllers and FitsControllers
102  * (identified by unique id). This interface is close to what the MAL ICD interface looks like to
103  * control DaqControllers and FitsControllers.
104  * - Creation of `DaqController` instance as part of a new data acquisition (using factory)
105  * - Replace `DaqController` with another implementation to perform the merging (triggered when DAQ
106  * reaches Stopped state).
107  * - Monitor primary data sources to stop DaqController if a primary source completes (TODO).
108  * - (TBD): On startup query previous execution for incomplete DAQ (only consider DAQs with a
109  * maximum age to discard abandoned DAQs)
110  * - Load from persistant storage.
111  * - Query DPM for in-progress merges.
112  *
113  * FitsController also act as a data source.
114  *
115  * Out of scope:
116  *
117  * - Creation of `FitsController` instances.
118  *
119  * @note: It's a template only to facilitate mocking of DaqController, without requiring it to be
120  * an interface.
121  *
122  * @ingroup daq_common_libdaq
123  */
124 class Manager {
125 public:
126  virtual ~Manager() {
127  }
128  using Signal = boost::signals2::signal<void(ObservableStatus const&)>;
129 
130  /**
131  * Restore from state stored in workspace.
132  *
133  * This should typically only be done after construction and not when running.
134  */
135  virtual void RestoreFromWorkspace() = 0;
136 
137  /**
138  * Creates a new unique identifier based on the instrument id and current time.
139  *
140  * If there is a id collision when using current time a millisecond jitter component
141  * is added until a unique id is found.
142  *
143  * The format is the same as ARCFILE - the file extension: "<OLAS_ID>-2020-08-19T09:33:11.951"
144  *
145  * @note OLAS_ID is the first 5 characters of instrument id.
146  * @note The returned ID is guaranteed to be unique for all IDs known by Manager (there is
147  * currently no global repository of previous IDs) and is suitable for both DAQ id and OLAS
148  * FileId.
149  *
150  * @param [out] time Optional time used to produce the ID.
151  * @throws std::system_error on failure.
152  */
153  virtual std::string MakeDaqId(std::chrono::system_clock::time_point* time = nullptr) const = 0;
154 
155  /**
156  * Query existing data acquisition by @c id and optional @c file_id.
157  *
158  * @param id DAQ id to look up.
159  * @param file_id Optional file_id to look up.
160  *
161  * @returns true if there is a already a DAQ with the same ID (or same file_id if argument is
162  * provided).
163  * @returns false otherwise.
164  */
165  virtual bool HaveDaq(std::string_view id, std::string_view file_id = {}) const DAQ_NOEXCEPT = 0;
166 
167  /**
168  * Get status
169  *
170  * @throw std::invalid_argument if no data acquisition exist with provided `id`.
171  */
172  virtual Status GetStatus(std::string_view id) const = 0;
173 
174  /**
175  * Start DaqController identified by `id`.
176  *
177  * @param id Data acquisition id.
178  *
179  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
180  * does not exist.
181  * @returns Future that will eventually be ready when data acquisition has started, or failed to
182  * start.
183  */
184  virtual boost::future<State> StartDaqAsync(DaqContext ctx) = 0;
185 
186  /**
187  * Stop DaqController identified by `id`.
188  *
189  * @param id Data acquisition id.
190  * @param policy Error policy determining if errors are tolerated or not.
191  *
192  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
193  * does not exist.
194  * @returns Future that will eventually be ready when data acquisition has stopped, or failed to
195  * stop.
196  */
197  virtual boost::future<Status> StopDaqAsync(std::string_view id, ErrorPolicy policy) = 0;
198 
199  /**
200  * Abort DaqController identified by `id`.
201  *
202  * @param id Data acquisition id.
203  * @param policy Error policy determining if errors are tolerated or not.
204  *
205  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
206  * does not exist.
207  * @returns Future that will eventually be ready when data acquisition has aborted, or failed to
208  * abort.
209  */
210  virtual boost::future<Status> AbortDaqAsync(std::string_view id, ErrorPolicy policy) = 0;
211 
212  /**
213  * Await DAQ state
214  *
215  * @param id Data acquisition id.
216  * @param state target state to await.
217  * @param timeout How long to wait for state to be reached.
218  * @returns Exceptional future containing std::invalid_argument, if data acquisition with `id`
219  * does not exist or timeout is negative.
220  * @returns Future with a value set when condition is fulfilled or times out.
221  */
222  virtual boost::future<Result<Status>>
223  AwaitDaqStateAsync(std::string_view id, State state, std::chrono::milliseconds timeout) = 0;
224  /**
225  * Update FITS keywords for DaqController identified by `id`.
226  * @param id Data acquisition id.
227  * @param keywords FITS keywords to update.
228  *
229  * @throw std::invalid_argument if no data acquisition exist with provided `id`.
230  * @throw std::runtime_error If DaqController state does not allow updating of keywords (it's
231  * e.g. already been submitted for merging).
232  */
233  virtual void UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) = 0;
234 
235  /**
236  * @returns status observer object.
237  *
238  * The status observer observes any DAQ status changes.
239  */
241 
242  /**
243  * @returns current Daq Controllers.
244  */
245  virtual std::vector<std::shared_ptr<DaqController const>> GetDaqControllers() = 0;
246 };
247 
248 /**
249  * Implements `daq::Manager`.
250  *
251  * @ingroup daq_common_libdaq
252  */
253 class ManagerImpl : public Manager {
254 public:
255  /**
256  * @param instrument_id Instrument id.
257  */
258  explicit ManagerImpl(rad::IoExecutor& executor,
259  ManagerParams params,
260  Workspace& workspace,
261  std::shared_ptr<ObservableEventLog> event_log,
262  DaqControllerFactory& daq_factory);
263  ~ManagerImpl() noexcept;
264  /**
265  * Loads status and constructs DaqControllers corresponding to stored state.
266  */
267  void RestoreFromWorkspace() override;
268 
269  std::string MakeDaqId(std::chrono::system_clock::time_point* time = nullptr) const override;
270 
271  bool HaveDaq(std::string_view id, std::string_view file_id = {}) const noexcept override;
272 
273  Status GetStatus(std::string_view id) const override;
274 
275  boost::future<State> StartDaqAsync(DaqContext ctx) override;
276 
277  boost::future<Status> StopDaqAsync(std::string_view id, ErrorPolicy policy) override;
278 
279  boost::future<Status> AbortDaqAsync(std::string_view id, ErrorPolicy policy) override;
280 
281  boost::future<Result<Status>>
282  AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override;
283 
284  void UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) override;
285 
286  StatusSignal& GetStatusSignal() override;
287 
288  std::vector<std::shared_ptr<DaqController const>> GetDaqControllers() override;
289 
290 private:
291  struct Daq {
292  Daq(std::string id_arg,
293  std::shared_ptr<DaqController> controller_arg,
294  boost::signals2::connection conn_status_arg,
295  boost::signals2::connection conn_context_arg) noexcept;
296 
297  std::string id;
298  std::shared_ptr<DaqController> controller;
299  // connection for observer connected to controller.
300  boost::signals2::scoped_connection conn_status;
301  boost::signals2::scoped_connection conn_context;
302  };
303 
304  /// Adds unique identifier (unrelated to DAQ id) to op abort functions
305  class OpAbortFunc {
306  public:
307  using Func = std::function<bool()>;
308  OpAbortFunc(Func&& f);
309  OpAbortFunc(OpAbortFunc&&) = default;
310  OpAbortFunc& operator=(OpAbortFunc&&) = default;
311 
312  uint64_t GetId() const noexcept;
313  bool Abort() noexcept;
314 
315  private:
316  static uint64_t NextId();
317  uint64_t m_id;
318  std::function<bool()> m_func;
319  };
320 
321  enum class Store { Yes, No };
322  /**
323  * Adds initial keywords that can be provided by manager.
324  *
325  * - `ORIGIN`
326  * - `INSTRUME`
327  * - `ARCFILE`
328  */
329  void AddInitialKeywords(DaqContext& ctx);
330 
331  /**
332  * Adds DaqController to active set and starts monitoring status changes.
333  *
334  * @param daq Data acquisition to add.
335  * @param store Determines if workspace should be updated or not (normally yes but if DAQ is
336  * added from workspace it does not have to be updated immediately again).
337  */
338  void AddDaq(std::shared_ptr<DaqController> daq, Store store = Store::Yes);
339 
340  /** @name State persistence functions */
341  /// @{
342  /**
343  * Remove daq
344  * @param id DAQ to remove.
345  */
346  void RemoveDaq(std::string_view id);
347  void ArchiveDaq(std::string const& id);
348  /**
349  * Store list of active daqs in workspace.
350  */
351  void StoreActiveDaqs() const;
352  /// @}
353 
354  void RemoveAbortFunc(uint64_t id) noexcept;
355  DaqController const* FindDaq(std::string_view id) const noexcept;
356  DaqController* FindDaq(std::string_view id) noexcept;
357  DaqController& FindDaqOrThrow(std::string_view id);
358  DaqController const& FindDaqOrThrow(std::string_view id) const;
359  /**
360  * Invoked by Manager when a DAQ has entered Stopped state and DaqController implementation is
361  * changed to the DPM phase version.
362  */
363  void MoveToMergePhase(std::string_view id);
364 
365  /**
366  * Iterates data acquisitions and for those in State::NotStarted
367  *
368  * It sends pending requests and returns immediately. Upon failing the completion handler will
369  * schedule a 60s deadline timer to retry.
370  */
371  void ScheduleDaqsAsync();
372 
373  std::shared_ptr<bool> m_alive_token;
374  rad::IoExecutor& m_executor;
375  ManagerParams m_params;
376  Workspace& m_workspace;
377  std::shared_ptr<ObservableEventLog> m_event_log;
378  DaqControllerFactory& m_daq_factory;
379 
380  StatusSignal m_status_signal;
381  /// Data acquisitions
382  std::vector<Daq> m_daq_controllers;
383 
384  /// Pair of DAQ-id and abort function
385  std::vector<OpAbortFunc> m_abort_funcs;
386  log4cplus::Logger m_logger;
387 
388  // Used by ScheduleDaqsAsync to retry schedule merges when DPM is offline
389  std::optional<boost::asio::deadline_timer> m_schedule_retry;
390 };
391 
392 } // namespace daq
393 #endif // #ifndef OCM_DAQ_MANAGER_HPP_
daq::StatusSignal::SignalType
boost::signals2::signal< void(ObservableStatus const &)> SignalType
Definition: manager.hpp:79
daq::ManagerImpl::StartDaqAsync
boost::future< State > StartDaqAsync(DaqContext ctx) override
Start DaqController identified by id.
Definition: manager.cpp:432
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:39
daq::ManagerImpl::HaveDaq
bool HaveDaq(std::string_view id, std::string_view file_id={}) const noexcept override
Query existing data acquisition by id and optional file_id.
Definition: manager.cpp:205
daq::Manager::UpdateKeywords
virtual void UpdateKeywords(std::string_view id, fits::KeywordVector const &keywords)=0
Update FITS keywords for DaqController identified by id.
ioExecutor.hpp
daq::ManagerParams::instrument_id
std::string instrument_id
Instrument identifier.
Definition: manager.hpp:39
daq::Workspace
Interface to interact with DPM workspace.
Definition: workspace.hpp:31
daq::StatusSignal::Signal
void Signal(ObservableStatus const &status)
Definition: manager.hpp:86
daq::ManagerImpl
Implements daq::Manager.
Definition: manager.hpp:253
rad::IoExecutor
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
daq::Manager::HaveDaq
virtual bool HaveDaq(std::string_view id, std::string_view file_id={}) const DAQ_NOEXCEPT=0
Query existing data acquisition by id and optional file_id.
daq::ManagerParams::origin
std::string origin
Definition: manager.hpp:40
DAQ_NOEXCEPT
#define DAQ_NOEXCEPT
Definition: config.hpp:16
daq::ManagerImpl::UpdateKeywords
void UpdateKeywords(std::string_view id, fits::KeywordVector const &keywords) override
Update FITS keywords for DaqController identified by id.
Definition: manager.cpp:518
daq::Manager::GetStatusSignal
virtual StatusSignal & GetStatusSignal()=0
daq::MakeIdCandidate
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
Definition: manager.cpp:46
daq::ManagerImpl::StopDaqAsync
boost::future< Status > StopDaqAsync(std::string_view id, ErrorPolicy policy) override
Stop DaqController identified by id.
Definition: manager.cpp:464
daq::Manager::AwaitDaqStateAsync
virtual boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State state, std::chrono::milliseconds timeout)=0
Await DAQ state.
daq
Definition: asyncProcess.cpp:15
daq::StatusSignal::ConnectObserver
boost::signals2::connection ConnectObserver(Observer o)
Definition: manager.hpp:82
daq::ObservableStatus
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:161
daq::StatusSignal
Observes any status.
Definition: manager.hpp:77
eventLog.hpp
Contains declaration for EventLog, ObservableEventLog and related events.
config.hpp
Config class header file.
daq::DaqContext
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:48
daq::Manager::~Manager
virtual ~Manager()
Definition: manager.hpp:126
daq::Manager::MakeDaqId
virtual std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const =0
Creates a new unique identifier based on the instrument id and current time.
daq::ManagerImpl::~ManagerImpl
~ManagerImpl() noexcept
Definition: manager.cpp:109
daq::Manager::StopDaqAsync
virtual boost::future< Status > StopDaqAsync(std::string_view id, ErrorPolicy policy)=0
Stop DaqController identified by id.
daq::Manager::StartDaqAsync
virtual boost::future< State > StartDaqAsync(DaqContext ctx)=0
Start DaqController identified by id.
daq::ManagerParams::merging_stale_age
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archived without further action at startup.
Definition: manager.hpp:52
daq::Manager::GetDaqControllers
virtual std::vector< std::shared_ptr< DaqController const > > GetDaqControllers()=0
status.hpp
Contains declaration for Status and ObservableStatus.
daq::ManagerImpl::GetDaqControllers
std::vector< std::shared_ptr< DaqController const > > GetDaqControllers() override
Definition: manager.cpp:526
daqContext.hpp
Contains declaration of daq::Context.
daq::ManagerImpl::MakeDaqId
std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const override
Creates a new unique identifier based on the instrument id and current time.
Definition: manager.cpp:196
daq::ManagerImpl::AwaitDaqStateAsync
boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override
Await DAQ state.
Definition: manager.cpp:480
daq::ManagerParams::acquiring_stale_age
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archived without further action at startup.
Definition: manager.hpp:46
state.hpp
Declares daq::State and related functions.
daq::ManagerImpl::AbortDaqAsync
boost::future< Status > AbortDaqAsync(std::string_view id, ErrorPolicy policy) override
Abort DaqController identified by id.
Definition: manager.cpp:472
daq::ManagerImpl::GetStatus
Status GetStatus(std::string_view id) const override
Get status.
Definition: manager.cpp:366
daq::Status
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:120
daq::ManagerImpl::RestoreFromWorkspace
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
Definition: manager.cpp:124
daq::Manager::RestoreFromWorkspace
virtual void RestoreFromWorkspace()=0
Restore from state stored in workspace.
daq::fits::KeywordVector
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
daq::ManagerImpl::GetStatusSignal
StatusSignal & GetStatusSignal() override
Definition: manager.cpp:522
daq::ManagerParams
Configurations parameters directly related to manager.
Definition: manager.hpp:35
daq::DaqController
Controls the execution of single data acquisition that ultimately result in a set of FITS keywords an...
Definition: daqController.hpp:214
daq::Manager::Signal
boost::signals2::signal< void(ObservableStatus const &)> Signal
Definition: manager.hpp:128
daq::ErrorPolicy
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
daq::Manager::GetStatus
virtual Status GetStatus(std::string_view id) const =0
Get status.
utility.hpp
Declaration of utilities.
daq::Manager
Manager owns DaqController and FitsController (active data acquisitions) instances and multiplexes requests to them.
Definition: manager.hpp:124
error.hpp
Contains error related declarations for DAQ.
daq::ManagerImpl::ManagerImpl
ManagerImpl(rad::IoExecutor &executor, ManagerParams params, Workspace &workspace, std::shared_ptr< ObservableEventLog > event_log, DaqControllerFactory &daq_factory)
Definition: manager.cpp:95
daq::DaqControllerFactory
Abstract factory for DaqControllers.
Definition: daqController.hpp:76
daq::Manager::AbortDaqAsync
virtual boost::future< Status > AbortDaqAsync(std::string_view id, ErrorPolicy policy)=0
Abort DaqController identified by id.