ifw-daq  1.0.0
IFW Data Acquisition modules
util.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_op
4  * @copyright 2021 ESO - European Southern Observatory
5  *
6  * @brief Contains definition for the async op utilities
7  */
8 #include <daq/op/util.hpp>
9 
10 #include <cassert>
11 
12 #include <gsl/gsl_util>
13 
14 namespace daq::op {
15 
16 using metadaqif::DaqReply;
17 using metadaqif::DaqStopReply;
18 using recif::RecStatus;
19 
20 template std::optional<std::shared_ptr<DaqReply>>
21 HandleMetaDaqReply<std::shared_ptr<DaqReply>>(std::optional<State> expected_state,
22  State success_state,
23  std::optional<State> error_state,
24  AsyncOpParams params,
25  Source<MetaSource>& source,
26  boost::future<std::shared_ptr<DaqReply>>&& fut);
27 
28 template std::optional<std::shared_ptr<DaqStopReply>>
29 HandleMetaDaqReply<std::shared_ptr<DaqStopReply>>(
30  std::optional<State> expected_state,
31  State success_state,
32  std::optional<State> error_state,
33  AsyncOpParams params,
34  Source<MetaSource>& source,
35  boost::future<std::shared_ptr<DaqStopReply>>&& fut);
36 
37 // RecAbort return std::string
38 template std::optional<std::string>
39 HandlePrimDaqReply<std::string>(std::optional<State> expected_state,
40  State success_state,
41  std::optional<State> error_state,
42  AsyncOpParams params,
43  Source<PrimSource>& source,
44  boost::future<std::string>&& fut);
45 
46 template std::optional<std::shared_ptr<RecStatus>>
47 HandlePrimDaqReply<std::shared_ptr<RecStatus>>(std::optional<State> expected_state,
48  State success_state,
49  std::optional<State> error_state,
50  AsyncOpParams params,
51  Source<PrimSource>& source,
52  boost::future<std::shared_ptr<RecStatus>>&& fut);
53 
54 void UnwrapVoidReplies(boost::future<std::vector<boost::future<void>>>&& futures) {
55  LOG4CPLUS_DEBUG("daq", "UnwrapVoidReplies: Entered.");
56  std::vector<boost::future<void>> values = futures.get();
57  auto exceptions = ExtractExceptions<void>(values);
58  if (!exceptions.empty()) {
59  throw boost::enable_current_exception(DaqSourceErrors(std::move(exceptions)));
60  }
61  LOG4CPLUS_DEBUG("daq", "UnwrapVoidReplies: Done.");
62 }
63 
64 template <class ReplyType>
65 std::optional<ReplyType> HandleMetaDaqReply(std::optional<State> expected_state,
66  State success_state,
67  std::optional<State> error_state,
68  AsyncOpParams params,
69  Source<MetaSource>& source,
70  boost::future<ReplyType>&& fut) {
71  if (source.GetState() == success_state) {
72  // Condition already satisfied.
73  LOG4CPLUS_INFO(params.logger,
74  fmt::format("{}: State of source '{}' is already satisfied. "
75  "Will ignore reply.",
76  params.status,
77  source));
78  return std::nullopt;
79  }
80  if (expected_state) {
81  if (source.GetState() != *expected_state) {
82  LOG4CPLUS_WARN(params.logger,
83  fmt::format("{}: State of source '{}' is in an unexpected state. "
84  "Expected '{}'. Proceeding normally anyway.",
85  params.status,
86  source,
87  *expected_state));
88  }
89  }
90 
91  auto _ = gsl::finally([&] {
92  if (!std::uncaught_exceptions()) {
93  return;
94  }
95 
96  if (error_state) {
97  source.SetState(*error_state, true);
98  } else {
99  source.SetErrorFlag();
100  }
101  });
102 
103  try {
104  auto r = fut.get();
105  // Success
106  source.SetState(success_state, false);
107  return r;
108  } catch (metadaqif::DaqException const& e) {
109  LOG4CPLUS_WARN(params.logger,
110  fmt::format("{}: Metadata source '{}' replied with "
111  "id='{}'"
112  "message='{}'",
113  params.status,
114  source,
115  e.getId(),
116  e.getMessage()));
117  if (e.getId() != params.id) {
118  // Received reply for wrong data acquisition id!
119  }
120  // source reported error
121  throw boost::enable_current_exception(
122  DaqSourceError("startDaq", std::string(source.GetSource().GetName()), e.what()));
123  } catch (...) {
124  LOG4CPLUS_WARN(params.logger,
125  fmt::format("{}: Metadata source '{}' reply contains unknown exception "
126  "id='{}'",
127  params.status,
128  source));
129  // source reported error
130  throw boost::enable_current_exception(
131  DaqSourceError("startDaq", std::string(source.GetSource().GetName()), "unknown error"));
132  }
133 }
134 
135 template <class ReplyType>
136 std::optional<ReplyType> HandlePrimDaqReply(std::optional<State> expected_state,
137  State success_state,
138  std::optional<State> error_state,
139  AsyncOpParams params,
140  Source<PrimSource>& source,
141  boost::future<ReplyType>&& fut) {
142  if (source.GetState() == success_state) {
143  // Condition already satisfied.
144  // Since multiple concurrent requests are possible, it may be that another request completed
145  // the operation.
146  LOG4CPLUS_INFO(params.logger,
147  fmt::format("{}: State of source '{}' is already satisfied. "
148  "Will ignore reply.",
149  params.status,
150  source));
151  return std::nullopt;
152  }
153 
154  if (expected_state) {
155  if (source.GetState() != *expected_state) {
156  // @todo It is not likely that we want to proceed normally here. If
157  // the expected state changed from e.g. Stopping to Aborting, it doesn't make sense to
158  // continue. At the same time, if reply is not handled it might mean data is lost.
159  LOG4CPLUS_WARN(params.logger,
160  fmt::format("{}: State of source '{}' is in an unexpected state. "
161  "Expected '{}'. Proceeding normally anyway.",
162  params.status,
163  source,
164  *expected_state));
165  }
166  }
167 
168  auto _ = gsl::finally([&] {
169  if (!std::uncaught_exceptions()) {
170  return;
171  }
172 
173  if (error_state) {
174  source.SetState(*error_state, true);
175  } else {
176  source.SetErrorFlag();
177  }
178  });
179 
180  try {
181  auto r = fut.get();
182  // Success
183  source.SetState(success_state, false);
184  return r;
185  } catch (metadaqif::DaqException const& e) {
186  LOG4CPLUS_WARN(params.logger,
187  fmt::format("{}: Primdata source '{}' replied with "
188  "id='{}'"
189  "message='{}'",
190  params.status,
191  source,
192  e.getId(),
193  e.getMessage()));
194  if (e.getId() != params.id) {
195  // Received reply for wrong data acquisition id!
196  }
197  // source reported error
198  throw boost::enable_current_exception(
199  DaqSourceError("startDaq", std::string(source.GetSource().GetName()), e.what()));
200  } catch (...) {
201  LOG4CPLUS_WARN(params.logger,
202  fmt::format("{}: Primdata source '{}' reply contains unknown exception "
203  "id='{}'",
204  params.status,
205  source));
206  // source reported error
207  throw boost::enable_current_exception(
208  DaqSourceError("startDaq", std::string(source.GetSource().GetName()), "unknown error"));
209  }
210 }
211 
212 } // namespace daq::op
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:41
daq::op::AsyncOpParams::logger
log4cplus::Logger & logger
Definition: asyncOpParams.hpp:51
daq::Source::GetSource
T & GetSource()
Definition: source.hpp:76
daq::DaqSourceErrors
Exception thrown to carry reply errors.
Definition: error.hpp:84
daq::op::UnwrapVoidReplies
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void >>> &&futures)
Unwrap futures to extract errors.
Definition: util.cpp:54
daq::Source
Simple class that holds the source and associated state.
Definition: source.hpp:29
daq::op
Definition: abort.hpp:19
daq::op::HandlePrimDaqReply
std::optional< ReplyType > HandlePrimDaqReply(std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< PrimSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
Definition: util.cpp:136
daq::op::AsyncOpParams::id
std::string const & id
Definition: asyncOpParams.hpp:52
daq::op::AsyncOpParams::status
ObservableStatus const & status
Async operations should not modify status directly DaqController does that.
Definition: asyncOpParams.hpp:48
daq::Source::GetState
State GetState() const
Definition: source.hpp:72
daq::op::HandleMetaDaqReply
std::optional< ReplyType > HandleMetaDaqReply(std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< MetaSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
Definition: util.cpp:65
daq::Source::SetState
void SetState(State state, std::optional< bool > error_flag={})
Definition: source.hpp:51
daq::op::AsyncOpParams
Parameters required for each async operation.
Definition: asyncOpParams.hpp:23
daq::DaqSourceError
Represents error in single source.
Definition: error.hpp:67
daq::Source::SetErrorFlag
void SetErrorFlag()
Definition: source.hpp:64
util.hpp
Contains declaration for the async op utilities.