ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
util.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdaq
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Contains declaration for the async op utilities
7  */
8 #ifndef OCF_DAQ_OP_UTIL_HPP_
9 #define OCF_DAQ_OP_UTIL_HPP_
10 #include "../config.hpp"
11 
12 #include <algorithm>
13 #include <optional>
14 #include <vector>
15 
16 #include <Metadaqif.hpp>
17 #include <boost/thread/future.hpp>
18 #include <fmt/format.h>
19 #include <fmt/ostream.h>
20 #include <log4cplus/logger.h>
21 #include <log4cplus/loggingmacros.h>
22 
23 #include "../error.hpp"
24 #include "../state.hpp"
25 #include "asyncOpParams.hpp"
26 
27 namespace daq::op {
28 namespace eventlog {
29 template <class SourceType>
30 void Reply(AsyncOpParams& params, SourceType const& source, std::string description, bool error) {
32  params.id,
33  fmt::format(!error ? "{}: Got reply from '{}'" : "{}: Got error or timeout from '{}'",
34  description,
35  source),
36  params.status.GetStatus()));
37 }
38 template <class SourceType>
39 void Request(AsyncOpParams& params, SourceType const& source, std::string description) {
40  params.event_log.AddEvent(
41  GenericEvent(params.id,
42  fmt::format("{}: Sending request to'{}'", description, source),
43  params.status.GetStatus()));
44 }
45 
46 template <class F>
47 size_t CountExceptions(std::vector<F> const& futures) noexcept {
48  return std::count_if(futures.begin(), futures.end(), [](F const& future) noexcept -> bool {
49  return future.has_exception();
50  });
51 }
52 } // namespace eventlog
53 
54 /**
55  * Utility function to Send requests and collect replies.
56  *
57  * @todo: Need to generalize based on requestor reply type.
58  *
59  * @param begin Beginning of sequence of sources to send request.
60  * @param end End of sequence of sources to send request.
61  * @param params Parameters for this async operation.
62  * @param sender Functor that sends request and returns future.
63  * @param reply_handler Handler invoked once for each reply received. The handler may transform the
64  * reply and return a value containing a different type than the input. It should not return a
65  * future.
66  * @param replies_handler Handler invoked once when all replies are received. It will be passed the
67  * vector of replies returned from reply_handler. To communicate failure it shall throw an
68  * exception.
69  *
70  * @return Future of R, as returned by replies handler
71  *
72  * Type requirement:
73  *
74  * with `T` being the requestor reply type.
75  *
76  * `Sender`: boost::future<T> (Source<S>&)
77  * `ReplyHandler`: R (AsyncOpParams, Source<S>, boost::future<T>)
78  *
79  * @ingroup daq_common_libdaq
80  */
81 template <class R, class Iterator, class Pred, class Sender, class ReplyHandler>
82 boost::future<std::vector<boost::future<R>>>
83 SendRequestAndCollectReplies(Iterator begin,
84  Iterator end,
85  Pred filter_pred,
86  AsyncOpParams params,
87  Sender sender,
88  ReplyHandler reply_handler,
89  std::string_view logging_description);
90 
91 template <class T>
92 std::vector<std::exception_ptr> ExtractExceptions(std::vector<boost::future<T>>& futures) {
93  std::vector<std::exception_ptr> exceptions;
94  for (auto& f : futures) {
95  try {
96  f.get();
97  } catch (std::exception const& e) {
98  LOG4CPLUS_DEBUG("TEST", "E:" << e.what());
99  exceptions.push_back(std::current_exception());
100  }
101  }
102  return exceptions;
103 }
104 
105 /**
106  * Unwrap futures to extract errors.
107  *
108  * @throws DaqSourceErrors containing all exceptions.
109  *
110  * @ingroup daq_common_libdaq
111  */
112 void UnwrapVoidReplies(boost::future<std::vector<boost::future<void>>> futures);
113 
114 /**
115  * Unwrap replies
116  *
117  * @ingroup daq_common_libdaq
118  */
119 template <class R>
120 std::vector<R> UnwrapReplies(boost::future<std::vector<boost::future<R>>>&& futures) {
121  // @todo: Implement
122  return {};
123 }
124 
125 /**
126  * Reply handler that checks for exceptions in reply
127  *
128  * @throws Exception contained in reply.
129  *
130  * @return fut
131  *
132  * @param expected_state The state the source is expected to be in when receving the reply.
133  * @param success_state The state the source is set to on success.
134  * @param daq Data acquisition the operation belongs to.
135  * @param source The source to handle reply from.
136  * @param fut Reply.
137  *
138  * @ingroup daq_common_libdaq
139  */
140 template <class ReplyType>
141 std::optional<ReplyType> HandleMetaDaqReply(char const* request,
142  std::optional<State> expected_state,
143  State success_state,
144  std::optional<State> error_state,
145  AsyncOpParams params,
146  Source<MetaSource>& source,
147  boost::future<ReplyType>&& fut);
148 
149 /**
150  * Reply handler that checks for exceptions in reply
151  *
152  * @throws Exception contained in reply.
153  *
154  * @return fut contained value or std::nullopt of expected state is already satisifed
155  *
156  * @param expected_state The state the source is expected to be in when receving the reply.
157  * @param success_state The state the source is set to on success.
158  * @param daq Data acquisition the operation belongs to.
159  * @param source The source to handle reply from.
160  * @param fut Reply.
161  *
162  * @ingroup daq_common_libdaq
163  */
164 template <class ReplyType>
165 std::optional<ReplyType> HandlePrimDaqReply(char const* request,
166  std::optional<State> expected_state,
167  State success_state,
168  std::optional<State> error_state,
169  AsyncOpParams params,
170  Source<PrimSource>& source,
171  boost::future<ReplyType>&& fut);
172 
173 // Implementation
174 template <class R, class Iterator, class Pred, class Sender, class ReplyHandler>
175 boost::future<std::vector<boost::future<R>>>
177  Iterator end,
178  Pred filter_pred,
179  AsyncOpParams params,
180  Sender sender,
181  ReplyHandler reply_handler,
182  std::string_view logging_description) {
183  LOG4CPLUS_INFO(params.logger,
184  fmt::format("{}: {}: Sending requests.", params.status, logging_description));
185  using Sourceype = typename Iterator::value_type;
186 
187  std::vector<boost::future<R>> replies;
188  bool stop = false;
189 
190  // Function that sends request and returns a processed (by reply_handler) reply
191  // This transforms the type from the raw MAL future to whatever reply_handler returns.
192  auto send_func = [&, sender = std::move(sender), descr = std::string(logging_description)](
193  Sourceype& source) -> boost::future<R> {
194  try {
195  // continuation
196  auto cont =
197  [=,
198  // The source vector should be considered immutable so it's ok to
199  // take ref.
200  & source = source,
201  reply_token = params.pending_replies.Acquire(
202  std::string(source.GetSource().GetName()), std::string(logging_description)),
203  reply_handler = std::move(reply_handler)](auto fut) mutable -> R {
204  eventlog::Reply(params, source, descr, fut.has_exception());
205  // Got reply, release reply_token.
206  reply_token.Release();
207  return std::invoke(reply_handler, params, source, std::move(fut));
208  };
209 
210  eventlog::Request(params, source, descr);
211 
212  // Send request
213  auto fut =
214  std::invoke(std::move(sender), source).then(params.executor, std::move(cont));
215  return fut;
216  } catch (std::exception const& e) {
217  // @todo: Do I need the error flag?
218  source.SetErrorFlag();
219  stop = true; // just an optimiazation to not Send futher requests
220 
221  eventlog::Request(params, source, descr);
222 
223  LOG4CPLUS_ERROR(params.logger,
224  fmt::format("{}: Starting:"
225  "error Sending startDaq to metadata source {}: {}",
226  params.status,
227  descr,
228  source,
229  e.what()));
230  return boost::make_exceptional_future<R>();
231  }
232  };
233 
234  // Send
235  for (auto it = begin; it != end; ++it) {
236  if (stop) {
237  break;
238  }
239  if (!filter_pred(*it)) {
240  continue;
241  }
242 
243  replies.emplace_back(send_func(*it));
244  }
245 
246  LOG4CPLUS_INFO(
247  params.logger,
248  fmt::format(
249  "{}: {}: Awaiting {} replies.", params.status, logging_description, replies.size()));
250 
251  // Join replies
252  // note: when_all returns boost::future<vector<boost::future<R>>>
253  return boost::when_all(replies.begin(), replies.end())
254  .then(params.executor,
255  [params, descr = std::string(logging_description)](
256  boost::future<std::vector<boost::future<R>>>&& replies_fut)
257  -> std::vector<boost::future<R>> {
258  auto replies = replies_fut.get(); // should never throw
259  LOG4CPLUS_INFO(
260  params.logger,
261  fmt::format("{}: {}: All replies received or timed out (num errors {}/{})",
262  params.status,
263  descr,
264  eventlog::CountExceptions(replies),
265  replies.size()));
266  return replies;
267  });
268 }
269 
270 } // namespace daq::op
271 #endif // #ifndef OCF_DAQ_OP_UTIL_HPP_
daq::op::UnwrapVoidReplies
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void >>> futures)
Unwrap futures to extract errors.
Definition: util.cpp:60
daq::op::eventlog::Reply
void Reply(AsyncOpParams &params, SourceType const &source, std::string description, bool error)
Definition: util.hpp:30
daq::op::UnwrapReplies
std::vector< R > UnwrapReplies(boost::future< std::vector< boost::future< R >>> &&futures)
Unwrap replies.
Definition: util.hpp:120
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:39
daq::op::AsyncOpParams::logger
log4cplus::Logger & logger
Definition: asyncOpParams.hpp:101
daq::op::AsyncOpParams::pending_replies
PendingReplies & pending_replies
Definition: asyncOpParams.hpp:103
daq::op::AsyncOpParams::event_log
ObservableEventLog & event_log
Definition: asyncOpParams.hpp:95
daq::Source
Simple class that holds the source and associated state.
Definition: source.hpp:29
daq::op::AsyncOpParams::status
ObservableStatus & status
Async operations should not modify state directly DaqController does that.
Definition: asyncOpParams.hpp:94
asyncOpParams.hpp
daq::op::eventlog::Request
void Request(AsyncOpParams &params, SourceType const &source, std::string description)
Definition: util.hpp:39
daq::op
Definition: abort.hpp:19
daq::op::HandlePrimDaqReply
std::optional< ReplyType > HandlePrimDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< PrimSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
Definition: util.cpp:162
daq::op::AsyncOpParams::executor
rad::IoExecutor & executor
Definition: asyncOpParams.hpp:100
daq::op::SendRequestAndCollectReplies
boost::future< std::vector< boost::future< R > > > SendRequestAndCollectReplies(Iterator begin, Iterator end, Pred filter_pred, AsyncOpParams params, Sender sender, ReplyHandler reply_handler, std::string_view logging_description)
Utility function to Send requests and collect replies.
Definition: util.hpp:176
daq::op::ExtractExceptions
std::vector< std::exception_ptr > ExtractExceptions(std::vector< boost::future< T >> &futures)
Definition: util.hpp:92
daq::op::AsyncOpParams::id
std::string const & id
Definition: asyncOpParams.hpp:102
daq::ObservableStatus::GetStatus
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:243
daq::op::eventlog::CountExceptions
size_t CountExceptions(std::vector< F > const &futures) noexcept
Definition: util.hpp:47
daq::PendingReplies::Acquire
ReplyToken Acquire(std::string source_id, std::string request)
Acquire token.
Definition: pendingReplies.cpp:38
daq::GenericEvent
Represents a generic event if a more specific event is not usable.
Definition: eventLog.hpp:29
daq::op::AsyncOpParams
Parameters required for each async operation.
Definition: asyncOpParams.hpp:67
daq::op::HandleMetaDaqReply
std::optional< ReplyType > HandleMetaDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< MetaSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
Definition: util.cpp:72
error
Definition: main.cpp:23
daq::ObservableEventLog::AddEvent
void AddEvent(EventLog::EventType event)
Records that a file has been produced for this data acquisition.
Definition: eventLog.cpp:54