ifw-daq  1.0.0
IFW Data Acquisition modules
abort.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_op
4  * @copyright 2021 ESO - European Southern Observatory
5  *
6  * @brief Contains definition for the AbortAsync operation
7  */
8 #include <daq/op/abort.hpp>
9 
10 #include <Metadaqif.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <log4cplus/loggingmacros.h>
14 
15 #include <daq/op/util.hpp>
16 #include <daq/state.hpp>
17 
18 namespace daq::op {
19 
21  : m_policy(policy), m_params(m_params) {
22 }
23 
24 [[nodiscard]] boost::future<AbortAsync::ResultType> AbortAsync::Initiate() {
25  using boost::future;
26 
27  // Sequencing is not important to abort. So we send request to both primary and metadata
28  // sources immediately instead of synchronizing them.
29  return boost::when_all(AbortMeta(), AbortPrim())
30  .then(m_params.executor,
31  [this](future<std::tuple<future<void>, future<void>>> futs) -> ResultType {
32  LOG4CPLUS_DEBUG(
33  m_params.logger,
34  fmt::format("{}: AbortAsync: AbortMeta() and AbortPrim() completed.", m_params.status));
35  auto tup = futs.get();
36  auto& prim = std::get<0>(tup);
37  auto& meta = std::get<1>(tup);
38 
39  if (prim.has_value() && meta.has_value()) {
40  LOG4CPLUS_INFO(
41  m_params.logger,
42  fmt::format("{}: AbortAsync: DAQ aborted successfully.", m_params.status));
43  return { false };
44  }
45  if (m_policy == ErrorPolicy::Tolerant) {
46  // Errors are non-fatal
47  LOG4CPLUS_INFO(
48  m_params.logger,
49  fmt::format("{}: AbortAsync: Data Acquisition abort has errors that will "
50  "be ignored due to error policy.", m_params.status));
51  return { true };
52  } else {
53  // Errors are fatal
54  LOG4CPLUS_INFO(
55  m_params.logger,
56  fmt::format("{}: AbortAsync: Data Acquisition abort failed.", m_params.status));
57 
58  // Either prim or meta has error, collect them:
59  std::vector<std::variant<DaqSourceError, std::exception_ptr>> errors;
60  try {
61  prim.get(); // throws
62  } catch (DaqSourceError const& e) {
63  errors.emplace_back(e);
64  } catch (...) {
65  errors.emplace_back(std::current_exception());
66  }
67  try {
68  meta.get(); // throws
69  } catch (DaqSourceError const& e) {
70  errors.emplace_back(e);
71  } catch (...) {
72  errors.emplace_back(std::current_exception());
73  }
74  LOG4CPLUS_DEBUG(
75  m_params.logger,
76  fmt::format("{}: Throwing exception.", m_params.status));
77  throw boost::enable_current_exception(DaqSourceErrors(std::move(errors)));
78  }
79  });
80 }
81 
82 
83 [[nodiscard]] boost::future<void> AbortAsync::AbortMeta() {
84  return SendRequestAndCollectReplies<void>(
85  m_params.meta_sources.begin(),
86  m_params.meta_sources.end(),
87  [&](Source<MetaSource>& s) {
88  return s.GetState() != State::Aborted && s.GetState() != State::Stopped;
89  },
90  m_params,
91  // Sender
92  [id = m_params.id](Source<MetaSource>& s) {
93  s.SetState(State::Aborting);
94  return s.GetSource().GetRrClient().AbortDaq(id);
95  },
96  // reply handler
97  [](AsyncOpParams m_params,
98  Source<MetaSource>& source,
99  boost::future<std::shared_ptr<metadaqif::DaqReply>>&& fut) -> void {
100  HandleMetaDaqReply(State::Aborting, State::Aborted, {}, m_params, source, std::move(fut));
101  },
102  std::string_view("AbortAsync: abort metadata acquisition"))
103  .then(m_params.executor, UnwrapVoidReplies); // @note: UnwrapVoidReplies is thread safe
104 }
105 
106 
107 [[nodiscard]] boost::future<void> AbortAsync::AbortPrim() {
108  return SendRequestAndCollectReplies<void>(
109  m_params.prim_sources.begin(),
110  m_params.prim_sources.end(),
111  [&](Source<PrimSource>& s) {
112  return !IsFinalState(s.GetState());
113  },
114  m_params,
115  // Sender
116  [](Source<PrimSource>& s) {
117  s.SetState(State::Aborting);
118  // id is not supportd by interface
119  return s.GetSource().GetRrClient().RecAbort();
120  },
121  // reply handler
122  [](AsyncOpParams m_params,
123  Source<PrimSource>& source,
124  boost::future<std::string>&& fut) -> void {
125  HandlePrimDaqReply(State::Aborting, State::Aborted, {}, m_params, source, std::move(fut));
126  },
127  std::string_view("AbortAsync: abort primary data acquisition"))
128  .then(m_params.executor, UnwrapVoidReplies); // @note: UnwrapVoidReplies is thread safe
129 }
130 
131 } // namespace daq::op
daq::op::AbortAsync::AbortAsync
AbortAsync(ErrorPolicy policy, AsyncOpParams params) noexcept
Definition: abort.cpp:20
abort.hpp
Contains declaration for the AbortAsync operation.
daq::op::AbortAsync::Initiate
boost::future< ResultType > Initiate()
Initiates operation that stats metadata acquisition.
Definition: abort.cpp:24
daq::op::AsyncOpParams::logger
log4cplus::Logger & logger
Definition: asyncOpParams.hpp:51
daq::DaqSourceErrors
Exception thrown to carry reply errors.
Definition: error.hpp:84
daq::op::UnwrapVoidReplies
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void >>> &&futures)
Unwrap futures to extract errors.
Definition: util.cpp:54
daq::Source
Simple class that holds the source and associated state.
Definition: source.hpp:29
daq::op
Definition: abort.hpp:19
daq::op::AsyncOpParams::executor
rad::IoExecutor & executor
Definition: asyncOpParams.hpp:50
daq::op::AsyncOpParams::status
ObservableStatus const & status
Async operations should not modify status directly DaqController does that.
Definition: asyncOpParams.hpp:48
daq::Result< void >
Definition: utility.hpp:23
daq::op::AsyncOpParams
Parameters required for each async operation.
Definition: asyncOpParams.hpp:23
daq::DaqSourceError
Represents error in single source.
Definition: error.hpp:67
state.hpp
Declares daq::State and related functions.
daq::ErrorPolicy
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
daq::ErrorPolicy::Strict
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
util.hpp
Contains declaration for the async op utilities.