ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
awaitPrim.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_op
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Contains definition for the AwaitPrimAsync operation
7  */
8 #include <daq/op/awaitPrim.hpp>
9 
10 #include <fmt/format.h>
11 #include <fmt/ostream.h>
12 #include <log4cplus/loggingmacros.h>
13 
14 #include <daq/error/report.hpp>
15 #include <daq/op/util.hpp>
16 #include <daq/state.hpp>
17 
18 namespace {
19 /**
20  * Process all replies together to decide if more awaits are necessary.
21  *
22  * @note Errors from sources are logged, but otherwise ignored.
23  *
24  * @returns true if all awaited-on sources have completed.
25  * @returns false if one or more sources are not completed.
26  */
27 bool AwaitUnwrapReplies(boost::future<std::vector<boost::future<bool>>>&& futures) {
28  std::vector<boost::future<bool>> values = futures.get();
29  if (values.empty()) {
30  // No requests were sent (because they were all completed)
31  return true;
32  }
33  std::vector<std::exception_ptr> exceptions;
34  size_t num_ok = 0;
35  size_t num_exceptions = 0;
36  for (auto& f : values) {
37  try {
38  if (f.get()) {
39  num_ok++;
40  }
41  } catch (std::exception const& e) {
42  num_exceptions++;
43  LOG4CPLUS_INFO(
44  "daq",
45  fmt::format("daq::op::AwaitUnwrapReplies: Source replied with exception: {}",
46  e.what()));
47  }
48  }
49  LOG4CPLUS_DEBUG(
50  "daq",
51  fmt::format("daq::op::AwaitUnwrapReplies: {}/{} replies contained exceptions. {}/{} "
52  "replies report success",
53  num_exceptions,
54  values.size(),
55  num_ok,
56  values.size()));
57 
58  return num_ok == values.size();
59 }
60 
61 class RecWaitSpec : public recif::RecWaitSpec {
62 public:
63  RecWaitSpec(float timeout) : m_timeout{timeout}, m_info{} {
64  }
65  std::string getInfo() const override {
66  return m_info;
67  }
68  void setInfo(const std::string& info) override {
69  m_info = info;
70  }
71 
72  float getTimeout() const override {
73  return m_timeout;
74  }
75  void setTimeout(float timeout) override {
76  m_timeout = timeout;
77  }
78  bool hasKey() const override {
79  return false;
80  }
81  std::unique_ptr<recif::RecWaitSpec> cloneKey() const override {
82  throw std::runtime_error("not clonable");
83  }
84  std::unique_ptr<recif::RecWaitSpec> clone() const override {
85  throw std::runtime_error("not clonable");
86  }
87  bool keyEquals(const recif::RecWaitSpec& other) const override {
88  return false;
89  }
90 
91 private:
92  float m_timeout;
93  std::string m_info;
94 };
95 } // namespace
96 
97 namespace daq::op {
98 
100  : m_params(m_params), m_error(false), m_parts(), m_aborted(false) {
101 }
102 
103 [[nodiscard]] boost::future<Result<DpParts>> AwaitPrimAsync::Initiate() {
104  LOG4CPLUS_DEBUG(m_params.common.logger,
105  fmt::format("AwaitPrimAsync::Initiate: Operation initiating"));
106  InitiateAwait();
107  return m_promise.get_future();
108 }
109 
110 void AwaitPrimAsync::InitiateAwait() {
111  // If the last await was processed before the configured interval expired wait until that time
112  // has elapsed.
113  MakeInterval()
114  .then(m_params.common.executor, [this](auto) { return AwaitOnceAsync(); })
115  .unwrap()
116  .then(m_params.common.executor, [this](boost::future<bool> fut) -> void {
117  // If condition is fulfilled, set reply value or exception.
118  if (m_aborted) {
119  LOG4CPLUS_DEBUG(m_params.common.logger,
120  fmt::format("AwaitPrimAsync::InitiateAwait: Operation aborted"));
121  return;
122  }
123  try {
124  auto is_ok = fut.get();
125  if (is_ok) {
126  // All done, set promise
127  this->m_promise.set_value({m_error, std::move(m_parts)});
128  } else {
129  // If condition is not fulfilled we do it again.
130  this->InitiateAwait();
131  }
132  } catch (...) {
133  // Fatal error. Report to operation invoker.
134  this->m_promise.set_exception(boost::current_exception());
135  }
136  });
137 }
138 
139 boost::future<void> AwaitPrimAsync::MakeInterval() {
140  using std::chrono::duration_cast;
141  using std::chrono::milliseconds;
142  using std::chrono::steady_clock;
143 
144  if (!m_last_start) {
145  // First time -> return ready future
146  return boost::make_ready_future();
147  }
148 
149  auto now = steady_clock::now();
150  auto next_start = *m_last_start + duration_cast<steady_clock::duration>(m_params.wait_interval);
151  if (now >= next_start) {
152  // Interval already expired -> return ready future
153  return boost::make_ready_future();
154  }
155  LOG4CPLUS_DEBUG(m_params.common.logger,
156  fmt::format("AwaitPrimAsync::MakeInterval: Waiting {}ms until sending RecWait",
157  duration_cast<milliseconds>(next_start - now).count()));
158  // Wait until
159  m_interval.reset();
160  m_interval.emplace(m_params.common.executor.get_io_context(), next_start);
161  m_interval->timer.async_wait([this](boost::system::error_code const& ec) {
162  // We don't care if we're cancelled, we have to set promise anyway
163  // to avoid broken promise
164  m_interval->promise.set_value();
165  });
166  return m_interval->promise.get_future();
167 }
168 
169 void AwaitPrimAsync::Abort() noexcept {
170  LOG4CPLUS_INFO(m_params.common.logger,
171  fmt::format("AwaitPrimAsync::Abort: Requested to abort! "
172  "Number of files received so far: {}",
173  m_parts.size()));
174  if (m_aborted) {
175  return;
176  }
177  m_aborted = true;
178  m_interval.reset();
179  m_promise.set_value({m_error, std::move(m_parts)});
180 }
181 
182 boost::future<bool> AwaitPrimAsync::AwaitOnceAsync() {
183  using Reply = std::shared_ptr<recif::RecWaitStatus>;
184  using Seconds = std::chrono::duration<float>;
185  using std::chrono::duration_cast;
186  if (m_aborted) {
187  LOG4CPLUS_DEBUG(m_params.common.logger,
188  fmt::format("AwaitPrimAsync::AwaitOnceAsync: Operation aborted"));
189  return boost::make_ready_future<bool>(true);
190  }
191  LOG4CPLUS_DEBUG(m_params.common.logger,
192  fmt::format("AwaitPrimAsync::AwaitOnceAsync: Sending requests..."));
193 
194  m_last_start = std::chrono::steady_clock::now();
195 
196  return SendRequestAndCollectReplies<bool>(
197  m_params.common.prim_sources.begin(),
198  m_params.common.prim_sources.end(),
199  [](Source<PrimSource> const& source) -> bool {
200  /* only send to sources that are not already stopped */
201  return IsSubsequentState(State::Stopped, source.GetState());
202  },
203  m_params.common,
204  // Sender
205  [this](Source<PrimSource>& s) -> boost::future<Reply> {
206  auto& client = s.GetSource().GetRrClient();
207  auto seconds = duration_cast<Seconds>(this->m_params.wait_interval).count();
208 #if defined(UNIT_TEST)
209  // ECII-783: MAL suddenly requires that interface *must* be implemented by
210  // matching middleware otherwise it fails.
211  // To avoid complicated mocking we still use our local implementation for unit
212  // testing.
213  auto spec = std::make_shared<RecWaitSpec>(seconds);
214 #else
215  auto mal = client.getMal();
216  BOOST_ASSERT_MSG(mal, "MAL RR client returned invalid MAL pointer");
217  auto spec = mal->createDataEntity<recif::RecWaitSpec>();
218  // Initialize all members by assigning from local implementation
219  *spec = RecWaitSpec(seconds);
220 #endif
221  return client.RecWait(spec);
222  },
223  // reply handler
224  [this](AsyncOpParams params, Source<PrimSource>& source, boost::future<Reply>&& fut)
225  -> bool { return HandleRecWaitReply(source, std::move(fut)); },
226  std::string_view("AwaitPrimAsync: await primary data acquisition"))
227  .then(m_params.common.executor, AwaitUnwrapReplies);
228 }
229 
230 bool AwaitPrimAsync::HandleRecWaitReply(
231  Source<PrimSource>& source, boost::future<std::shared_ptr<recif::RecWaitStatus>>&& fut) {
232  // Note: Alerts are set and cleared immedately rather than deferring to
233  // a later point as the await operation can take a very long time.
234  auto alert_id =
235  MakeAlertId(alert::REQUEST, std::string("RecWait") + source.GetSource().GetName());
236  try {
237  auto reply = fut.get();
238  // Request did not throw -> clear any alert
239  m_params.common.status.ClearAlert(alert_id);
240 
241  auto status = reply->getStatus();
242  if (status == recif::Success) {
243  // Await returned
244  source.SetState(State::Stopped);
245 
246  auto rec_status = reply->getRecStatus();
247  LOG4CPLUS_INFO(m_params.common.logger,
248  fmt::format("Data source '{}' replied successfully and provides {} "
249  "number of files",
250  source.GetSource(),
251  rec_status->getDpFiles().size()));
252  if (rec_status->getDpFiles().empty()) {
253  LOG4CPLUS_WARN(m_params.common.logger,
254  fmt::format("Data source '{}' replied successfully for "
255  "RecWait but did not produce any files!",
256  source.GetSource()));
257  }
258  for (auto const& file : rec_status->getDpFiles()) {
259  m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
260  }
261  // All ok, return true
262  return true;
263  } else {
264  // Wait timed out, return false to resend
265  return false;
266  }
267  } catch (recif::ExceptionErr const& e) {
268  m_params.common.status.SetAlert(
269  MakeAlert(alert_id,
270  fmt::format("Primary source '{}' request 'RecWaitStatus' replied "
271  "with ICD error: ({}) {}",
272  source.GetSource().GetName(),
273  e.getCode(),
274  e.getDesc())));
275  m_error = true;
276  throw boost::enable_current_exception(
277  DaqSourceError("RecWait", std::string(source.GetSource().GetName()), e.what()));
278  } catch (...) {
279  auto what = error::FormatException(std::current_exception());
280  m_params.common.status.SetAlert(
281  MakeAlert(alert_id,
282  fmt::format("Primary source '{}' request 'RecWaitStatus' replied "
283  "with non-ICD error: {}",
284  source.GetSource().GetName(),
285  what)));
286  m_error = true;
287  throw boost::enable_current_exception(DaqSourceError(
288  "RecWait", std::string(source.GetSource().GetName()), "unknown exception"));
289  }
290 }
291 } // namespace daq::op
Contains declaration for the AwaitPrimAsync operation.
Declares daq::State and related functions.
constexpr std::string_view REQUEST
Request.
Definition: status.hpp:31
void FormatException(std::ostream &os, std::exception_ptr ptr)
Report without nesting.
Definition: report.cpp:79
void Reply(AsyncOpParams &params, SourceType const &source, std::string description, bool error)
Definition: util.hpp:30
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:49
@ Stopped
All data sources have reported they have stopped acquiring data.
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:39
Simple class that holds the source and associated state.
Definition: source.hpp:29
rad::IoExecutor & executor
log4cplus::Logger & logger
Await-specific parameters that are not provided with AsyncOpParams.
boost::future< Result< DpParts > > Initiate()
Initiates operation that await acquisition completion.
Definition: awaitPrim.cpp:103
AwaitPrimAsync(AwaitOpParams params) noexcept
Constructs operation with the provided parameters.
Definition: awaitPrim.cpp:99
Contains declaration for the async op utilities.