ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
start.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_op
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Contains definition for the StartAsync operation
7  */
8 #include <daq/op/start.hpp>
9 
10 #include <Metadaqif.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <log4cplus/loggingmacros.h>
14 
15 #include <daq/op/util.hpp>
16 #include <daq/state.hpp>
17 
18 namespace {
19 class RecPropertiesImpl : public recif::RecProperties {
20 public:
21  double getAbsTime() const override {
22  return 0.0;
23  }
24  void setAbsTime(double abs_time) override {
25  }
26 
27  std::string getId() const override {
28  return m_id;
29  }
30  void setId(const std::string& id) override {
31  m_id = id;
32  }
33 
34  std::string getInfo() const override {
35  return "";
36  }
37  void setInfo(const std::string& info) override {
38  }
39 
40  std::vector<std::string> getPublishers() const override {
41  return {};
42  }
43  void setPublishers(const std::vector<std::string>& publishers) override {
44  }
45  bool hasKey() const override {
46  return false;
47  }
48  std::unique_ptr<recif::RecProperties> cloneKey() const override {
49  throw std::runtime_error("not clonable");
50  }
51  std::unique_ptr<recif::RecProperties> clone() const override {
52  throw std::runtime_error("not clonable");
53  }
54  bool keyEquals(const recif::RecProperties& other) const override {
55  return false;
56  }
57 
58 private:
59  std::string m_id;
60 };
61 } // namespace
62 
63 namespace daq::op {
64 
65 StartAsync::StartAsync(AsyncOpParams params_arg) noexcept : m_params(params_arg) {
66 }
67 
68 boost::future<void> StartAsync::Initiate() {
69  // Start metadata then primary
70  return StartMeta()
71  .then(m_params.executor,
72  [this](boost::future<void>&& f) -> boost::future<void> {
73  // Simply propagate errors, if any
74  if (f.has_exception()) {
75  LOG4CPLUS_INFO(m_params.logger,
76  fmt::format("{}: StartAsync: StartAsync metadaq "
77  "failed. Will not start primary data acquisition.",
78  m_params.status));
79  // Note: Can't throw directly here as we'll unwrap which then discards that
80  // exception
81  return std::move(f);
82  }
83  // If metadaq was started ok, start primary
84  return StartPrim();
85  })
86  .unwrap(); // unwrap outer future since we actually want the inner one.
87 }
88 
89 boost::future<void> StartAsync::StartMeta() {
90  return SendRequestAndCollectReplies<void>(
91  m_params.meta_sources.begin(),
92  m_params.meta_sources.end(),
93  // predicate
94  [](auto&) { return true; },
95  m_params,
96  // Sender
97  [id = m_params.id](Source<MetaSource>& s) {
98  s.SetState(State::Starting, false);
99  return s.GetSource().GetRrClient().StartDaq(id);
100  },
101  // reply handler:
102  // We transform the return type to void here since there's nothing to keep
103  // in the reply
104  [](AsyncOpParams params,
105  Source<MetaSource>& source,
106  boost::future<std::shared_ptr<metadaqif::DaqReply>>&& fut) -> void {
107  HandleMetaDaqReply<std::shared_ptr<metadaqif::DaqReply>>("StartDaq",
108  State::Starting,
109  State::Acquiring,
110  State::NotStarted,
111  params,
112  source,
113  std::move(fut));
114  },
115  std::string_view("start metadata acquisition"))
116  .then(UnwrapVoidReplies); // @note: UnwrapVoidReplies is thread safe
117 }
118 
119 boost::future<void> StartAsync::StartPrim() {
120  return SendRequestAndCollectReplies<void>(
121  m_params.prim_sources.begin(),
122  m_params.prim_sources.end(),
123  [](auto&) { return true; },
124  m_params,
125  // sender
126  [id = m_params.id](Source<PrimSource>& s) {
127  auto properties = std::make_shared<RecPropertiesImpl>();
128  properties->setId(id);
129  s.SetState(State::Starting, false);
130  return s.GetSource().GetRrClient().RecStart(properties);
131  },
132  // reply handler
133  [](AsyncOpParams params,
134  Source<PrimSource>& source,
135  boost::future<std::shared_ptr<recif::RecStatus>>&& fut) -> void {
136  HandlePrimDaqReply<std::shared_ptr<recif::RecStatus>>("RecStart",
137  State::Starting,
138  State::Acquiring,
139  State::NotStarted,
140  params,
141  source,
142  std::move(fut));
143  },
144  std::string_view("start primary data acquisition"))
145  .then(UnwrapVoidReplies); // @note: UnwrapVoidReplies is thread safe
146 }
147 
148 } // namespace daq::op
daq::op::UnwrapVoidReplies
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void >>> futures)
Unwrap futures to extract errors.
Definition: util.cpp:60
start.hpp
Contains declaration for the StartAsync operation.
daq::Source
Simple class that holds the source and associated state.
Definition: source.hpp:29
daq::op
Definition: abort.hpp:19
daq::op::AsyncOpParams::executor
rad::IoExecutor & executor
Definition: asyncOpParams.hpp:100
daq::op::StartAsync::Initiate
boost::future< void > Initiate()
Initiates operation that starts metadata acquisition and, if that succeeds, primary data acquisition.
Definition: start.cpp:68
daq::op::StartAsync::StartAsync
StartAsync(AsyncOpParams params) noexcept
Definition: start.cpp:65
state.hpp
Declares daq::State and related functions.
daq::op::AsyncOpParams
Parameters required for each async operation.
Definition: asyncOpParams.hpp:67
util.hpp
Contains declaration for the async op utilities.