ifw-daq  1.0.0
IFW Data Acquisition modules
util.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq
4  * @copyright 2021 ESO - European Southern Observatory
5  *
6  * @brief Contains declaration for the async op utilities
7  */
8 #ifndef OCF_DAQ_OP_UTIL_HPP_
9 #define OCF_DAQ_OP_UTIL_HPP_
10 #include "../config.hpp"
11 
12 #include <algorithm>
13 #include <optional>
14 #include <vector>
15 
16 #include <Metadaqif.hpp>
17 #include <boost/thread/future.hpp>
18 #include <fmt/format.h>
19 #include <fmt/ostream.h>
20 #include <log4cplus/logger.h>
21 #include <log4cplus/loggingmacros.h>
22 
23 #include "../error.hpp"
24 #include "../state.hpp"
25 #include "asyncOpParams.hpp"
26 
27 namespace daq::op {
28 namespace eventlog {
29 template <class SourceType>
30 void Reply(AsyncOpParams& params, SourceType const& source, std::string description, bool error) {
32  params.id,
33  fmt::format(!error ? "{}: Got reply from '{}'" : "{}: Got error or timeout from '{}'",
34  description,
35  source),
36  params.status.GetStatus()));
37 }
38 template <class SourceType>
39 void Request(AsyncOpParams& params, SourceType const& source, std::string description) {
40  params.event_log.AddEvent(
41  GenericEvent(params.id,
42  fmt::format("{}: Sending request to'{}'", description, source),
43  params.status.GetStatus()));
44 }
45 
/**
 * Counts the futures that hold an exception.
 *
 * @param futures Futures to inspect. Each element must provide a const `has_exception()`.
 * @return Number of elements for which `has_exception()` returns true.
 */
template <class F>
size_t CountExceptions(std::vector<F> const& futures) noexcept {
    auto const holds_exception = [](F const& future) noexcept -> bool {
        return future.has_exception();
    };
    // std::count_if yields a signed difference_type; the count is never negative so the
    // conversion is made explicit instead of relying on an implicit sign conversion.
    return static_cast<size_t>(std::count_if(futures.begin(), futures.end(), holds_exception));
}
52 } // namespace eventlog
53 
54 /**
55  * Utility function to send requests and collect replies.
56  *
57  * @todo: Need to generalize based on requestor reply type.
58  *
59  * @param begin Beginning of sequence of sources to send request.
60  * @param end End of sequence of sources to send request.
61  * @param params Parameters for this async operation.
62  * @param sender Functor that sends request and returns future.
63  * @param reply_handler Handler invoked once for each reply received. The handler may transform the
64  * reply and return a value containing a different type than the input. It should not return a
65  * future.
66  * @param filter_pred Predicate invoked with each source in the sequence. A request is only sent
67  * to sources for which it returns true.
68  * @param logging_description Description of the request, used in log and event log messages.
69  *
70  * @return Future holding one future of R per request attempted, R being the reply_handler result.
71  *
72  * Type requirement:
73  *
74  * with `T` being the requestor reply type.
75  *
76  * `Sender`: boost::future<T> (Source<S>&)
77  * `ReplyHandler`: R (AsyncOpParams, Source<S>, boost::future<T>)
78  *
79  * @ingroup daq_ocm_libdaq
80  */
81 template <class R, class Iterator, class Pred, class Sender, class ReplyHandler>
82 boost::future<std::vector<boost::future<R>>>
83 SendRequestAndCollectReplies(Iterator begin,
84  Iterator end,
85  Pred filter_pred,
86  AsyncOpParams params,
87  Sender sender,
88  ReplyHandler reply_handler,
89  std::string_view logging_description);
90 
91 template <class T>
92 std::vector<std::exception_ptr> ExtractExceptions(std::vector<boost::future<T>>& futures) {
93  std::vector<std::exception_ptr> exceptions;
94  for (auto& f : futures) {
95  try {
96  f.get();
97  } catch (...) {
98  exceptions.push_back(std::current_exception());
99  }
100  }
101  return exceptions;
102 }
103 
104 /**
105  * Unwrap futures to extract errors.
106  *
107  * @param futures Future holding the individual reply futures to unwrap.
108  * @throws DaqSourceErrors containing all exceptions.
109  *
110  * @ingroup daq_ocm_libdaq
111  */
111 void UnwrapVoidReplies(boost::future<std::vector<boost::future<void>>>&& futures);
112 
113 /**
114  * Unwrap replies
115  *
116  * @ingroup daq_ocm_libdaq
117  */
118 template <class R>
119 std::vector<R> UnwrapReplies(boost::future<std::vector<boost::future<R>>>&& futures) {
120  // @todo: Implement
121  return {};
122 }
123 
124 /**
125  * Reply handler that checks for exceptions in reply.
126  *
127  * @throws Exception contained in reply.
128  * @return Value contained in fut. NOTE(review): presumably std::nullopt if the expected state is
129  * already satisfied, as documented for HandlePrimDaqReply -- confirm against the definition.
130  *
131  * @param expected_state The state the source is expected to be in when receiving the reply.
132  * @param success_state The state the source is set to on success.
133  * @param error_state NOTE(review): presumably the state the source is set to on error -- confirm.
134  * @param params Parameters for this async operation.
135  * @param source The source to handle reply from.
136  * @param fut Reply.
137  * @ingroup daq_ocm_libdaq
138  */
139 template <class ReplyType>
140 std::optional<ReplyType> HandleMetaDaqReply(std::optional<State> expected_state,
141  State success_state,
142  std::optional<State> error_state,
143  AsyncOpParams params,
144  Source<MetaSource>& source,
145  boost::future<ReplyType>&& fut);
146 
147 /**
148  * Reply handler that checks for exceptions in reply.
149  *
150  * @throws Exception contained in reply.
151  *
152  * @return fut contained value or std::nullopt if expected state is already satisfied
153  *
154  * @param expected_state The state the source is expected to be in when receiving the reply.
155  * @param success_state The state the source is set to on success.
156  * @param error_state NOTE(review): presumably the state the source is set to on error -- confirm.
157  * @param params Parameters for this async operation.
158  * @param source The source to handle reply from.
159  * @param fut Reply.
160  * @ingroup daq_ocm_libdaq
161  */
162 template<class ReplyType>
163 std::optional<ReplyType> HandlePrimDaqReply(std::optional<State> expected_state,
164  State success_state,
165  std::optional<State> error_state,
166  AsyncOpParams params,
167  Source<PrimSource>& source,
168  boost::future<ReplyType>&& fut);
169 
170 // Implementation
171 template <class R, class Iterator, class Pred, class Sender, class ReplyHandler>
172 boost::future<std::vector<boost::future<R>>>
174  Iterator end,
175  Pred filter_pred,
176  AsyncOpParams params,
177  Sender sender,
178  ReplyHandler reply_handler,
179  std::string_view logging_description) {
180  LOG4CPLUS_INFO(params.logger,
181  fmt::format("{}: {}: Sending requests.", params.status, logging_description));
182  using Sourceype = typename Iterator::value_type;
183 
184  std::vector<boost::future<R>> replies;
185  bool stop = false;
186 
187  // Function that sends request and returns a processed (by reply_handler) reply
188  // This transforms the type from the raw MAL future to whatever reply_handler returns.
189  auto send_func = [&, sender = std::move(sender), descr = std::string(logging_description)](
190  Sourceype& source) -> boost::future<R> {
191  try {
192  // continuation
193  auto cont = [=,
194  // The source vector should be considered immutable so it's ok to
195  // take ref.
196  & source = source,
197  reply_token = params.pending_replies.Acquire(
198  std::string(source.GetSource().GetName()),
199  std::string(logging_description)),
200  reply_handler = std::move(reply_handler)](auto fut) mutable -> R {
201  eventlog::Reply(params, source, descr, fut.has_exception());
202  // Got reply, release reply_token.
203  reply_token.Release();
204  return std::invoke(reply_handler, params, source, std::move(fut));
205  };
206 
207  eventlog::Request(params, source, descr);
208 
209  // Send request
210  auto fut =
211  std::invoke(std::move(sender), source).then(params.executor, std::move(cont));
212  return fut;
213  } catch (std::exception const& e) {
214  // @todo: Do I need the error flag?
215  source.SetErrorFlag();
216  stop = true; // just an optimiazation to not Send futher requests
217 
218  eventlog::Request(params, source, descr);
219 
220  LOG4CPLUS_ERROR(params.logger,
221  fmt::format("{}: Starting:"
222  "error Sending startDaq to metadata source {}: {}",
223  params.status,
224  descr,
225  source,
226  e.what()));
227  return boost::make_exceptional_future<R>();
228  }
229  };
230 
231  // Send
232  for (auto it = begin; it != end; ++it) {
233  if (stop) {
234  break;
235  }
236  if (!filter_pred(*it)) {
237  continue;
238  }
239 
240  replies.emplace_back(send_func(*it));
241  }
242 
243  LOG4CPLUS_INFO(
244  params.logger,
245  fmt::format(
246  "{}: {}: Awaiting {} replies.", params.status, logging_description, replies.size()));
247 
248  // Join replies
249  // note: when_all returns boost::future<vector<boost::future<R>>>
250  return boost::when_all(replies.begin(), replies.end())
251  .then(
252  params.executor,
253  [params, descr = std::string(logging_description)](
254  boost::future<std::vector<boost::future<R>>>&& replies_fut)
255  -> std::vector<boost::future<R>> {
256  auto replies = replies_fut.get(); // should never throw
257  LOG4CPLUS_INFO(
258  params.logger,
259  fmt::format("{}: {}: All replies received or timed out (num errors {}/{})",
260  params.status, descr, eventlog::CountExceptions(replies),
261  replies.size()));
262  return replies;
263  });
264 }
265 
266 } // namespace daq::op
267 #endif // #ifndef OCF_DAQ_OP_UTIL_HPP_
daq::op::eventlog::Reply
void Reply(AsyncOpParams &params, SourceType const &source, std::string description, bool error)
Definition: util.hpp:30
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:41
daq::op::AsyncOpParams::logger
log4cplus::Logger & logger
Definition: asyncOpParams.hpp:51
daq::op::AsyncOpParams::pending_replies
PendingReplies & pending_replies
Definition: asyncOpParams.hpp:53
daq::op::SendRequestAndCollectReplies
boost::future< std::vector< boost::future< R > > > SendRequestAndCollectReplies(Iterator begin, Iterator end, Pred filter_pred, AsyncOpParams params, Sender sender, ReplyHandler reply_handler, std::string_view logging_description)
Utility function to Send requests and collect replies.
Definition: util.hpp:173
daq::ObservableEventLog::AddEvent
void AddEvent(EventLog::EventType event)
Records that a file has been produced for this data acquisition.
Definition: eventLog.cpp:54
daq::op::AsyncOpParams::event_log
ObservableEventLog & event_log
Definition: asyncOpParams.hpp:49
daq::op::UnwrapVoidReplies
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void >>> &&futures)
Unwrap futures to extract errors.
Definition: util.cpp:54
daq::Source
Simple class that holds the source and associated state.
Definition: source.hpp:29
asyncOpParams.hpp
daq::op::eventlog::Request
void Request(AsyncOpParams &params, SourceType const &source, std::string description)
Definition: util.hpp:39
daq::op
Definition: abort.hpp:19
daq::op::HandlePrimDaqReply
std::optional< ReplyType > HandlePrimDaqReply(std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< PrimSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
Definition: util.cpp:136
daq::op::AsyncOpParams::executor
rad::IoExecutor & executor
Definition: asyncOpParams.hpp:50
daq::ObservableStatus::GetStatus
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:125
daq::op::UnwrapReplies
std::vector< R > UnwrapReplies(boost::future< std::vector< boost::future< R >>> &&futures)
Unwrap replies.
Definition: util.hpp:119
daq::op::ExtractExceptions
std::vector< std::exception_ptr > ExtractExceptions(std::vector< boost::future< T >> &futures)
Definition: util.hpp:92
daq::op::AsyncOpParams::id
std::string const & id
Definition: asyncOpParams.hpp:52
daq::op::AsyncOpParams::status
ObservableStatus const & status
Async operations should not modify status directly DaqController does that.
Definition: asyncOpParams.hpp:48
daq::op::HandleMetaDaqReply
std::optional< ReplyType > HandleMetaDaqReply(std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< MetaSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
Definition: util.cpp:65
daq::GenericEvent
Represents a generic event if a more specific event is not usable.
Definition: eventLog.hpp:29
daq::op::eventlog::CountExceptions
size_t CountExceptions(std::vector< F > const &futures) noexcept
Definition: util.hpp:47
daq::PendingReplies::Acquire
ReplyToken Acquire(std::string source_id, std::string request)
Acquire token.
Definition: pendingReplies.cpp:38
daq::op::AsyncOpParams
Parameters required for each async operation.
Definition: asyncOpParams.hpp:23