ifw-daq  1.0.0
IFW Data Acquisition modules
awaitPrim.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_op
4  * @copyright 2021 ESO - European Southern Observatory
5  *
6  * @brief Contains definition for the AwaitPrimAsync operation
7  */
8 #include <daq/op/awaitPrim.hpp>
9 
10 #include <fmt/format.h>
11 #include <fmt/ostream.h>
12 #include <log4cplus/loggingmacros.h>
13 
14 #include <daq/op/util.hpp>
15 #include <daq/state.hpp>
16 
17 namespace {
18 /**
19  * Process all replies together to decide if more awaits are necessary.
20  *
21  * @note Errors from sources are logged, but otherwise ignored.
22  *
23  * @returns true if all awaited-on sources have completed.
24  * @returns false if one or more sources are not completed.
25  */
26 bool AwaitUnwrapReplies(boost::future<std::vector<boost::future<bool>>>&& futures) {
27  std::vector<boost::future<bool>> values = futures.get();
28  if (values.empty()) {
29  // No requests were sent (because they were all completed)
30  return true;
31  }
32  std::vector<std::exception_ptr> exceptions;
33  size_t num_ok = 0;
34  size_t num_exceptions = 0;
35  for (auto& f : values) {
36  try {
37  if (f.get()) {
38  num_ok++;
39  }
40  } catch (std::exception const& e) {
41  num_exceptions++;
42  LOG4CPLUS_INFO(
43  "daq",
44  fmt::format("daq::op::AwaitUnwrapReplies: Source replied with exception: {}",
45  e.what()));
46  }
47  }
48  LOG4CPLUS_DEBUG(
49  "daq",
50  fmt::format("daq::op::AwaitUnwrapReplies: {}/{} replies contained exceptions. {}/{} "
51  "replies report success",
52  num_exceptions,
53  values.size(),
54  num_ok,
55  values.size()));
56 
57  return num_ok == values.size();
58 }
59 
60 class RecWaitSpec : public recif::RecWaitSpec {
61 public:
62  RecWaitSpec(float timeout) : m_timeout{timeout}, m_info{} {
63  }
64  std::string getInfo() const override {
65  return m_info;
66  }
67  void setInfo(const std::string& info) override {
68  m_info = info;
69  }
70 
71  float getTimeout() const override {
72  return m_timeout;
73  }
74  void setTimeout(float timeout) override {
75  m_timeout = timeout;
76  }
77  bool hasKey() const override {
78  return false;
79  }
80  std::unique_ptr<recif::RecWaitSpec> cloneKey() const override {
81  throw std::runtime_error("not clonable");
82  }
83  std::unique_ptr<recif::RecWaitSpec> clone() const override {
84  throw std::runtime_error("not clonable");
85  }
86  bool keyEquals(const recif::RecWaitSpec& other) const override {
87  return false;
88  }
89 
90 private:
91  float m_timeout;
92  std::string m_info;
93 };
94 } // namespace
95 
96 namespace daq::op {
97 
99  : m_params(m_params), m_error(false), m_parts(), m_aborted(false) {
100 }
101 
102 [[nodiscard]] boost::future<Result<DpParts>> AwaitPrimAsync::Initiate() {
103  LOG4CPLUS_DEBUG(m_params.common.logger,
104  fmt::format("AwaitPrimAsync::Initiate: Operation initiating"));
105  InitiateAwait();
106  return m_promise.get_future();
107 }
108 
109 void AwaitPrimAsync::InitiateAwait() {
110  // If the last await was processed before the configured interval expired wait until that time
111  // has elapsed.
112  MakeInterval()
113  .then(m_params.common.executor, [this](auto) { return AwaitOnceAsync(); })
114  .unwrap()
115  .then(m_params.common.executor, [this](boost::future<bool> fut) -> void {
116  // If condition is fulfilled, set reply value or exception.
117  if (m_aborted) {
118  LOG4CPLUS_DEBUG(m_params.common.logger,
119  fmt::format("AwaitPrimAsync::InitiateAwait: Operation aborted"));
120  return;
121  }
122  try {
123  auto is_ok = fut.get();
124  if (is_ok) {
125  // All done, set promise
126  this->m_promise.set_value({m_error, std::move(m_parts)});
127  } else {
128  // If condition is not fulfilled we do it again.
129  this->InitiateAwait();
130  }
131  } catch (...) {
132  // Fatal error. Report to operation invoker.
133  this->m_promise.set_exception(boost::current_exception());
134  }
135  });
136 }
137 
138 boost::future<void> AwaitPrimAsync::MakeInterval() {
139  using std::chrono::duration_cast;
140  using std::chrono::steady_clock;
141  using std::chrono::milliseconds;
142 
143  if (!m_last_start) {
144  // First time -> return ready future
145  return boost::make_ready_future();
146  }
147 
148  auto now = steady_clock::now();
149  auto next_start = *m_last_start + duration_cast<steady_clock::duration>(m_params.wait_interval);
150  if (now >= next_start) {
151  // Interval already expired -> return ready future
152  return boost::make_ready_future();
153  }
154  LOG4CPLUS_DEBUG(m_params.common.logger,
155  fmt::format("AwaitPrimAsync::MakeInterval: Waiting {}ms until sending RecWait",
156  duration_cast<milliseconds>(next_start - now).count()));
157  // Wait until
158  m_interval.reset();
159  m_interval.emplace(m_params.common.executor.get_io_context(), next_start);
160  m_interval->timer.async_wait([this](boost::system::error_code const& ec){
161  // We don't care if we're cancelled, we have to set promise anyway
162  // to avoid broken promise
163  m_interval->promise.set_value();
164  });
165  return m_interval->promise.get_future();
166 }
167 
168 void AwaitPrimAsync::Abort() noexcept {
169  LOG4CPLUS_INFO(m_params.common.logger,
170  fmt::format("AwaitPrimAsync::Abort: Requested to abort! "
171  "Number of files received so far: {}",
172  m_parts.size()));
173  if (m_aborted) {
174  return;
175  }
176  m_aborted = true;
177  m_interval.reset();
178  m_promise.set_value({m_error, std::move(m_parts)});
179 }
180 
181 boost::future<bool> AwaitPrimAsync::AwaitOnceAsync() {
182  using Reply = std::shared_ptr<recif::RecWaitStatus>;
183  using Seconds = std::chrono::duration<float>;
184  using std::chrono::duration_cast;
185  if (m_aborted) {
186  LOG4CPLUS_DEBUG(m_params.common.logger,
187  fmt::format("AwaitPrimAsync::AwaitOnceAsync: Operation aborted"));
188  return boost::make_ready_future<bool>(true);
189  }
190  LOG4CPLUS_DEBUG(m_params.common.logger,
191  fmt::format("AwaitPrimAsync::AwaitOnceAsync: Sending requests..."));
192 
193  m_last_start = std::chrono::steady_clock::now();
194 
195  return SendRequestAndCollectReplies<bool>(
196  m_params.common.prim_sources.begin(),
197  m_params.common.prim_sources.end(),
198  [](Source<PrimSource> const& source) -> bool {
199  /* only send to sources that are not already stopped */
200  return !IsFinalState(source.GetState());
201  },
202  m_params.common,
203  // Sender
204  [this](Source<PrimSource>& s) -> boost::future<Reply> {
205  auto spec = std::make_shared<RecWaitSpec>(
206  duration_cast<Seconds>(this->m_params.wait_interval).count());
207  return s.GetSource().GetRrClient().RecWait(spec);
208  },
209  // reply handler
210  [this](AsyncOpParams params, Source<PrimSource>& source, boost::future<Reply>&& fut)
211  -> bool { return HandleRecWaitReply(source, std::move(fut)); },
212  std::string_view("AwaitPrimAsync: await primary data acquisition"))
213  .then(m_params.common.executor, AwaitUnwrapReplies);
214 }
215 
216 bool AwaitPrimAsync::HandleRecWaitReply(Source<PrimSource>& source,
217  boost::future<std::shared_ptr<recif::RecWaitStatus>>&& fut) {
218  try {
219  auto reply = fut.get();
220  auto status = reply->getStatus();
221  if (status == recif::Success) {
222  // Await returned
223  source.SetState(State::Stopped);
224 
225  auto rec_status = reply->getRecStatus();
226  LOG4CPLUS_INFO(m_params.common.logger,
227  fmt::format("Data source '{}' replied successfully and provides {} "
228  "number of files",
229  source.GetSource(),
230  rec_status->getDpFiles().size()));
231  if (rec_status->getDpFiles().empty()) {
232  LOG4CPLUS_WARN(m_params.common.logger,
233  fmt::format("Data source '{}' replied successfully for "
234  "RecWait but did not produce any files!",
235  source.GetSource()));
236  }
237  for (auto const& file : rec_status->getDpFiles()) {
238  m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
239  }
240  // All ok, return true
241  return true;
242  } else {
243  // Wait timed out, return false to resend
244  return false;
245  }
246  } catch (recif::ExceptionErr const& e) {
247  m_error = true;
248  throw boost::enable_current_exception(
249  DaqSourceError("RecWait", std::string(source.GetSource().GetName()), e.what()));
250  } catch (...) {
251  m_error = true;
252  throw boost::enable_current_exception(DaqSourceError(
253  "RecWait", std::string(source.GetSource().GetName()), "unknown exception"));
254  }
255 }
256 } // namespace daq::op
daq::op::eventlog::Reply
void Reply(AsyncOpParams &params, SourceType const &source, std::string description, bool error)
Definition: util.hpp:30
daq::op::AsyncOpParams::logger
log4cplus::Logger & logger
Definition: asyncOpParams.hpp:51
daq::Source
Simple class that holds the source and associated state.
Definition: source.hpp:29
daq::op::AwaitOpParams
Await-specific parameters that are not provided by AsyncOpParams.
Definition: asyncOpParams.hpp:64
daq::op
Definition: abort.hpp:19
daq::op::AsyncOpParams::executor
rad::IoExecutor & executor
Definition: asyncOpParams.hpp:50
daq::op::AwaitPrimAsync::Initiate
boost::future< Result< DpParts > > Initiate()
Initiates the operation that awaits acquisition completion.
Definition: awaitPrim.cpp:102
daq::op::AwaitOpParams::common
AsyncOpParams common
Definition: asyncOpParams.hpp:72
awaitPrim.hpp
Contains declaration for the AwaitPrimAsync operation.
daq::op::AwaitPrimAsync::AwaitPrimAsync
AwaitPrimAsync(AwaitOpParams params) noexcept
Constructs the operation with the provided parameters.
Definition: awaitPrim.cpp:98
state.hpp
Declares daq::State and related functions.
util.hpp
Contains declaration for the async op utilities.