ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
util.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_op
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Contains definition for the async op utilities
7  */
8 #include <daq/op/util.hpp>
9 
10 #include <cassert>
11 
12 #include <gsl/gsl_util>
13 
14 #include <daq/error/report.hpp>
15 
16 namespace daq::op {
17 
18 using metadaqif::DaqReply;
19 using metadaqif::DaqStopReply;
20 using recif::RecStatus;
21 
22 template std::optional<std::shared_ptr<DaqReply>>
23 HandleMetaDaqReply<std::shared_ptr<DaqReply>>(char const* request,
24  std::optional<State> expected_state,
25  State success_state,
26  std::optional<State> error_state,
27  AsyncOpParams params,
28  Source<MetaSource>& source,
29  boost::future<std::shared_ptr<DaqReply>>&& fut);
30 
31 template std::optional<std::shared_ptr<DaqStopReply>>
32 HandleMetaDaqReply<std::shared_ptr<DaqStopReply>>(
33  char const* request,
34  std::optional<State> expected_state,
35  State success_state,
36  std::optional<State> error_state,
37  AsyncOpParams params,
38  Source<MetaSource>& source,
39  boost::future<std::shared_ptr<DaqStopReply>>&& fut);
40 
41 // RecAbort return std::string
42 template std::optional<std::string>
43 HandlePrimDaqReply<std::string>(char const* request,
44  std::optional<State> expected_state,
45  State success_state,
46  std::optional<State> error_state,
47  AsyncOpParams params,
48  Source<PrimSource>& source,
49  boost::future<std::string>&& fut);
50 
51 template std::optional<std::shared_ptr<RecStatus>>
52 HandlePrimDaqReply<std::shared_ptr<RecStatus>>(char const* request,
53  std::optional<State> expected_state,
54  State success_state,
55  std::optional<State> error_state,
56  AsyncOpParams params,
57  Source<PrimSource>& source,
58  boost::future<std::shared_ptr<RecStatus>>&& fut);
59 
60 void UnwrapVoidReplies(boost::future<std::vector<boost::future<void>>> futures) {
61  LOG4CPLUS_DEBUG("daq", "UnwrapVoidReplies: Entered.");
62  std::vector<boost::future<void>> values = futures.get();
63  auto exceptions = ExtractExceptions<void>(values);
64  if (!exceptions.empty()) {
65  LOG4CPLUS_DEBUG("daq", "UnwrapVoidReplies: Throwing gathered exceptions.");
66  throw boost::enable_current_exception(DaqSourceErrors(std::move(exceptions)));
67  }
68  LOG4CPLUS_DEBUG("daq", "UnwrapVoidReplies: Done.");
69 }
70 
71 template <class ReplyType>
72 std::optional<ReplyType> HandleMetaDaqReply(char const* request,
73  std::optional<State> expected_state,
74  State success_state,
75  std::optional<State> error_state,
76  AsyncOpParams params,
77  Source<MetaSource>& source,
78  boost::future<ReplyType>&& fut) {
79  if (source.GetState() == success_state) {
80  // Condition already satisfied.
81  LOG4CPLUS_INFO(params.logger,
82  fmt::format("{}: State of source '{}' is already satisfied. "
83  "Will ignore reply.",
84  params.status,
85  source));
86  return std::nullopt;
87  }
88  if (expected_state) {
89  if (source.GetState() != *expected_state) {
90  LOG4CPLUS_WARN(params.logger,
91  fmt::format("{}: State of source '{}' is in an unexpected state. "
92  "Expected '{}'. Proceeding normally anyway.",
93  params.status,
94  source,
95  *expected_state));
96  }
97  }
98 
99  auto _ = gsl::finally([&] {
100  if (!std::uncaught_exceptions()) {
101  return;
102  }
103 
104  if (error_state) {
105  source.SetState(*error_state, true);
106  } else {
107  source.SetErrorFlag();
108  }
109  });
110 
111  auto alert_id =
112  MakeAlertId(alert::REQUEST, std::string(request) + source.GetSource().GetName());
113 
114  try {
115  auto r = fut.get();
116  // Success
117  source.SetState(success_state, false);
118  params.alerts.Clear(alert_id);
119  return r;
120  } catch (metadaqif::DaqException const& e) {
121  LOG4CPLUS_WARN(params.logger,
122  fmt::format("{}: Metadata source '{}' replied with "
123  "id='{}'"
124  "message='{}'",
125  params.status,
126  source,
127  e.getId(),
128  e.getMessage()));
129  params.alerts.Set(
130  MakeAlert(alert_id,
131  fmt::format("Metadata source '{}' request '{}' replied "
132  "with ICD error: {}",
133  source.GetSource().GetName(),
134  request,
135  e.getMessage())));
136 
137  if (e.getId() != params.id) {
138  // Received reply for wrong data acquisition id!
139  }
140  // source reported error
141  throw boost::enable_current_exception(
142  DaqSourceError(request, std::string(source.GetSource().GetName()), e.getMessage()));
143  } catch (...) {
144  auto what = error::FormatException(std::current_exception());
145  LOG4CPLUS_WARN(
146  params.logger,
147  fmt::format(
148  "{}: Metadata source '{}' reply contains error:\n{}", params.status, source, what));
149  params.alerts.Set(
150  MakeAlert(alert_id,
151  fmt::format("Metadata source '{}' request '{}' failed: {}",
152  source.GetSource().GetName(),
153  request,
154  what)));
155  // source reported error
156  throw boost::enable_current_exception(
157  DaqSourceError(request, std::string(source.GetSource().GetName()), what));
158  }
159 }
160 
161 template <class ReplyType>
162 std::optional<ReplyType> HandlePrimDaqReply(char const* request,
163  std::optional<State> expected_state,
164  State success_state,
165  std::optional<State> error_state,
166  AsyncOpParams params,
167  Source<PrimSource>& source,
168  boost::future<ReplyType>&& fut) {
169  if (source.GetState() == success_state) {
170  // Condition already satisfied.
171  // Since multiple concurrent requests are possible, it may be that another request completed
172  // the operation.
173  LOG4CPLUS_INFO(params.logger,
174  fmt::format("{}: State of source '{}' is already satisfied. "
175  "Will ignore reply.",
176  params.status,
177  source));
178  return std::nullopt;
179  }
180 
181  if (expected_state) {
182  if (source.GetState() != *expected_state) {
183  // @todo It is not likely that we want to proceed normally here. If
184  // the expected state changed from e.g. Stopping to Aborting, it doesn't make sense to
185  // continue. At the same time, if reply is not handled it might mean data is lost.
186  LOG4CPLUS_WARN(params.logger,
187  fmt::format("{}: State of source '{}' is in an unexpected state. "
188  "Expected '{}'. Proceeding normally anyway.",
189  params.status,
190  source,
191  *expected_state));
192  }
193  }
194 
195  auto _ = gsl::finally([&] {
196  if (!std::uncaught_exceptions()) {
197  return;
198  }
199 
200  if (error_state) {
201  source.SetState(*error_state, true);
202  } else {
203  source.SetErrorFlag();
204  }
205  });
206 
207  auto alert_id =
208  MakeAlertId(alert::REQUEST, std::string(request) + source.GetSource().GetName());
209 
210  try {
211  auto r = fut.get();
212  // Success
213  source.SetState(success_state, false);
214  params.alerts.Clear(alert_id);
215  return r;
216  } catch (recif::ExceptionErr const& e) {
217  LOG4CPLUS_WARN(params.logger,
218  fmt::format("{}: Primdata source '{}' replied with "
219  "id='{}'"
220  "message='{}'",
221  params.status,
222  source,
223  e.getCode(),
224  e.getDesc()));
225  params.alerts.Set(
226  MakeAlert(alert_id,
227  fmt::format("Primary source '{}' request '{}' replied "
228  "with error: ({}) {}",
229  source.GetSource().GetName(),
230  request,
231  e.getCode(),
232  e.getDesc())));
233  // source reported error
234  throw boost::enable_current_exception(
235  DaqSourceError(request, std::string(source.GetSource().GetName()), e.getDesc()));
236  } catch (...) {
237  auto what = error::FormatException(std::current_exception());
238  LOG4CPLUS_WARN(
239  params.logger,
240  fmt::format("{}: Primdata source '{}' failed: {}", params.status, source, what));
241  params.alerts.Set(
242  MakeAlert(alert_id,
243  fmt::format("Primary source '{}' request '{}' failed: {}",
244  source.GetSource().GetName(),
245  request,
246  what)));
247  // source reported error
248  throw boost::enable_current_exception(
249  DaqSourceError(request, std::string(source.GetSource().GetName()), what));
250  }
251 }
252 
253 } // namespace daq::op
daq::op::UnwrapVoidReplies
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void >>> futures)
Unwrap futures to extract errors.
Definition: util.cpp:60
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:39
daq::op::AsyncOpParams::logger
log4cplus::Logger & logger
Definition: asyncOpParams.hpp:101
daq::MakeAlertId
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:39
daq::DaqSourceError
Represents error in single source.
Definition: error.hpp:67
daq::DaqSourceErrors
Exception thrown to carry reply errors.
Definition: error.hpp:84
daq::Source
Simple class that holds the source and associated state.
Definition: source.hpp:29
daq::op::AsyncOpParams::status
ObservableStatus & status
Async operations should not modify state directly DaqController does that.
Definition: asyncOpParams.hpp:94
report.hpp
daq::error::FormatException
void FormatException(std::ostream &os, std::exception_ptr ptr)
Report without nesting.
Definition: report.cpp:79
daq::op
Definition: abort.hpp:19
daq::op::HandlePrimDaqReply
std::optional< ReplyType > HandlePrimDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< PrimSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
Definition: util.cpp:162
daq::op::AsyncOpParams::alerts
AlertState & alerts
Alerts to be merged only after completion of async operation.
Definition: asyncOpParams.hpp:99
daq::op::AsyncOpParams::id
std::string const & id
Definition: asyncOpParams.hpp:102
daq::op::AlertState::Set
void Set(Alert alert)
Definition: asyncOpParams.hpp:22
daq::Source::SetErrorFlag
void SetErrorFlag()
Definition: source.hpp:64
daq::MakeAlert
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:29
daq::Source::GetSource
T & GetSource()
Definition: source.hpp:76
daq::op::AlertState::Clear
void Clear(AlertId id)
Definition: asyncOpParams.hpp:25
daq::Source::GetState
State GetState() const
Definition: source.hpp:72
daq::Source::SetState
void SetState(State state, std::optional< bool > error_flag={})
Definition: source.hpp:51
daq::op::AsyncOpParams
Parameters required for each async operation.
Definition: asyncOpParams.hpp:67
daq::op::HandleMetaDaqReply
std::optional< ReplyType > HandleMetaDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< MetaSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
Definition: util.cpp:72
daq::alert::REQUEST
constexpr std::string_view REQUEST
Request.
Definition: status.hpp:31
util.hpp
Contains declaration for the async op utilities.