ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
abort.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_op
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Contains definition for the AbortAsync operation
7  */
8 #include <daq/op/abort.hpp>
9 
10 #include <Metadaqif.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <log4cplus/loggingmacros.h>
14 
15 #include <daq/op/util.hpp>
16 #include <daq/state.hpp>
17 
18 namespace daq::op {
19 
21  : m_policy(policy), m_params(m_params) {
22 }
23 
24 [[nodiscard]] boost::future<AbortAsync::ResultType> AbortAsync::Initiate() {
25  using boost::future;
26 
27  // Sequencing is not important to abort. So we send request to both primary and metadata
28  // sources immediately instead of synchronizing them.
29  return boost::when_all(AbortMeta(), AbortPrim())
30  .then(m_params.executor,
31  [this](future<std::tuple<future<void>, future<void>>> futs) -> ResultType {
32  LOG4CPLUS_DEBUG(
33  m_params.logger,
34  fmt::format("{}: AbortAsync: AbortMeta() and AbortPrim() completed.",
35  m_params.status));
36  // future from when_all never throws according to specification.
37  auto tup = futs.get();
38  auto& prim = std::get<0>(tup);
39  auto& meta = std::get<1>(tup);
40 
41  if (prim.has_value() && meta.has_value()) {
42  LOG4CPLUS_INFO(m_params.logger,
43  fmt::format("{}: AbortAsync: DAQ aborted successfully.",
44  m_params.status));
45  return {false};
46  }
47  if (m_policy == ErrorPolicy::Tolerant) {
48  // Errors are non-fatal
49  LOG4CPLUS_WARN(
50  m_params.logger,
51  fmt::format("{}: AbortAsync: Data Acquisition abort has errors that will "
52  "be ignored due to error policy.",
53  m_params.status));
54  return {true};
55  } else {
56  // Errors are fatal
57  LOG4CPLUS_ERROR(m_params.logger,
58  fmt::format("{}: AbortAsync: Data Acquisition abort failed.",
59  m_params.status));
60 
61  // Either prim or meta has error, collect them:
62  std::vector<std::variant<DaqSourceError, std::exception_ptr>> errors;
63  try {
64  prim.get(); // throws
65  } catch (DaqSourceError const& e) {
66  errors.emplace_back(e);
67  } catch (...) {
68  errors.emplace_back(std::current_exception());
69  }
70  try {
71  meta.get(); // throws
72  } catch (DaqSourceError const& e) {
73  errors.emplace_back(e);
74  } catch (...) {
75  errors.emplace_back(std::current_exception());
76  }
77  LOG4CPLUS_DEBUG(m_params.logger,
78  fmt::format("{}: Throwing exception.", m_params.status));
79  throw boost::enable_current_exception(DaqSourceErrors(std::move(errors)));
80  }
81  });
82 }
83 
84 [[nodiscard]] boost::future<void> AbortAsync::AbortMeta() {
85  return SendRequestAndCollectReplies<void>(
86  m_params.meta_sources.begin(),
87  m_params.meta_sources.end(),
88  [&](Source<MetaSource>& s) {
89  return s.GetState() != State::Aborted && s.GetState() != State::Stopped;
90  },
91  m_params,
92  // Sender
93  [id = m_params.id](Source<MetaSource>& s) {
94  s.SetState(State::AbortingAcquiring);
95  return s.GetSource().GetRrClient().AbortDaq(id);
96  },
97  // reply handler
98  [](AsyncOpParams m_params,
99  Source<MetaSource>& source,
100  boost::future<std::shared_ptr<metadaqif::DaqReply>>&& fut) -> void {
101  HandleMetaDaqReply("AbortDaq",
102  State::AbortingAcquiring,
103  State::Aborted,
104  {},
105  m_params,
106  source,
107  std::move(fut));
108  },
109  std::string_view("AbortAsync: abort metadata acquisition"))
110  .then(m_params.executor, UnwrapVoidReplies); // @note: UnwrapVoidReplies is thread safe
111 }
112 
113 [[nodiscard]] boost::future<void> AbortAsync::AbortPrim() {
114  return SendRequestAndCollectReplies<void>(
115  m_params.prim_sources.begin(),
116  m_params.prim_sources.end(),
117  [&](Source<PrimSource>& s) { return !IsFinalState(s.GetState()); },
118  m_params,
119  // Sender
120  [](Source<PrimSource>& s) {
121  s.SetState(State::AbortingAcquiring);
122  // id is not supportd by interface
123  return s.GetSource().GetRrClient().RecAbort();
124  },
125  // reply handler
126  [](AsyncOpParams m_params,
127  Source<PrimSource>& source,
128  boost::future<std::string>&& fut) -> void {
129  HandlePrimDaqReply("RecAbort",
130  State::AbortingAcquiring,
131  State::Aborted,
132  {},
133  m_params,
134  source,
135  std::move(fut));
136  },
137  std::string_view("AbortAsync: abort primary data acquisition"))
138  .then(m_params.executor, UnwrapVoidReplies); // @note: UnwrapVoidReplies is thread safe
139 }
140 
141 } // namespace daq::op
daq::op::UnwrapVoidReplies
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void >>> futures)
Unwrap futures to extract errors.
Definition: util.cpp:60
abort.hpp
Contains declaration for the AbortAsync operation.
daq::op::AsyncOpParams::logger
log4cplus::Logger & logger
Definition: asyncOpParams.hpp:101
daq::DaqSourceError
Represents error in single source.
Definition: error.hpp:67
daq::DaqSourceErrors
Exception thrown to carry reply errors.
Definition: error.hpp:84
daq::Source
Simple class that holds the source and associated state.
Definition: source.hpp:29
daq::op::AsyncOpParams::status
ObservableStatus & status
Async operations should not modify state directly; DaqController does that.
Definition: asyncOpParams.hpp:94
daq::op
Definition: abort.hpp:19
daq::op::AsyncOpParams::executor
rad::IoExecutor & executor
Definition: asyncOpParams.hpp:100
daq::op::AbortAsync::AbortAsync
AbortAsync(ErrorPolicy policy, AsyncOpParams params) noexcept
Definition: abort.cpp:20
daq::op::AbortAsync::Initiate
boost::future< ResultType > Initiate()
Initiates operation that aborts metadata and primary data acquisition.
Definition: abort.cpp:24
daq::Result< void >
Definition: utility.hpp:23
state.hpp
Declares daq::State and related functions.
daq::op::AsyncOpParams
Parameters required for each async operation.
Definition: asyncOpParams.hpp:67
daq::ErrorPolicy
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
daq::ErrorPolicy::Strict
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
util.hpp
Contains declaration for the async op utilities.