ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
start.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_op
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Contains definition for the StartAsync operation
7  */
8 #include <daq/op/start.hpp>
9 
10 #include <Metadaqif.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <log4cplus/loggingmacros.h>
14 #include <boost/assert.hpp>
15 
16 #include <daq/op/util.hpp>
17 #include <daq/state.hpp>
18 
19 namespace {
20 class RecPropertiesImpl : public recif::RecProperties {
21 public:
22  double getAbsTime() const override {
23  return 0.0;
24  }
25  void setAbsTime(double abs_time) override {
26  }
27 
28  std::string getId() const override {
29  return m_id;
30  }
31  void setId(const std::string& id) override {
32  m_id = id;
33  }
34 
35  std::string getInfo() const override {
36  return "";
37  }
38  void setInfo(const std::string& info) override {
39  }
40 
41  std::vector<std::string> getPublishers() const override {
42  return {};
43  }
44  void setPublishers(const std::vector<std::string>& publishers) override {
45  }
46  bool hasKey() const override {
47  return false;
48  }
49  std::unique_ptr<recif::RecProperties> cloneKey() const override {
50  throw std::runtime_error("not clonable");
51  }
52  std::unique_ptr<recif::RecProperties> clone() const override {
53  throw std::runtime_error("not clonable");
54  }
55  bool keyEquals(const recif::RecProperties& other) const override {
56  return false;
57  }
58 
59 private:
60  std::string m_id;
61 };
62 } // namespace
63 
64 namespace daq::op {
65 
66 StartAsync::StartAsync(AsyncOpParams params_arg) noexcept : m_params(params_arg) {
67 }
68 
69 boost::future<void> StartAsync::Initiate() {
70  // Start metadata then primary
71  return StartMeta()
72  .then(m_params.executor,
73  [this](boost::future<void>&& f) -> boost::future<void> {
74  // Simply propagate errors, if any
75  if (f.has_exception()) {
76  LOG4CPLUS_INFO(m_params.logger,
77  fmt::format("{}: StartAsync: StartAsync metadaq "
78  "failed. Will not start primary data acquisition.",
79  m_params.status));
80  // Note: Can't throw directly here as we'll unwrap which then discards that
81  // exception
82  return std::move(f);
83  }
84  // If metadaq was started ok, start primary
85  return StartPrim();
86  })
87  .unwrap(); // unwrap outer future since we actually want the inner one.
88 }
89 
90 boost::future<void> StartAsync::StartMeta() {
91  return SendRequestAndCollectReplies<void>(
92  m_params.meta_sources.begin(),
93  m_params.meta_sources.end(),
94  // predicate
95  [](auto&) { return true; },
96  m_params,
97  // Sender
98  [id = m_params.id](Source<MetaSource>& s) {
99  s.SetState(State::Starting, false);
100  return s.GetSource().GetRrClient().StartDaq(id);
101  },
102  // reply handler:
103  // We transform the return type to void here since there's nothing to keep
104  // in the reply
105  [](AsyncOpParams params,
106  Source<MetaSource>& source,
107  boost::future<std::shared_ptr<metadaqif::DaqReply>>&& fut) -> void {
108  HandleMetaDaqReply<std::shared_ptr<metadaqif::DaqReply>>("StartDaq",
109  State::Starting,
110  State::Acquiring,
111  State::NotStarted,
112  params,
113  source,
114  std::move(fut));
115  },
116  std::string_view("StartDaq() (start metadata acquisition)"))
117  .then(UnwrapVoidReplies); // @note: UnwrapVoidReplies is thread safe
118 }
119 
120 boost::future<void> StartAsync::StartPrim() {
121  return SendRequestAndCollectReplies<void>(
122  m_params.prim_sources.begin(),
123  m_params.prim_sources.end(),
124  [](auto&) { return true; },
125  m_params,
126  // sender
127  [id = m_params.id](Source<PrimSource>& s) {
128  auto& client = s.GetSource().GetRrClient();
129 #if defined(UNIT_TEST)
130  // ECII-783: MAL suddenly requires that interface *must* be implemented by
131  // matching middleware otherwise it fails.
132  // To avoid complicated mocking we still use our local implementation for unit
133  // testing.
134  auto properties = std::make_shared<RecPropertiesImpl>();
135 #else
136 
137  auto mal = client.getMal();
138  BOOST_ASSERT_MSG(mal, "MAL RR client returned invalid MAL pointer");
139  auto properties = mal->createDataEntity<recif::RecProperties>();
140  // Initialize all members by assigning from local implementation
141  *properties = RecPropertiesImpl();
142 #endif
143  properties->setId(id);
144  s.SetState(State::Starting, false);
145  return client.RecStart(properties);
146  },
147  // reply handler
148  [](AsyncOpParams params,
149  Source<PrimSource>& source,
150  boost::future<std::shared_ptr<recif::RecStatus>>&& fut) -> void {
151  HandlePrimDaqReply<std::shared_ptr<recif::RecStatus>>("RecStart",
152  State::Starting,
153  State::Acquiring,
154  State::NotStarted,
155  params,
156  source,
157  std::move(fut));
158  },
159  std::string_view("RecStart() (start primary data acquisition)"))
160  .then(UnwrapVoidReplies); // @note: UnwrapVoidReplies is thread safe
161 }
162 
163 } // namespace daq::op
Declares daq::State and related functions.
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void >>> futures)
Unwrap futures to extract errors.
Definition: util.cpp:60
Contains declaration for the StartAsync operation.
Simple class that holds the source and associated state.
Definition: source.hpp:29
Parameters required for each async operation.
rad::IoExecutor & executor
StartAsync(AsyncOpParams params) noexcept
Definition: start.cpp:66
boost::future< void > Initiate()
Initiates operation that starts metadata acquisition.
Definition: start.cpp:69
Contains declaration for the async op utilities.