ifw-daq  1.0.0
IFW Data Acquisition modules
daqController.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq
4  * @copyright 2021 ESO - European Southern Observatory
5  *
6  * @brief Contains declaration for for DaqController
7  */
8 #ifndef OCF_DAQ_DAQ_CONTROLLER_HPP_
9 #define OCF_DAQ_DAQ_CONTROLLER_HPP_
10 #include "config.hpp"
11 
12 #include <chrono>
13 #include <iostream>
14 #include <string>
15 #include <utility>
16 #include <variant>
17 #include <vector>
18 
19 #include <boost/thread/future.hpp>
20 #include <boost/asio/io_context.hpp>
21 #include <boost/asio/steady_timer.hpp>
22 #include <log4cplus/logger.h>
23 #include <gsl/pointers>
24 #include <rad/ioExecutor.hpp>
25 #include <Metadaqif.hpp>
26 
27 #include "fitsController.hpp"
28 #include "error.hpp"
29 #include "source.hpp"
30 #include "state.hpp"
31 #include "status.hpp"
32 #include "eventLog.hpp"
33 #include "pendingReplies.hpp"
34 #include "daqProperties.hpp"
35 #include "dpPart.hpp"
36 #include "op/initiate.hpp"
37 #include "op/asyncOpParams.hpp"
38 
39 namespace daq {
40 
41 /**
42  * Async operations
43  *
44  * @ingroup daq_ocm_libdaq
45  */
47  // Await returns a pair of future return value and an abort function.
48  using AwaitReturnType = std::pair<boost::future<Result<DpParts>>,
49  std::function<bool()>>;
50 
51  /**
52  * Default constructs object with standard async operations.
53  */
56  AsyncOperations(AsyncOperations const&) = default;
59 
60  bool IsValid() const noexcept;
61 
62  std::function<boost::future<void>(op::AsyncOpParams)> start;
63  std::function<boost::future<Result<void>>(ErrorPolicy, op::AsyncOpParams)> abort;
64  std::function<boost::future<Result<DpParts>>(ErrorPolicy, op::AsyncOpParams)> stop;
65  std::function<AwaitReturnType(op::AwaitOpParams)> await_prim;
66 };
67 
68 /**
69  * Controls the execution of single data acquisition that ultimately result in a set of FITS
70  * keywords and/or FITS files.
71  *
72  * Important
73  * ---------
74  *
75  * Due to the dynamic nature of data acquisitions (mostly w.r.t. fatal errors from data soures and
76  * commands from user) the following assumptions are made:
77  *
78  * - Each data source completes only *once*.
79  * - This means that any command that deals with potential completion (mainly the stop and await
80  * operations) *must* record the outcome (DpParts + Keywords) and mark the source state as
81  * completed.
82  * - This ensures that no outcome is lost and that subsequent commands does not include completed
83  * sources.
84  * - Conflicting commands are resolved in the order they are received and w.r.t. to state.
85  * - If data sources are awaited on, some of which has aleady completed, and DaqController is
86  * asked to abort, this means that the DAQ state is `Aborting` and even if the await operation
87  * completes successfully, the result will be discarded because user asked to abort.
88  * - If user asks to stop and primary soures are awaited on, there's a race between which request
89  * to data source completes first.
90  * - Commands are only sent to sources that are in-progress (not completed/aborted/stopped).
91  * - Whichever reply is received first (from await or stop) will determine the result from
92  * that data souce.
93  * - Async operations must be able to deal with partial failures.
94  * - This is required to allow user to retry operation.
95  * - Asynchronous errors are published and user response is requied.
96  * - If e.g. the await operation fails partially it means user must intervene to decide if it
97  * should try to stop or abort the DAQ (in the future a DAQ-wide policy could be set to decide
98  * what to do automatically).
99  *
100  * Usage
101  * -----
102  *
103  * 1. Starting
104  *
105  * Start with `StartAsync`
106  *
107  * 2. Stopping
108  *
109  * Manual option
110  * -------------
111  *
112  * Stop/abort with `StopAsync` or `AbortAsync`.
113  *
114  * Automatic option
115  * ----------------
116  *
117  * Automatically initiate stop if all primary sources are automatically stopped as
118  * indicated by `ObservableStatus` status update topic.
119  *
120  * If asynchronous operations have been started, it is still possible to abort.
121  * This command will then supersede ongoing commands, and ongoing commands will
122  * be aborted at the next synchronization point (typically when reply is received).
123  *
124  * It is not possible to retry or go backwards by executing `StartAsync` after
125  * `StopAsync` or `AbortAsync`. The reasoning behind this is to avoid the risk
126  * of duplicate data acquisition id being "in flight" at the same time..
127  *
128  * It is possible to abort on the other hand, even though the data acquisition might not have
129  * been started yet.
130  *
131  * For StartAsync, StopAsync, AbortAsync:
132  *
133  * The error handling strategy is to not Set result until we have a result
134  * from each source either by reply or internal error when Sending/receiving
135  * (MAL issues e.g.).
136  *
137  * This implies:
138  *
139  * - That way we can communicate the full result using the promise.
140  * On errors it contains the exception(s) on success the new state.
141  * - We can have a mixed result of partial success and partial failure,
142  * although the future will contain exception(s).
143  * - We let caller decide what to do (e.g. abort).
144  * - We use an error flag to indicate that error occurred, which is
145  * possible in any state.
146  * - We stay in the state that had the error to e.g. allow retries.
147  * So if an error occurred in `Starting` then DaqController remains
148  * in `Starting` with the error flag Set.
149  *
150  * Pending decision decisions:
151  * - Whether to fail fast or be robust,
152  * or even whether to be configurable.
153  * - This was addressed with the ErrorPolicy option when stopping/aborting.
154  * - Support aborting while there are still replies pending for startDaq?
155  * This would mean that one can be in state Starting and Stopping at the same time.
156  * - This was addressed to ignore errors when forcibly aborting.
157  * - Who connects the clients + configure QoS? Since connect is done implicitly there's
158  * no need actually. Simply specify timeout should be sufficient. And that should
159  * be done by the caller.
160  *
161  * @ingroup daq_ocm_libdaq
162  */
163 class DaqController : public std::enable_shared_from_this<DaqController> {
164  public:
165  DaqController() = default;
166  virtual ~DaqController() = default;
167  /**
168  * Starts the data acquisition.
169  *
170  * @returns A future that will be Set once data acquisition has started
171  * or or fails.
172  *
173  * @throws std::exception-derived exception if internal error occurs.
174  *
175  * @return Future that is ready once all sources are started or failed.
176  *
177  * @pre `GetState() == State::Notstarted`
178  * @post `GetState() == State::Starting` on success
179  * `GetState() == State::Error` on error
180  */
181  virtual boost::future<State> StartAsync() = 0;
182 
183  /**
184  * Stops the data acquisition.
185  *
186  * @pre `GetState() not in (State::Stopped or State::Aborted)`
187  * @post `GetState() == State::Stopping`
188  */
189  virtual boost::future<Status> StopAsync(ErrorPolicy policy) = 0;
190 
191  /**
192  * Aborts the data acquisition.
193  *
194  * @param policy Error policy determining if errors are tolerated or not.
195  *
196  * It is possible to issue this request more than once, to e.g. retry a failed abort attempt.
197  *
198  * @pre `GetState() not in (State::Aborted, State::Stopped)`
199  * @post `GetState() == State::Aborting` if a data acquisition was ongoing otherwise `GetState()
200  * == State::Aborted`.
201  */
202  virtual boost::future<Status> AbortAsync(ErrorPolicy policy) = 0;
203 
204  /**
205  * Updates (replace or add) list of keywords.
206  *
207  * @param keywords Keywords to add.
208  */
209  virtual void UpdateKeywords(fits::KeywordVector const& keywords) = 0;
210 
211  /** Awaits that data acquisition stops or aborts.
212  *
213  * It is possible to await only only a subset of data sources by specifying their ids in
214  * `sources`.
215  *
216  * @param sources An optional vector of source-ids to await, if empty all primary sources are
217  * awaited on.
218  *
219  * @returns future set with std::invalid_argument if source-id is not recognized.
220  * @returns future set with boost::broken_promise if DaqController is destroyed before
221  * operation completes.
222  */
223  virtual boost::future<State>
224  AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout) = 0;
225 
226  /**
227  * @returns state of data acquisition.
228  */
229  virtual State GetState() const DAQ_NOEXCEPT = 0;
230 
231  /**
232  * @returns status object associated with this daq.
233  */
234  virtual std::shared_ptr<ObservableStatus> GetStatus() DAQ_NOEXCEPT = 0;
235  virtual std::shared_ptr<ObservableStatus const> GetStatus() const DAQ_NOEXCEPT = 0;
236 
237  /**
238  * @returns event log associated with this daq.
239  */
240  virtual std::shared_ptr<ObservableEventLog> GetEventLog() DAQ_NOEXCEPT = 0;
241 
242  /**
243  * @returns data acquisition identifier.
244  */
245  virtual std::string const& GetId() const DAQ_NOEXCEPT = 0;
246 
247  /**
248  * @return Error flag.
249  */
250  virtual bool GetErrorFlag() const DAQ_NOEXCEPT = 0;
251 };
252 
253 /**
254  * Implements `daq::DaqController`
255  *
256  *
257  * @ingroup daq_ocm_libdaq
258  */
260  public:
261  /**
262  * Construct object.
263  *
264  * @param io_context Executor used for continuations and timer.
265  * @param properties General properties used to control DAQ execution.
266  * @param status Data acquisition status object, also contains identifier (may not be empty).
267  * Caller is responsible for making the id unique.
268  * @param prim Primary data sources
269  * @param meta Metadata sources
270  *
271  * @pre `status == true`.
272  * @pre `event_log == true`.
273  * @throws std::invalid_argument if arguments are invalid.
274  */
275  static std::shared_ptr<DaqControllerImpl> Create(boost::asio::io_context& io_context,
276  DaqProperties properties,
277  std::shared_ptr<ObservableStatus> status,
278  std::shared_ptr<ObservableEventLog> event_log,
279  AsyncOperations operations);
280 
281  boost::future<State> StartAsync() override;
282  boost::future<Status> StopAsync(ErrorPolicy policy) override;
283  boost::future<Status> AbortAsync(ErrorPolicy policy) override;
284  void UpdateKeywords(fits::KeywordVector const& keywords) override;
285  boost::future<State>
286  AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout) override;
287  State GetState() const DAQ_NOEXCEPT override ;
288  std::shared_ptr<ObservableStatus> GetStatus() DAQ_NOEXCEPT override;
289  std::shared_ptr<ObservableStatus const> GetStatus() const DAQ_NOEXCEPT override;
290  std::shared_ptr<ObservableEventLog> GetEventLog() DAQ_NOEXCEPT override;
291  std::string const& GetId() const DAQ_NOEXCEPT override;
292  bool GetErrorFlag() const DAQ_NOEXCEPT override;
293 
294  /**
295  * @returns Logger associated with this DaqController.
296  */
297  constexpr log4cplus::Logger const& GetLogger() const noexcept;
298 
299  protected:
300  struct NotStarted{};
301  struct Starting{};
302  struct Acquiring{};
303  struct Stopping{};
304  struct Stopped{};
305  struct Aborting{};
306  struct Aborted{};
307 
308  using StateVariant = std::variant<NotStarted,
309  Starting,
310  Acquiring,
311  Stopping,
312  Stopped,
313  Aborting,
314  Aborted>;
315  StateVariant MakeState(State s) const noexcept;
316 #ifdef UNIT_TEST
317 public:
318 #endif
319  // Protected constructor so user is forced to use shared pointer semantics
320  // which is required because of future continuation style programming.
321  DaqControllerImpl(boost::asio::io_context& io_context,
322  DaqProperties properties,
323  std::unique_ptr<FitsController> fits_controller,
324  std::shared_ptr<ObservableStatus> status,
325  std::shared_ptr<ObservableEventLog> event_log,
326  AsyncOperations ops);
327 #ifdef UNIT_TEST
328 private:
329 #endif
330  template<class T, class... Args>
331  void AddEvent(Args&&... args) {
332  m_event_log->EmplaceEvent<T>(std::forward<Args>(args)...);
333  }
334  /**
335  * Constructs the parameters used for asynchronous operations.
336  *
337  * @note AsyncOpParams will bind references to member variables so caller must
338  * guarantee that DaqController outlives the async operation.
339  * This is normally done by holding a shared copy in the last .then continuation
340  * for the async operation, thus guaranteeing that all intermediate continuations
341  * will access a valid object.
342  */
343  op::AsyncOpParams MakeParams();
344  op::AwaitOpParams MakeAwaitParams();
345 
346  /**
347  * Await completion of primary sources.
348  *
349  * If there are no sources this method does nothing.
350  */
351  void InitiateAwaitPrimarySources();
352 
353  void SetErrorFlag(bool error) noexcept;
354  void SetState(StateVariant&& s) noexcept;
355 
356  std::optional<std::variant<gsl::not_null<Source<PrimSource>*>,
357  gsl::not_null<Source<MetaSource>*>>>
358  FindSource(std::string_view source_id);
359 
360  /// Helper to build source vector
361  template <class SourceType>
362  std::vector<Source<SourceType>> MakeSources(std::vector<SourceType> sources);
363 
365  boost::asio::io_context& m_io_ctx;
368 
369  std::unique_ptr<FitsController> m_fits_ctl;
370  std::shared_ptr<ObservableStatus> m_status;
371  std::shared_ptr<ObservableEventLog> m_event_log;
372 
373  std::vector<Source<PrimSource>> m_prim_sources; ///< Note: Consider vector immutable!
374  std::vector<Source<MetaSource>> m_meta_sources; ///< Note: Consider vector immutable!
375 
377  std::shared_ptr<PendingReplies> m_pending_replies;
378  std::vector<std::unique_ptr<boost::asio::steady_timer>> m_timers;
379 
380  /**
381  * If DaqController is awaiting the completion of primary data sources
382  * this function will hold the abort function.
383  *
384  * @important Users must check if it is valid before invoking it.
385  */
386  std::function<bool()> m_abort_await_primary_sources;
387  log4cplus::Logger m_logger;
388 };
389 
390 std::ostream& operator<<(std::ostream& os, DaqController const& daq);
391 
392 } // namespace daq
393 
394 #endif // OCF_DAQ_DAQ_CONTROLLER_HPP_
pendingReplies.hpp
Contains declaration for classes related to pending replies.
DAQ_NOEXCEPT
#define DAQ_NOEXCEPT
Definition: config.hpp:16
daq::AsyncOperations::operator=
AsyncOperations & operator=(AsyncOperations const &)=default
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:41
initiate.hpp
Contains declarations for the helper functions to initiate operations.
source.hpp
Declarations for daq::Source and related classes.
ioExecutor.hpp
daq::DaqControllerImpl::m_logger
log4cplus::Logger m_logger
Definition: daqController.hpp:387
daq::DaqControllerImpl::m_fits_ctl
std::unique_ptr< FitsController > m_fits_ctl
Definition: daqController.hpp:369
daq::DaqController::DaqController
DaqController()=default
daq::AsyncOperations::AsyncOperations
AsyncOperations(AsyncOperations &&)=default
daq::DaqControllerImpl::m_event_log
std::shared_ptr< ObservableEventLog > m_event_log
Definition: daqController.hpp:371
daq::DaqProperties
Structure carrying properties needed to start a DataAcquisition.
Definition: daqProperties.hpp:28
rad::IoExecutor
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
daq::DaqControllerImpl
Implements daq::DaqController
Definition: daqController.hpp:259
daq::DaqController::AwaitAsync
virtual boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout)=0
Awaits that data acquisition stops or aborts.
daq::DaqControllerImpl::m_prim_sources
std::vector< Source< PrimSource > > m_prim_sources
Note: Consider vector immutable!
Definition: daqController.hpp:373
daq::ObservableStatus
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:68
asyncOpParams.hpp
daq::fits::UpdateKeywords
void UpdateKeywords(KeywordVector &to, KeywordVector const &from)
Updates a with keywords from b.
Definition: keyword.cpp:120
daq::DaqControllerImpl::m_properties
DaqProperties m_properties
Definition: daqController.hpp:367
daq::op::AwaitOpParams
Await specific parameters that is not provided with AsyncOpParams.
Definition: asyncOpParams.hpp:64
daq::DaqControllerImpl::Aborting
Definition: daqController.hpp:305
daq::DaqController::StartAsync
virtual boost::future< State > StartAsync()=0
Starts the data acquisition.
daq::DaqController::~DaqController
virtual ~DaqController()=default
daq::DaqController::AbortAsync
virtual boost::future< Status > AbortAsync(ErrorPolicy policy)=0
Aborts the data acquisition.
daq
Definition: daqController.cpp:18
daq::DaqController::UpdateKeywords
virtual void UpdateKeywords(fits::KeywordVector const &keywords)=0
Updates (replace or add) list of keywords.
eventLog.hpp
Contains declaration for EventLog, ObservableEventLog and related events.
ocmif::MakeState
ocmif::DaqSubState MakeState(daq::State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:32
daq::AsyncOperations::IsValid
bool IsValid() const noexcept
Definition: daqController.cpp:45
config.hpp
Config class header file.
daq::AsyncOperations::operator=
AsyncOperations & operator=(AsyncOperations &&)=default
daq::DaqControllerImpl::m_meta_sources
std::vector< Source< MetaSource > > m_meta_sources
Note: Consider vector immutable!
Definition: daqController.hpp:374
daq::AsyncOperations::stop
std::function< boost::future< Result< DpParts > >ErrorPolicy, op::AsyncOpParams)> stop
Definition: daqController.hpp:64
daq::Result
Utility class that represents a result and an error.
Definition: utility.hpp:17
daq::DaqControllerImpl::Aborted
Definition: daqController.hpp:306
daq::DaqControllerImpl::m_executor
rad::IoExecutor m_executor
Definition: daqController.hpp:366
daq::DaqControllerImpl::m_timers
std::vector< std::unique_ptr< boost::asio::steady_timer > > m_timers
Definition: daqController.hpp:378
daq::DaqControllerImpl::StateVariant
std::variant< NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted > StateVariant
Definition: daqController.hpp:314
daq::DaqControllerImpl::Stopping
Definition: daqController.hpp:303
status.hpp
Contains declaration for Status and ObservableStatus.
daq::DaqControllerImpl::m_state
StateVariant m_state
Definition: daqController.hpp:364
daq::AsyncOperations::AwaitReturnType
std::pair< boost::future< Result< DpParts > >, std::function< bool()> > AwaitReturnType
Definition: daqController.hpp:49
daq::DaqControllerImpl::m_async_ops
AsyncOperations m_async_ops
Definition: daqController.hpp:376
daq::AsyncOperations::await_prim
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
Definition: daqController.hpp:65
server::GetLogger
log4cplus::Logger & GetLogger()
Definition: logger.cpp:14
daq::DaqControllerImpl::m_io_ctx
boost::asio::io_context & m_io_ctx
Definition: daqController.hpp:365
daq::operator<<
std::ostream & operator<<(std::ostream &os, DaqController const &daq)
Definition: daqController.cpp:49
daq::fits::KeywordVector
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:138
daq::DaqController::GetState
virtual State GetState() const DAQ_NOEXCEPT=0
daq::op::AsyncOpParams
Parameters required for each async operation.
Definition: asyncOpParams.hpp:23
daq::ObservableEventLog
Stores data acquisition status and allows subscription to status changes.
Definition: eventLog.hpp:107
daq::DaqControllerImpl::m_status
std::shared_ptr< ObservableStatus > m_status
Definition: daqController.hpp:370
state.hpp
Declares daq::State and related functions.
daq::DaqControllerImpl::Acquiring
Definition: daqController.hpp:302
daq::AsyncOperations::start
std::function< boost::future< void >op::AsyncOpParams)> start
Definition: daqController.hpp:62
dpPart.hpp
Contains declaration for DpPart.
daq::ErrorPolicy
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
fitsController.hpp
Mock of FitsController.
daq::DaqControllerImpl::NotStarted
Definition: daqController.hpp:300
daqProperties.hpp
Contains declaration of daq::Properties.
daq::AsyncOperations::abort
std::function< boost::future< Result< void > >ErrorPolicy, op::AsyncOpParams)> abort
Definition: daqController.hpp:63
daq::DaqController
Controls the execution of single data acquisition that ultimately result in a set of FITS keywords an...
Definition: daqController.hpp:163
daq::DaqControllerImpl::Stopped
Definition: daqController.hpp:304
daq::DaqControllerImpl::AddEvent
void AddEvent(Args &&... args)
Definition: daqController.hpp:331
daq::AsyncOperations::AsyncOperations
AsyncOperations()
Default constructs object with standard async operations.
Definition: daqController.cpp:32
daq::AsyncOperations
Async operations.
Definition: daqController.hpp:46
daq::DaqControllerImpl::m_pending_replies
std::shared_ptr< PendingReplies > m_pending_replies
Definition: daqController.hpp:377
daq::DpParts
std::vector< DpPart > DpParts
Definition: dpPart.hpp:49
daq::DaqControllerImpl::m_abort_await_primary_sources
std::function< bool()> m_abort_await_primary_sources
If DaqController is awaiting the completion of primary data sources this function will hold the abort...
Definition: daqController.hpp:386
daq::DaqController::StopAsync
virtual boost::future< Status > StopAsync(ErrorPolicy policy)=0
Stops the data acquisition.
daq::DaqControllerImpl::Starting
Definition: daqController.hpp:301
daq::AsyncOperations::AsyncOperations
AsyncOperations(AsyncOperations const &)=default
error.hpp
Contains error related declarations for DAQ.