ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
stop.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_op
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Contains definition for the StopAsync operation
7  */
8 #include <daq/op/stop.hpp>
9 
10 #include <Metadaqif.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <log4cplus/loggingmacros.h>
14 
15 #include <daq/fits/json.hpp>
16 #include <daq/op/util.hpp>
17 #include <daq/state.hpp>
18 
19 namespace daq::op {
20 
22  : m_policy(policy), m_params(m_params), m_error(false) {
23 }
24 
25 [[nodiscard]] boost::future<Result<DpParts>> StopAsync::Initiate() {
26  using boost::future;
27  using boost::make_exceptional_future;
28 
29  // stop primary then metadata.
30  return StopPrim()
31  .then(m_params.executor,
32  [this](future<void> prim_result) -> future<void> {
33  // Simply propagate errors, if any
34  if (prim_result.has_exception()) {
35  m_error = true;
36  if (m_policy == ErrorPolicy::Strict) {
37  LOG4CPLUS_INFO(m_params.logger,
38  fmt::format("{}: StopAsync: primary daq "
39  "failed. Will not stop metadata acquisition.",
40  m_params.status));
41  // Note: Can't throw directly here as we'll unwrap which then
42  // discards that exception -> simply propagate future containing
43  // error instead.
44  return make_exceptional_future<void>(prim_result.get_exception_ptr());
45  }
46  LOG4CPLUS_INFO(m_params.logger,
47  fmt::format("{}: StopAsync: primary daq "
48  "failed. Ignoring this because of "
49  "ErrorPolicy::Tolerant.",
50  m_params.status));
51  }
52  return StopMeta();
53  })
54  .unwrap() // transform future<future<T>> to future<T>
55  .then([this](future<void> res) -> Result<DpParts> {
56  if (res.has_exception()) {
57  m_error = true;
58  if (m_policy == ErrorPolicy::Strict) {
59  LOG4CPLUS_INFO(m_params.logger,
60  fmt::format("{}: StopAsync: stopping failed", m_params.status));
61  (void)res.get(); // throws
62  }
63  LOG4CPLUS_INFO(m_params.logger,
64  fmt::format("{}: StopAsync: meta daq "
65  "failed. Ignoring this because of "
66  "ErrorPolicy::Tolerant.",
67  m_params.status));
68  }
69  return {m_error, std::move(m_parts)};
70  });
71 }
72 
73 boost::future<void> StopAsync::StopMeta() {
74  return SendRequestAndCollectReplies<void>(
75  m_params.meta_sources.begin(),
76  m_params.meta_sources.end(),
77  [](auto&) { return true; },
78  m_params,
79  // Sender
80  [id = m_params.id](Source<MetaSource>& s) {
81  s.SetState(State::Stopping);
82  return s.GetSource().GetRrClient().StopDaq(id);
83  },
84  // reply handler (note that caller must keep this alive until future is set)
85  [this](AsyncOpParams params,
86  Source<MetaSource>& source,
87  boost::future<std::shared_ptr<metadaqif::DaqStopReply>>&& fut) -> void {
88  if (source.GetState() == State::Stopped) {
89  LOG4CPLUS_INFO(params.logger,
90  fmt::format("{}: StopMeta: Source already stopped, ignoring "
91  "reply.",
92  params.status));
93  return;
94  }
95  auto reply = HandleMetaDaqReply("StopDaq",
99  params,
100  source,
101  std::move(fut));
102  if (!reply.has_value()) {
103  return;
104  }
105  // Add files and keywords from reply:
106  for (auto const& file : (**reply).getFiles()) {
107  m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
108  }
109  std::string keywords = (**reply).getKeywords();
110  if (!keywords.empty()) {
111  // Decode
112  auto keyword_vec = fits::ParseJsonKeywords(keywords.c_str());
113  m_parts.emplace_back(std::string(source.GetSource().GetName()),
114  std::move(keyword_vec));
115  }
116  },
117  std::string_view("StopAsync: stop metadata acquisition"))
118  .then(UnwrapVoidReplies);
119 }
120 
121 boost::future<void> StopAsync::StopPrim() {
122  return SendRequestAndCollectReplies<void>(
123  m_params.prim_sources.begin(),
124  m_params.prim_sources.end(),
125  [](Source<PrimSource> const& source) -> bool {
126  /* only send to sources that are not already stopped */
127  return IsSubsequentState(State::Stopped, source.GetState());
128  },
129  m_params,
130  // Sender
131  [](Source<PrimSource>& s) {
132  // id is not supported by recif
133  s.SetState(State::Stopping);
134  return s.GetSource().GetRrClient().RecStop();
135  },
136  // reply handler
137  [this](AsyncOpParams params,
138  Source<PrimSource>& source,
139  boost::future<std::shared_ptr<recif::RecStatus>>&& fut) -> void {
140  auto reply = HandlePrimDaqReply("RecStop",
141  State::Stopping,
142  State::Stopped,
143  State::Stopping,
144  params,
145  source,
146  std::move(fut));
147  if (!reply.has_value()) {
148  return;
149  }
150  // Add files and keywords from reply:
151  for (auto const& file : (**reply).getDpFiles()) {
152  m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
153  }
154  },
155  std::string_view("StopAsync: stop primary data acquisition"))
156  .then(UnwrapVoidReplies);
157 }
158 
159 } // namespace daq::op
Contains data structure for FITS keywords.
Declares daq::State and related functions.
std::optional< ReplyType > HandleMetaDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< MetaSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
Definition: util.cpp:72
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void >>> futures)
Unwrap futures to extract errors.
Definition: util.cpp:60
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
@ Stopping
Transitional state between Acquiring and Stopped.
@ Stopped
All data sources have reported they have stopped acquiring data.
Utility class that represents a result and an error.
Definition: utility.hpp:17
Contains declaration for the StopAsync operation.
Parameters required for each async operation.
rad::IoExecutor & executor
boost::future< Result< DpParts > > Initiate()
Initiates operation that stop metadata acquisition.
Definition: stop.cpp:25
StopAsync(ErrorPolicy policy, AsyncOpParams params) noexcept
Definition: stop.cpp:21
Contains declaration for the async op utilities.