ifw-daq  1.0.0
IFW Data Acquisition modules
stop.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_op
4  * @copyright 2021 ESO - European Southern Observatory
5  *
6  * @brief Contains definition for the StopAsync operation
7  */
8 #include <daq/op/stop.hpp>
9 
10 #include <Metadaqif.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <log4cplus/loggingmacros.h>
14 
15 #include <daq/op/util.hpp>
16 #include <daq/state.hpp>
17 #include <daq/fits/json.hpp>
18 
19 
20 namespace daq::op {
21 
22 
24  : m_policy(policy), m_params(m_params), m_error(false) {
25 }
26 
27 [[nodiscard]] boost::future<Result<DpParts>> StopAsync::Initiate() {
28  using boost::future;
29  using boost::make_exceptional_future;
30 
31  // stop primary then metadata.
32  return StopPrim()
33  .then(m_params.executor,
34  [this](future<void> prim_result) -> future<void> {
35  // Simply propagate errors, if any
36  if (prim_result.has_exception()) {
37  m_error = true;
38  if (m_policy == ErrorPolicy::Strict) {
39  LOG4CPLUS_INFO(m_params.logger,
40  fmt::format("{}: StopAsync: primary daq "
41  "failed. Will not stop metadata acquisition.",
42  m_params.status));
43  // Note: Can't throw directly here as we'll unwrap which then
44  // discards that exception -> simply propagate future containing
45  // error instead.
46  return make_exceptional_future<void>(prim_result.get_exception_ptr());
47  }
48  LOG4CPLUS_INFO(m_params.logger,
49  fmt::format("{}: StopAsync: primary daq "
50  "failed. Ignoring this because of "
51  "ErrorPolicy::Tolerant.",
52  m_params.status));
53  }
54  return StopMeta();
55  })
56  .unwrap() // transform future<future<T>> to future<T>
57  .then([this](future<void> res) -> Result<DpParts> {
58  if (res.has_exception()) {
59  m_error = true;
60  if (m_policy == ErrorPolicy::Strict) {
61  LOG4CPLUS_INFO(m_params.logger,
62  fmt::format("{}: StopAsync: stopping failed", m_params.status));
63  (void)res.get(); // throws
64  }
65  LOG4CPLUS_INFO(m_params.logger,
66  fmt::format("{}: StopAsync: meta daq "
67  "failed. Ignoring this because of "
68  "ErrorPolicy::Tolerant.",
69  m_params.status));
70  }
71  return {m_error, std::move(m_parts)};
72  });
73 }
74 
75 boost::future<void> StopAsync::StopMeta() {
76  return SendRequestAndCollectReplies<void>(
77  m_params.meta_sources.begin(),
78  m_params.meta_sources.end(),
79  [](auto&) { return true; },
80  m_params,
81  // Sender
82  [id = m_params.id](Source<MetaSource>& s) {
83  s.SetState(State::Stopping);
84  return s.GetSource().GetRrClient().StopDaq(id);
85  },
86  // reply handler (note that caller must keep this alive until future is set)
87  [this](AsyncOpParams params,
88  Source<MetaSource>& source,
89  boost::future<std::shared_ptr<metadaqif::DaqStopReply>>&& fut) -> void {
90  if (source.GetState() == State::Stopped) {
91  LOG4CPLUS_INFO(params.logger,
92  fmt::format("{}: StopMeta: Source already stopped, ignoring "
93  "reply.",
94  params.status));
95  return;
96  }
97  auto reply = HandleMetaDaqReply(State::Stopping,
98  State::Stopped,
99  State::Stopping,
100  params,
101  source,
102  std::move(fut));
103  if (!reply.has_value()) {
104  return;
105  }
106  // Add files and keywords from reply:
107  for (auto const& file : (**reply).getFiles()) {
108  m_parts.emplace_back(std::string(source.GetSource().GetName()),
109  file);
110  }
111  std::string keywords = (**reply).getKeywords();
112  if (!keywords.empty()) {
113  // Decode
114  auto keyword_vec = fits::ParseJsonKeywords(keywords.c_str());
115  m_parts.emplace_back(std::string(source.GetSource().GetName()),
116  std::move(keyword_vec));
117  }
118  },
119  std::string_view("StopAsync: stop metadata acquisition"))
120  .then(UnwrapVoidReplies);
121 }
122 
123 boost::future<void> StopAsync::StopPrim() {
124  return SendRequestAndCollectReplies<void>(
125  m_params.prim_sources.begin(),
126  m_params.prim_sources.end(),
127  [](Source<PrimSource> const& source) -> bool {
128  /* only send to sources that are not already stopped */
129  return !IsFinalState(source.GetState());
130  },
131  m_params,
132  // Sender
133  [](Source<PrimSource>& s) {
134  // id is not supported by recif
135  s.SetState(State::Stopping);
136  return s.GetSource().GetRrClient().RecStop();
137  },
138  // reply handler
139  [this](AsyncOpParams params,
140  Source<PrimSource>& source,
141  boost::future<std::shared_ptr<recif::RecStatus>>&& fut) -> void {
142  auto reply = HandlePrimDaqReply(State::Stopping,
143  State::Stopped,
144  State::Stopping,
145  params,
146  source,
147  std::move(fut));
148  if (!reply.has_value()) {
149  return;
150  }
151  // Add files and keywords from reply:
152  for (auto const& file : (**reply).getDpFiles()) {
153  m_parts.emplace_back(std::string(source.GetSource().GetName()),
154  file);
155  }
156  },
157  std::string_view("StopAsync: stop primary data acquisition"))
158  .then(UnwrapVoidReplies);
159 }
160 
161 } // namespace daq::op
stop.hpp
Contains declaration for the StopAsync operation.
daq::op::UnwrapVoidReplies
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void >>> &&futures)
Unwrap futures to extract errors.
Definition: util.cpp:54
daq::op
Definition: abort.hpp:19
daq::op::AsyncOpParams::executor
rad::IoExecutor & executor
Definition: asyncOpParams.hpp:50
daq::op::StopAsync::Initiate
boost::future< Result< DpParts > > Initiate()
Initiates operation that stop metadata acquisition.
Definition: stop.cpp:27
daq::op::HandleMetaDaqReply
std::optional< ReplyType > HandleMetaDaqReply(std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< MetaSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
Definition: util.cpp:65
daq::Result
Utility class that represents a result and an error.
Definition: utility.hpp:17
json.hpp
Contains data structure for FITS keywords.
daq::op::StopAsync::StopAsync
StopAsync(ErrorPolicy policy, AsyncOpParams params) noexcept
Definition: stop.cpp:23
daq::op::AsyncOpParams
Parameters required for each async operation.
Definition: asyncOpParams.hpp:23
state.hpp
Declares daq::State and related functions.
daq::ErrorPolicy
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
util.hpp
Contains declaration for the async op utilities.