ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
awaitPrim.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_op
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Contains definition for the AwaitPrimAsync operation
7  */
8 #include <daq/op/awaitPrim.hpp>
9 
10 #include <fmt/format.h>
11 #include <fmt/ostream.h>
12 #include <log4cplus/loggingmacros.h>
13 
14 #include <daq/error/report.hpp>
15 #include <daq/op/util.hpp>
16 #include <daq/state.hpp>
17 
18 namespace {
19 /**
20  * Process all replies together to decide if more awaits are necessary.
21  *
22  * @note Errors from sources are logged, but otherwise ignored.
23  *
24  * @returns true if all awaited-on sources have completed.
25  * @returns false if one or more sources are not completed.
26  */
27 bool AwaitUnwrapReplies(boost::future<std::vector<boost::future<bool>>>&& futures) {
28  std::vector<boost::future<bool>> values = futures.get();
29  if (values.empty()) {
30  // No requests were sent (because they were all completed)
31  return true;
32  }
33  std::vector<std::exception_ptr> exceptions;
34  size_t num_ok = 0;
35  size_t num_exceptions = 0;
36  for (auto& f : values) {
37  try {
38  if (f.get()) {
39  num_ok++;
40  }
41  } catch (std::exception const& e) {
42  num_exceptions++;
43  LOG4CPLUS_INFO(
44  "daq",
45  fmt::format("daq::op::AwaitUnwrapReplies: Source replied with exception: {}",
46  e.what()));
47  }
48  }
49  LOG4CPLUS_DEBUG(
50  "daq",
51  fmt::format("daq::op::AwaitUnwrapReplies: {}/{} replies contained exceptions. {}/{} "
52  "replies report success",
53  num_exceptions,
54  values.size(),
55  num_ok,
56  values.size()));
57 
58  return num_ok == values.size();
59 }
60 
61 class RecWaitSpec : public recif::RecWaitSpec {
62 public:
63  RecWaitSpec(float timeout) : m_timeout{timeout}, m_info{} {
64  }
65  std::string getInfo() const override {
66  return m_info;
67  }
68  void setInfo(const std::string& info) override {
69  m_info = info;
70  }
71 
72  float getTimeout() const override {
73  return m_timeout;
74  }
75  void setTimeout(float timeout) override {
76  m_timeout = timeout;
77  }
78  bool hasKey() const override {
79  return false;
80  }
81  std::unique_ptr<recif::RecWaitSpec> cloneKey() const override {
82  throw std::runtime_error("not clonable");
83  }
84  std::unique_ptr<recif::RecWaitSpec> clone() const override {
85  throw std::runtime_error("not clonable");
86  }
87  bool keyEquals(const recif::RecWaitSpec& other) const override {
88  return false;
89  }
90 
91 private:
92  float m_timeout;
93  std::string m_info;
94 };
95 } // namespace
96 
97 namespace daq::op {
98 
// Member initializer list of AwaitPrimAsync::AwaitPrimAsync. The signature line
// (awaitPrim.cpp:99) is missing from this listing.
// NOTE(review): `m_params(m_params)` is only correct if the constructor parameter itself is named
// `m_params`, shadowing the member. If the parameter is named `params`, as in the header
// declaration `AwaitPrimAsync(AwaitOpParams params) noexcept`, the member is initialized from
// itself, which is an uninitialized read. Confirm against the actual signature.
100  : m_params(m_params), m_error(false), m_parts(), m_aborted(false) {
101 }
102 
103 [[nodiscard]] boost::future<Result<DpParts>> AwaitPrimAsync::Initiate() {
104  LOG4CPLUS_DEBUG(m_params.common.logger,
105  fmt::format("AwaitPrimAsync::Initiate: Operation initiating"));
106  InitiateAwait();
107  return m_promise.get_future();
108 }
109 
110 void AwaitPrimAsync::InitiateAwait() {
111  // If the last await was processed before the configured interval expired wait until that time
112  // has elapsed.
113  MakeInterval()
114  .then(m_params.common.executor, [this](auto) { return AwaitOnceAsync(); })
115  .unwrap()
116  .then(m_params.common.executor, [this](boost::future<bool> fut) -> void {
117  // If condition is fulfilled, set reply value or exception.
118  if (m_aborted) {
119  LOG4CPLUS_DEBUG(m_params.common.logger,
120  fmt::format("AwaitPrimAsync::InitiateAwait: Operation aborted"));
121  return;
122  }
123  try {
124  auto is_ok = fut.get();
125  if (is_ok) {
126  // All done, set promise
127  this->m_promise.set_value({m_error, std::move(m_parts)});
128  } else {
129  // If condition is not fulfilled we do it again.
130  this->InitiateAwait();
131  }
132  } catch (...) {
133  // Fatal error. Report to operation invoker.
134  this->m_promise.set_exception(boost::current_exception());
135  }
136  });
137 }
138 
139 boost::future<void> AwaitPrimAsync::MakeInterval() {
140  using std::chrono::duration_cast;
141  using std::chrono::milliseconds;
142  using std::chrono::steady_clock;
143 
144  if (!m_last_start) {
145  // First time -> return ready future
146  return boost::make_ready_future();
147  }
148 
149  auto now = steady_clock::now();
150  auto next_start = *m_last_start + duration_cast<steady_clock::duration>(m_params.wait_interval);
151  if (now >= next_start) {
152  // Interval already expired -> return ready future
153  return boost::make_ready_future();
154  }
155  LOG4CPLUS_DEBUG(m_params.common.logger,
156  fmt::format("AwaitPrimAsync::MakeInterval: Waiting {}ms until sending RecWait",
157  duration_cast<milliseconds>(next_start - now).count()));
158  // Wait until
159  m_interval.reset();
160  m_interval.emplace(m_params.common.executor.get_io_context(), next_start);
161  m_interval->timer.async_wait([this](boost::system::error_code const& ec) {
162  // We don't care if we're cancelled, we have to set promise anyway
163  // to avoid broken promise
164  m_interval->promise.set_value();
165  });
166  return m_interval->promise.get_future();
167 }
168 
169 void AwaitPrimAsync::Abort() noexcept {
170  LOG4CPLUS_INFO(m_params.common.logger,
171  fmt::format("AwaitPrimAsync::Abort: Requested to abort! "
172  "Number of files received so far: {}",
173  m_parts.size()));
174  if (m_aborted) {
175  return;
176  }
177  m_aborted = true;
178  m_interval.reset();
179  m_promise.set_value({m_error, std::move(m_parts)});
180 }
181 
182 boost::future<bool> AwaitPrimAsync::AwaitOnceAsync() {
183  using Reply = std::shared_ptr<recif::RecWaitStatus>;
184  using Seconds = std::chrono::duration<float>;
185  using std::chrono::duration_cast;
186  if (m_aborted) {
187  LOG4CPLUS_DEBUG(m_params.common.logger,
188  fmt::format("AwaitPrimAsync::AwaitOnceAsync: Operation aborted"));
189  return boost::make_ready_future<bool>(true);
190  }
191  LOG4CPLUS_DEBUG(m_params.common.logger,
192  fmt::format("AwaitPrimAsync::AwaitOnceAsync: Sending requests..."));
193 
194  m_last_start = std::chrono::steady_clock::now();
195 
196  return SendRequestAndCollectReplies<bool>(
197  m_params.common.prim_sources.begin(),
198  m_params.common.prim_sources.end(),
199  [](Source<PrimSource> const& source) -> bool {
200  /* only send to sources that are not already stopped */
201  return IsSubsequentState(State::Stopped, source.GetState());
202  },
203  m_params.common,
204  // Sender
205  [this](Source<PrimSource>& s) -> boost::future<Reply> {
206  auto spec = std::make_shared<RecWaitSpec>(
207  duration_cast<Seconds>(this->m_params.wait_interval).count());
208  return s.GetSource().GetRrClient().RecWait(spec);
209  },
210  // reply handler
211  [this](AsyncOpParams params, Source<PrimSource>& source, boost::future<Reply>&& fut)
212  -> bool { return HandleRecWaitReply(source, std::move(fut)); },
213  std::string_view("AwaitPrimAsync: await primary data acquisition"))
214  .then(m_params.common.executor, AwaitUnwrapReplies);
215 }
216 
217 bool AwaitPrimAsync::HandleRecWaitReply(
218  Source<PrimSource>& source, boost::future<std::shared_ptr<recif::RecWaitStatus>>&& fut) {
219  // Note: Alerts are set and cleared immedately rather than deferring to
220  // a later point as the await operation can take a very long time.
221  auto alert_id =
222  MakeAlertId(alert::REQUEST, std::string("RecWait") + source.GetSource().GetName());
223  try {
224  auto reply = fut.get();
225  // Request did not throw -> clear any alert
226  m_params.common.status.ClearAlert(alert_id);
227 
228  auto status = reply->getStatus();
229  if (status == recif::Success) {
230  // Await returned
231  source.SetState(State::Stopped);
232 
233  auto rec_status = reply->getRecStatus();
234  LOG4CPLUS_INFO(m_params.common.logger,
235  fmt::format("Data source '{}' replied successfully and provides {} "
236  "number of files",
237  source.GetSource(),
238  rec_status->getDpFiles().size()));
239  if (rec_status->getDpFiles().empty()) {
240  LOG4CPLUS_WARN(m_params.common.logger,
241  fmt::format("Data source '{}' replied successfully for "
242  "RecWait but did not produce any files!",
243  source.GetSource()));
244  }
245  for (auto const& file : rec_status->getDpFiles()) {
246  m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
247  }
248  // All ok, return true
249  return true;
250  } else {
251  // Wait timed out, return false to resend
252  return false;
253  }
254  } catch (recif::ExceptionErr const& e) {
255  m_params.common.status.SetAlert(
256  MakeAlert(alert_id,
257  fmt::format("Primary source '{}' request 'RecWaitStatus' replied "
258  "with ICD error: ({}) {}",
259  source.GetSource().GetName(),
260  e.getCode(),
261  e.getDesc())));
262  m_error = true;
263  throw boost::enable_current_exception(
264  DaqSourceError("RecWait", std::string(source.GetSource().GetName()), e.what()));
265  } catch (...) {
266  auto what = error::FormatException(std::current_exception());
267  m_params.common.status.SetAlert(
268  MakeAlert(alert_id,
269  fmt::format("Primary source '{}' request 'RecWaitStatus' replied "
270  "with non-ICD error: {}",
271  source.GetSource().GetName(),
272  what)));
273  m_error = true;
274  throw boost::enable_current_exception(DaqSourceError(
275  "RecWait", std::string(source.GetSource().GetName()), "unknown exception"));
276  }
277 }
278 } // namespace daq::op
daq::op::eventlog::Reply
void Reply(AsyncOpParams &params, SourceType const &source, std::string description, bool error)
Definition: util.hpp:30
daq::op::AwaitPrimAsync::AwaitPrimAsync
AwaitPrimAsync(AwaitOpParams params) noexcept
Constructs operation with the provided parameters.
Definition: awaitPrim.cpp:99
daq::op::AsyncOpParams::logger
log4cplus::Logger & logger
Definition: asyncOpParams.hpp:101
daq::MakeAlertId
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:39
daq::op::AwaitPrimAsync::Initiate
boost::future< Result< DpParts > > Initiate()
Initiates operation that await acquisition completion.
Definition: awaitPrim.cpp:103
daq::Source
Simple class that holds the source and associated state.
Definition: source.hpp:29
report.hpp
daq::error::FormatException
void FormatException(std::ostream &os, std::exception_ptr ptr)
Report without nesting.
Definition: report.cpp:79
daq::op::AwaitOpParams
Await-specific parameters that are not provided by AsyncOpParams.
Definition: asyncOpParams.hpp:114
daq::op
Definition: abort.hpp:19
daq::op::AsyncOpParams::executor
rad::IoExecutor & executor
Definition: asyncOpParams.hpp:100
daq::MakeAlert
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:29
daq::op::AwaitOpParams::common
AsyncOpParams common
Definition: asyncOpParams.hpp:122
awaitPrim.hpp
Contains declaration for the AwaitPrimAsync operation.
state.hpp
Declares daq::State and related functions.
daq::alert::REQUEST
constexpr std::string_view REQUEST
Request.
Definition: status.hpp:31
util.hpp
Contains declaration for the async op utilities.