ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
awaitPrim.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdaq
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Contains declaration for the AwaitPrimAsync operation
7  */
8 #ifndef OCM_DAQ_OP_AWAIT_HPP_
9 #define OCM_DAQ_OP_AWAIT_HPP_
10 #include "../config.hpp"
11 
12 #include <vector>
13 
14 #include <boost/thread/future.hpp>
15 #include <boost/asio/steady_timer.hpp>
16 
17 #include "../dpPart.hpp"
18 #include "../error.hpp"
19 #include "asyncOpParams.hpp"
20 
21 namespace daq::op {
22 
23 /**
24  * A composite async operation that awaits primary data sources.
25  *
26  * This is used by OCM to await primary data sources completion.
27  * Since sources may acquire data for a long time the operation is implemented by
28  * sending await commands to client with a smaller timeout, and then resending request
29  * on timeout. This is done to be able to have a reasonable MAL timeout to detect network related
30  * issues.
31  *
32  * Notes
33  * -----
34  *
35  * The await operation returns files produced from awaited-on sources (using DpParts result).
36  *
37  * Await requests are only sent to source if observed state requires it (i.e. if source is not
38  * already observed to be stopped, from a previous AwaitPrimAsync operation).
39  *
40  * Await operation is completed when any of the following conditions are fulfilled:
41  *
42  * - All sources reply that recording is complete.
43  * - Any source replies with a fatal error.
44  * - Operation is aborted.
45  *
46  * The operation does not allow configurable error policy but behaves as if ErrorPolicy::Robust
47  * is set. The following are considered fatal errors (causing result of operation to be an
48  * exception):
49  * - Internal errors.
50  *
51  * @todo Do not assume commands are honoring the timeout parameter. Instead keep track of the send
52  * interval. Otherwise a malfunctioning subsystem that immediately returns exception will be spammed
53  * with requests.
54  * @ingroup daq_common_libdaq
55  */
57 public:
58  /**
59  * Constructs operation with the provided parameters.
60  *
61  * @param params parameters.
62  */
63  explicit AwaitPrimAsync(AwaitOpParams params) noexcept;
64 
65  /**
66  * Initiates operation that awaits acquisition completion.
67  *
68  * @note Caller is responsible for keeping object alive until
69  * result is set.
70  */
71  [[nodiscard]] boost::future<Result<DpParts>> Initiate();
72 
73  /**
74  * Aborts the operation.
75  */
76  void Abort() noexcept;
77 
78 private:
79  /**
80  * Initiates the await operation that will eventually set the m_promise value or exception
81  * and returns.
82  */
83  void InitiateAwait();
84  /**
85  * Send request to all incomplete primary sources and wait for reply.
86  *
87  * @returns future set when all sources reply.
88  * - True means all awaited-on sources have completed.
89  * - False if one or more sources are not completed.
90  * - Contains exception on fatal error.
91  *
92  * Source state is updated using status from reply, including produced files.
93  */
94  boost::future<bool> AwaitOnceAsync();
95  /**
96  * Callback for each reply that
97  * - updates source state,
98  * - adds data products
99  *
100  * @returns true if RecWait completed successfully (data acquisition is complete)
101  * @returns false if RecWait timed out (data acquisition is still ongoing)
102  * @throws DaqSourceError if RecWait contains exception.
103  */
104  bool HandleRecWaitReply(Source<PrimSource>& source,
105  boost::future<std::shared_ptr<recif::RecWaitStatus>>&& fut);
106 
107  /**
108  * Returns future with value set when requests should be sent.
109  */
110  boost::future<void> MakeInterval();
111  AwaitOpParams m_params;
112 
113  struct IntervalTimer { // Pairs an asio steady timer with a promise; held in m_interval.
114  IntervalTimer(boost::asio::io_context& io_ctx, std::chrono::steady_clock::time_point next)
115  : timer(io_ctx, next), promise() { // timer is bound to io_ctx with expiry time `next`
116  }
117  ~IntervalTimer() {
118  timer.cancel(); // cancel any wait still pending on the timer before it is destroyed
119  }
120  boost::asio::steady_timer timer; ///< Timer constructed from io_ctx and the `next` time point.
121  boost::promise<void> promise; ///< NOTE(review): presumably fulfilled when the timer fires (see MakeInterval()) - confirm in awaitPrim.cpp.
122  };
123  /**
124  * Indicates if it completed with error.
125  */
126  bool m_error;
127 
128  /**
129  * Holds result from data sources.
130  */
131  DpParts m_parts;
132 
133  /**
134  * Indicates whether operation was aborted or not.
135  *
136  * If this is true it means that m_promise has already been fulfilled.
137  */
138  bool m_aborted;
139 
140  /**
141  * Promise for future returned from `Initiate()`
142  */
143  boost::promise<Result<DpParts>> m_promise;
144  /**
145  * Interval timer (m_interval), followed by the time of last InitiateAwait (m_last_start)
146  */
147  std::optional<IntervalTimer> m_interval;
148  std::optional<std::chrono::steady_clock::time_point> m_last_start;
149 };
150 
151 } // namespace daq::op
152 #endif // #ifndef OCM_DAQ_OP_AWAIT_HPP_
std::vector< DpPart > DpParts
Definition: dpPart.hpp:66
Simple class that holds the source and associated state.
Definition: source.hpp:29
Await-specific parameters that are not provided with AsyncOpParams.
A composite async operation that awaits primary data sources.
Definition: awaitPrim.hpp:56
boost::future< Result< DpParts > > Initiate()
Initiates operation that awaits acquisition completion.
Definition: awaitPrim.cpp:103
AwaitPrimAsync(AwaitOpParams params) noexcept
Constructs operation with the provided parameters.
Definition: awaitPrim.cpp:99
void Abort() noexcept
Aborts the operation.
Definition: awaitPrim.cpp:169