ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
daqController.cpp
Go to the documentation of this file.
1 #include <daq/daqController.hpp>
2 
3 #include <iostream>
4 #include <stdexcept>
5 #include <type_traits>
6 
7 #include <fmt/format.h>
8 #include <fmt/ostream.h>
9 #include <log4cplus/loggingmacros.h>
10 #include <mal/MalException.hpp>
11 #include <mal/rr/qos/ReplyTime.hpp>
12 #include <nlohmann/json.hpp>
13 
14 #include <daq/dpmClient.hpp>
15 #include <daq/error.hpp>
16 #include <daq/fits/json.hpp>
17 #include <daq/op/abort.hpp>
18 #include <daq/op/awaitPrim.hpp>
19 #include <daq/op/initiate.hpp>
20 #include <daq/op/start.hpp>
21 #include <daq/op/stop.hpp>
22 
23 namespace daq {
24 namespace {
25 
26 DaqSources MakeSources(mal::Mal& mal, DaqContext const& ctx) {
27  std::vector<PrimSource> psources;
28  std::vector<MetaSource> msources;
29 
30  psources.reserve(ctx.prim_sources.size());
31  for (auto const& raw : ctx.prim_sources) {
32  psources.emplace_back(
33  raw.name,
34  mal.getClient<daq::PrimSource::RrClient>(
35  mal::Uri(raw.rr_uri),
36  {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(5))},
37  {}));
38  }
39 
40  msources.reserve(ctx.meta_sources.size());
41  for (auto const& raw : ctx.meta_sources) {
42  msources.emplace_back(
43  raw.name,
44  mal.getClient<daq::MetaSource::RrClient>(
45  mal::Uri(raw.rr_uri),
46  {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(5))},
47  {}));
48  }
49 
50  return DaqSources(std::move(psources), std::move(msources));
51 }
52 
/**
 * Constant that is `false` for every type argument.
 *
 * Because the value depends on a template parameter it can be used in a `static_assert` that
 * should only fire when the enclosing branch is actually instantiated (e.g. the final `else` of
 * an `if constexpr` chain in a variant visitor).
 */
template <typename>
inline constexpr bool always_false_v{false}; // NOLINT
55 
56 template <class Sources>
57 bool AllInState(Sources sources, State state) {
58  return std::all_of(
59  sources.begin(), sources.end(), [=](auto const& s) -> bool { return s.state == state; });
60 }
61 
62 } // namespace
63 
65  : start([](auto par) { return daq::op::InitiateOperation<daq::op::StartAsync>(par); })
66  , abort([](ErrorPolicy policy, auto par) {
67  return daq::op::InitiateOperation<op::AbortAsync>(policy, std::move(par));
68  })
69  , stop([](ErrorPolicy policy, auto par) {
70  return daq::op::InitiateOperation<op::StopAsync>(policy, std::move(par));
71  })
72  , await_prim([](auto par) {
73  return daq::op::InitiateAbortableOperation<op::AwaitPrimAsync>(std::move(par));
74  }) {
75 }
76 
77 bool OcmAsyncOperations::IsValid() const noexcept {
78  return start && stop && abort && await_prim;
79 }
80 
82  elt::mal::Mal& mal,
83  std::shared_ptr<DpmClient> dpm_client)
84  : m_io_ctx(io_ctx), m_mal(mal), m_async_ops(), m_dpm_client(std::move(dpm_client)) {
85  assert(m_dpm_client);
86 }
87 
89  std::shared_ptr<ObservableStatus> status,
90  std::shared_ptr<ObservableEventLog> event_log)
91  -> std::shared_ptr<DaqController> {
92  DaqSources sources = MakeSources(m_mal, daq_ctx);
93  return OcmDaqController::Create(m_io_ctx,
94  std::move(daq_ctx),
95  sources,
96  std::move(status),
97  std::move(event_log),
98  m_async_ops);
99 }
100 
102  std::shared_ptr<ObservableStatus> status,
103  std::shared_ptr<ObservableEventLog> event_log)
104  -> std::shared_ptr<DaqController> {
105  return std::make_shared<DpmDaqController>(
106  m_io_ctx, std::move(daq_ctx), std::move(status), std::move(event_log), m_dpm_client);
107 }
108 
109 std::ostream& operator<<(std::ostream& os, DaqController const& daq) {
110  os << "DaqController(id='" << daq.GetId() << "', state=" << daq.GetState() << ")";
111  return os;
112 }
113 
114 CommonDaqController::CommonDaqController(boost::asio::io_context& io_context,
115  DaqContext context,
116  std::shared_ptr<ObservableStatus> status,
117  std::shared_ptr<ObservableEventLog> event_log)
118  : m_io_ctx(io_context)
119  , m_executor(m_io_ctx)
120  , m_context(std::move(context))
121  , m_status(std::move(status))
122  , m_event_log(std::move(event_log)) {
123  assert(m_status);
124  assert(m_event_log);
125 }
126 
127 std::shared_ptr<ObservableStatus> CommonDaqController::GetStatus() DAQ_NOEXCEPT {
128  return m_status;
129 }
130 
131 std::shared_ptr<ObservableStatus const> CommonDaqController::GetStatus() const DAQ_NOEXCEPT {
132  return m_status;
133 }
134 
135 std::shared_ptr<ObservableEventLog> CommonDaqController::GetEventLog() DAQ_NOEXCEPT {
136  return m_event_log;
137 }
138 
139 std::string const& CommonDaqController::GetId() const DAQ_NOEXCEPT {
140  return m_status->GetId();
141 }
142 
144  return m_status->GetError();
145 }
146 
148  return m_context;
149 }
150 
151 boost::signals2::connection
152 CommonDaqController::ConnectContext(ContextSignal::slot_type const& slot) {
153  return m_sig_context.connect(slot);
154 }
155 
156 std::shared_ptr<OcmDaqController>
157 OcmDaqController::Create(boost::asio::io_context& io_context,
158  DaqContext context,
159  DaqSources const& sources,
160  std::shared_ptr<ObservableStatus> status,
161  std::shared_ptr<ObservableEventLog> event_log,
162  OcmAsyncOperations ops) {
163  // note: make_shared doesn't work since constructor is protected,
164  // to protect against non-shared ownership.
165  LOG4CPLUS_TRACE("daq", fmt::format("OcmDaqController::Create"));
166  return std::shared_ptr<OcmDaqController>(new OcmDaqController(io_context,
167  std::move(context),
168  sources,
169  std::move(status),
170  std::move(event_log),
171  std::move(ops)));
172 }
173 
174 OcmDaqController::OcmDaqController(boost::asio::io_context& io_context,
175  DaqContext context,
176  DaqSources const& sources,
177  std::shared_ptr<ObservableStatus> status,
178  std::shared_ptr<ObservableEventLog> event_log,
179  OcmAsyncOperations ops)
180  : CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
181 
182  , m_state(MakeState(GetStatusRef().GetState()))
183  , m_prim_sources(MakeSources<PrimSource>(sources.GetPrimarySources()))
184  , m_meta_sources(MakeSources<MetaSource>(sources.GetMetadataSources()))
185  , m_async_ops(std::move(ops))
186  , m_pending_replies(PendingReplies::Create())
187  , m_logger(log4cplus::Logger::getInstance("daq.ocm.controller")) {
188  if (GetContext().id != GetStatusRef().GetId()) {
189  throw boost::enable_current_exception(
190  std::invalid_argument("Data acquisition id mismatch between DaqContext "
191  "and ObservableStatus"));
192  }
193 
194  if (m_prim_sources.empty() && m_meta_sources.empty()) {
195  throw boost::enable_current_exception(std::invalid_argument("No data sources provided"));
196  }
197 
198  if (!m_async_ops.IsValid()) {
199  throw boost::enable_current_exception(
200  std::invalid_argument("OcmAsyncOperations is invalid"));
201  }
202 }
203 
205  State s;
206  std::visit(
207  [&](auto const& var) {
208  using T = std::decay_t<decltype(var)>;
209  if constexpr (std::is_same_v<T, NotStarted>) {
210  s = State::NotStarted;
211  } else if constexpr (std::is_same_v<T, Starting>) {
212  s = State::Starting;
213  } else if constexpr (std::is_same_v<T, Acquiring>) {
214  s = State::Acquiring;
215  } else if constexpr (std::is_same_v<T, Stopping>) {
216  s = State::Stopping;
217  } else if constexpr (std::is_same_v<T, Stopped>) {
218  s = State::Stopped;
219  } else if constexpr (std::is_same_v<T, Aborting>) {
221  } else if constexpr (std::is_same_v<T, Aborted>) {
222  s = State::Aborted;
223  } else {
224  static_assert(always_false_v<T>, "non-exhaustive visitor!");
225  }
226  },
227  m_state);
228  return s;
229 }
230 
231 constexpr log4cplus::Logger const& OcmDaqController::GetLogger() const noexcept {
232  return m_logger;
233 }
234 
236  switch (s) {
237  case State::NotStarted:
238  return StateVariant(NotStarted());
239  case State::Starting:
240  return StateVariant(Starting());
241  case State::Acquiring:
242  return StateVariant(Acquiring());
243  case State::Stopping:
244  return StateVariant(Stopping());
245  case State::Stopped:
246  return StateVariant(Stopped());
248  return StateVariant(Aborting());
249  case State::Aborted:
250  return StateVariant(Aborted());
251  default:
252  break;
253  };
254  LOG4CPLUS_FATAL(m_logger, fmt::format("Invalid state provided: '{}'", s));
255  std::terminate();
256 }
257 
259  GetStatusRef().SetError(error);
260 }
261 
263  m_state = s;
264  // Publish changes
265  GetStatusRef().SetState(GetState());
266 }
267 
270  GetEventLogRef(),
271  alerts,
272  GetIoExecutor(),
273  m_logger,
274  GetId(),
275  *m_pending_replies.get(),
278 }
279 
282  GetEventLogRef(),
283  alerts,
284  GetIoExecutor(),
285  m_logger,
286  GetId(),
287  *m_pending_replies.get(),
290  GetContext().await_interval);
291 }
292 
294  // Nothing yet.
295 }
296 
297 boost::future<State> OcmDaqController::StartAsync() {
299  ActionEvent(GetId(), "DaqController::StartAsync()", GetStatusRef().GetStatus()));
300 
301  // Make sure we're not already started.
302  if (!std::holds_alternative<NotStarted>(m_state)) {
303  return boost::make_exceptional_future<State>(
304  std::runtime_error("Data acquisition is already started"));
305  }
306 
307  SetState(Starting{});
308 
310 
311  auto alerts = std::make_unique<op::AlertState>();
312  return m_async_ops.start(MakeParams(*alerts.get()))
313  .then(GetIoExecutor(),
314  [alerts = std::move(alerts),
315  daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
316  boost::future<void> f) {
317  LOG4CPLUS_INFO(daq->GetLogger(),
318  fmt::format("{}: StartAsync: Completed {}",
319  *daq,
320  f.has_value() ? "successfully" : "with error"));
321  // Merge alerts from async op
322  auto defer = ObservableStatus::DeferSignal(&daq->GetStatusRef());
323  op::MergeAlerts(daq->GetStatusRef(), *alerts);
324 
325  // Re-raise any exception
326  if (f.has_exception()) {
327  daq->SetErrorFlag(true);
328  (void)f.get();
329  }
330  // Await completion if there are primary data sources
331  daq->InitiateAwaitPrimarySources();
332 
333  // No exception -> all done! update and return Acquiring state
334  daq->SetErrorFlag(false);
335  daq->SetState(Acquiring{});
336  return daq->GetState();
337  });
338 }
339 
340 boost::future<Status> OcmDaqController::StopAsync(ErrorPolicy policy) {
342  ActionEvent(GetId(), "DaqController::StopAsync()", GetStatusRef().GetStatus()));
343 
344  if (std::holds_alternative<Stopped>(m_state)) {
345  return boost::make_exceptional_future<Status>(
346  std::runtime_error("Data acquisition already stopped"));
347  }
348 
349  if (std::holds_alternative<Aborted>(m_state)) {
350  return boost::make_exceptional_future<Status>(
351  std::runtime_error("Data acquisition already aborted"));
352  }
353 
354  if (!std::holds_alternative<Acquiring>(m_state)) {
355  return boost::make_exceptional_future<Status>(
356  std::runtime_error("Cannot stop a data acquisition that is not Acquiring"));
357  }
358 
359  // @todo If we're in Starting this will potentially mess up assumption that we're
360  // in Starting. Revisit to remove this assumption?
361  // @todo: Store produced files
362  // m_status.AddFiles(reply.getFiles());
363  SetState(Stopping{});
364 
365  auto alerts = std::make_unique<op::AlertState>();
366  return m_async_ops.stop(policy, MakeParams(*alerts.get()))
367  .then(GetIoExecutor(),
368  [alerts = std::move(alerts),
369  daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
370  boost::future<Result<DpParts>> f) mutable -> Status {
371  if (daq->GetState() != State::Stopping) {
372  // It can happen that a request to stop was superseded and completed before
373  // reply was received. In this case we treat it as a failure as we cannot
374  // determine if post conditions are met.
375  LOG4CPLUS_WARN(
376  daq->GetLogger(),
377  fmt::format("{}: StopAsync: Data acquisition modified by other commands. "
378  "Do nothing else (errors are ignored). ",
379  *daq));
380  throw std::runtime_error(fmt::format(
381  "Stop command could not be completed because Data Acquisitions state was "
382  "modified in the meantime (current state: {})",
383  daq->GetState()));
384  }
385  LOG4CPLUS_INFO(daq->GetLogger(),
386  fmt::format("{}: StopAsync: Completed {}",
387  *daq,
388  f.has_value() ? "successfully" : "with error"));
389  auto defer = ObservableStatus::DeferSignal(&daq->GetStatusRef());
390  op::MergeAlerts(daq->GetStatusRef(), *alerts);
391 
392  if (f.has_exception()) {
393  daq->SetErrorFlag(true);
394  (void)f.get(); // Throw to propagate any error
395  }
396 
397  auto result = f.get();
398  AddDpParts(daq->GetContextMut(), result.result);
399  daq->EmitContextSignal();
400 
401  // If there were no exceptions we're all done.
402  // Update and return Stopped state
403  daq->SetErrorFlag(result.error);
404  daq->SetState(Stopped{});
405  return daq->GetStatus()->GetStatus();
406  });
407 }
408 
409 boost::future<Status> OcmDaqController::AbortAsync(ErrorPolicy policy) {
411  GetId(), fmt::format("DaqController::AbortAsync({})", policy), GetStatusRef().GetStatus()));
412 
413  if (std::holds_alternative<NotStarted>(m_state)) {
414  LOG4CPLUS_INFO(m_logger, fmt::format("{}: Aborting not started data acquisition", *this));
415  SetState(Aborted{});
416  return boost::make_ready_future<Status>(GetStatus()->GetStatus());
417  }
418 
419  if (std::holds_alternative<Stopped>(m_state) || std::holds_alternative<Aborted>(m_state)) {
420  return boost::make_exceptional_future<Status>(
421  std::runtime_error("Data acquisition already stopped or aborted"));
422  }
423 
424  SetState(Aborting{});
425 
426  auto alerts = std::make_unique<op::AlertState>();
427  return m_async_ops.abort(policy, MakeParams(*alerts.get()))
428  .then(GetIoExecutor(),
429  [alerts = std::move(alerts),
430  daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
431  boost::future<Result<void>> f) -> Status {
432  if (daq->GetState() == State::Aborted) {
433  // It can happen that a request to abort was superseded and
434  // finalized before reply was received.
435  LOG4CPLUS_INFO(
436  daq->GetLogger(),
437  fmt::format("{}: AbortAsync: Data acquisition already aborted. "
438  "Do nothing else (errors are ignored). ",
439  *daq));
440 // @todo: Should throw instead as the command was superseded by
441  // other command.
442  return daq->GetStatus()->GetStatus();
443  }
444  LOG4CPLUS_DEBUG(daq->GetLogger(),
445  fmt::format("{}: AbortAsync: Completed, updating DAQ status and "
446  "set reply remaining. Has fatal error={}",
447  *daq,
448  f.has_exception()));
449  //
450 // Merge alerts from async op
451  auto defer = ObservableStatus::DeferSignal(&daq->GetStatusRef());
452  op::MergeAlerts(daq->GetStatusRef(), *alerts);
453 
454  if (f.has_exception()) {
455  LOG4CPLUS_ERROR(
456  daq->GetLogger(),
457  fmt::format("{}: AbortAsync: Completed with fatal error.", *daq));
458 
459  daq->SetErrorFlag(true);
460  f.get(); // throw to propagate
461  }
462  auto result = f.get();
463  daq->SetErrorFlag(result.error);
464 
465  // Success
466  daq->SetState(Aborted{});
467  LOG4CPLUS_INFO(daq->GetLogger(),
468  fmt::format("{}: AbortAsync: Completed successfully.", *daq));
469  return daq->GetStatus()->GetStatus();
470  });
471 }
472 
473 boost::future<State> OcmDaqController::ScheduleMergeAsync() {
474  return boost::make_exceptional_future<State>(std::runtime_error(
475  fmt::format("ScheduleMergeAsync() is invalid in state: {}", GetState())));
476 }
477 
480  fmt::format("DaqController::UpdateKeywords(<omitted>)"),
481  GetStatusRef().GetStatus()));
482 
483  if (std::holds_alternative<Stopped>(m_state) || std::holds_alternative<Aborted>(m_state)) {
484  throw boost::enable_current_exception(
485  std::runtime_error("Data acquisition already stopped or aborted"));
486  }
487 
488  daq::UpdateKeywords(GetContextMut(), keywords);
490 }
491 
492 boost::future<State>
493 OcmDaqController::AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout) {
495  GetId(),
496  fmt::format("DaqController::AwaitAsync({}, {} ms)",
497  sources.empty() ? "all primary sources" : "a user defined list of sources",
498  timeout.count()),
499  GetStatusRef().GetStatus()));
500 
501  std::vector<
502  std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
503  await_on;
504 
505  if (!sources.empty()) {
506  for (auto const& source_id : sources) {
507  auto source_var = FindSource(source_id);
508  if (source_var) {
509  await_on.emplace_back(*source_var);
510  } else {
511  return boost::make_exceptional_future<State>(
512  std::invalid_argument(fmt::format("Source with id='{}' not found", source_id)));
513  }
514  }
515  } else {
516  // Use all primary sources by default
517  for (auto& source : m_prim_sources) {
518  await_on.emplace_back(&source);
519  }
520  }
521 
522  // note that condition references sources in OcmDaqController and should not be invoked
523  // unless OcmDaqController is alive.
524  auto condition = [sources = await_on]() {
525  return std::all_of(sources.begin(), sources.end(), [](auto var) {
526  return std::visit(
527  [](auto v) {
528  return v->GetState() == State::Aborted || v->GetState() == State::Stopped;
529  },
530  var);
531  });
532  };
533 
534 // Test if condition is already satisfied
535  if (condition()) {
536  return boost::make_ready_future<State>(GetState());
537  }
538 
539  // Wait is not already satisfied, attach state listeners to all sources.
540  // daq is captured with a weak ptr to avoid keeping OcmDaqController alive if no state changes
541  // occur on monitored components.
542  auto promise = std::make_shared<boost::promise<State>>();
543 
544  if (timeout > std::chrono::milliseconds(0)) {
545  auto timer = std::make_unique<boost::asio::steady_timer>(GetIoCtx(), timeout);
546  timer->async_wait([promise,
547  timer_ptr = timer.get(),
548  daq_weak = std::weak_ptr<OcmDaqController>(
549  std::static_pointer_cast<OcmDaqController>(shared_from_this()))](
550  boost::system::error_code ec) {
551  // Promise might already be fulfilled, there's no way to check if it is though.
552  try {
553  auto daq = daq_weak.lock();
554  if (ec) {
555  if (daq) {
556  LOG4CPLUS_DEBUG(
557  daq->m_logger,
558  fmt::format("{}: AsyncWait: Operation abandoned before completing.",
559  *daq));
560  }
561  // Timer deleted or was cancelled, set exception in promise
562  promise->set_exception(DaqOperationAborted(""));
563  } else {
564  // Normal timeout
565  // For this case we also want to delete the timer itself
566  if (daq) {
567  LOG4CPLUS_DEBUG(
568  daq->m_logger,
569  fmt::format("{}: AsyncWait: Operation timed out before completing.",
570  *daq));
571  daq->m_timers.erase(
572  std::remove_if(
573  daq->m_timers.begin(),
574  daq->m_timers.end(),
575  [timer_ptr](std::unique_ptr<boost::asio::steady_timer>& val) {
576  return val.get() == timer_ptr;
577  }),
578  daq->m_timers.end());
579  }
580 
581  promise->set_exception(DaqOperationTimeout(""));
582  }
583  } catch (...) {
584  }
585  });
586  m_timers.emplace_back(std::move(timer));
587  }
588 
589  auto listener = [condition,
590  promise,
591  daq_weak = std::weak_ptr<OcmDaqController>(
592  std::static_pointer_cast<OcmDaqController>(shared_from_this()))](State,
593  bool) {
594  auto daq = daq_weak.lock();
595  if (!daq) {
596  LOG4CPLUS_WARN("daq", "OcmDaqController deleted before await condition was fulfulled");
597  // this async op was abandoned. Do nothing.
598  return;
599  }
600 
601  if (condition()) {
602  LOG4CPLUS_INFO(daq->m_logger,
603  fmt::format("{}: AwaitAsync: Await condition fulfilled", *daq));
604  try {
605  promise->set_value(daq->GetState());
606  } catch (...) {
607  // Exception might be thrown because promise is already fulfilled, which is
608  // expected to happen.
609  }
610  }
611  };
612 
613  // Connect listeners to sources that should be awaited.
614  for (auto& source : await_on) {
615  std::visit(
616  [daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
617  listener](auto s) {
618  LOG4CPLUS_DEBUG(
619  daq->m_logger,
620  fmt::format("{}: AsyncWait: Attaching listener on source '{}'.", *daq, *s));
621  // Use automatic connection disconnect if daq is destroyed. (i.e. track_foreign).
622  using SlotType = typename std::remove_reference<
623  decltype(*s.get())>::type::StateSignal::slot_type;
624  s->ConnectStateListener(SlotType(std::move(listener)).track_foreign(daq));
625  },
626  source);
627  }
628  return promise->get_future();
629 }
630 
631 std::optional<std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
632 OcmDaqController::FindSource(std::string_view source_id) {
633  {
634  auto it =
635  std::find_if(m_prim_sources.begin(), m_prim_sources.end(), [source_id](auto& source) {
636  return source.GetSource().GetName() == source_id;
637  });
638  if (it != m_prim_sources.end()) {
639  return gsl::not_null<Source<PrimSource>*>(&(*it));
640  }
641  }
642  {
643  auto it =
644  std::find_if(m_meta_sources.begin(), m_meta_sources.end(), [source_id](auto& source) {
645  return source.GetSource().GetName() == source_id;
646  });
647  if (it != m_meta_sources.end()) {
648  return gsl::not_null<Source<MetaSource>*>(&(*it));
649  }
650  }
651 
652  // Not found
653  return {};
654 }
655 
656 template <class SourceType>
657 std::vector<Source<SourceType>> OcmDaqController::MakeSources(std::vector<SourceType> sources) {
658  //@todo: Check for duplicates
659  std::vector<Source<SourceType>> dest;
660  dest.reserve(sources.size());
662  std::make_move_iterator(sources.begin()),
663  std::make_move_iterator(sources.end()),
664  std::back_inserter(dest),
665  [](SourceType&& s) -> Source<SourceType> { return Source<SourceType>{std::move(s)}; });
666  return dest;
667 }
668 
669 void OcmDaqController::InitiateAwaitPrimarySources() {
670  AddEvent<ActionEvent>(GetId(),
671  "DaqController::InitiateAwaitPrimarySources(): Initiating",
672  GetStatusRef().GetStatus());
673  if (m_prim_sources.empty()) {
674  LOG4CPLUS_DEBUG(
675  m_logger,
676  fmt::format("{}: InitiateAwaitPrimarySources: No primary sources to monitor.", *this));
677  return;
678  }
679  LOG4CPLUS_DEBUG(m_logger,
680  fmt::format("{}: InitiateAwaitPrimarySources: Starting operation.", *this));
681  auto alerts = std::make_unique<op::AlertState>();
682  auto [future, abort] = m_async_ops.await_prim(MakeAwaitParams(*alerts.get()));
683  m_abort_await_primary_sources = abort;
684  // Set up continuation that stops data acquisition automatically
685  future.then(
686  GetIoExecutor(),
687  [alerts = std::move(alerts),
688  daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
689  boost::future<Result<DpParts>> fut) {
690  // Add FITS files from the stopped sources
691  auto result = fut.get();
692  LOG4CPLUS_DEBUG(daq->m_logger,
693  fmt::format("{}: InitiateAwaitPrimarySources: Adding {} files from "
694  "primary sources",
695  *daq,
696  result.result.size()));
697  // @todo Confirm if we want to add parts even though state has transitioned past
698  // Acquiring (which may mean manual stop and that parts have already been added)
699  // @todo: How to treat errors from await?
700  daq::AddDpParts(daq->GetContextMut(), result.result);
701  daq->EmitContextSignal();
702 
703  // If daq is still acquiring we stop using strict error policy, otherwise do nothing
704  if (!std::holds_alternative<Acquiring>(daq->m_state)) {
705  LOG4CPLUS_DEBUG(
706  daq->m_logger,
707  fmt::format(
708  "{}: InitiateAwaitPrimarySources: "
709  "AwaitAsync completed but another operation has already transitioned "
710  "DAQ from Acquiring so automatic stop will not be performed.",
711  *daq));
712  return;
713  }
714  daq->AddEvent<ActionEvent>(
715  daq->GetId(),
716  "DaqController::InitiateAwaitPrimarySources(): "
717  "Primary sources completed. Performing automatic stop of metadata sources",
718  daq->GetStatusRef().GetStatus());
719 
720  // Merge alerts from async op
721  auto defer = ObservableStatus::DeferSignal(&daq->GetStatusRef());
722  op::MergeAlerts(daq->GetStatusRef(), *alerts);
723 
724  // No continuation is necessary. If an error occurs that information is published
725  // and user needs to intervene.
726  // If a separate topic is created for errors only it may be published with a
727  // continuation attached to the result of StopAsync (or more likely implemented inside
728  // StopAsync).
729  daq->StopAsync(ErrorPolicy::Strict);
730  });
731 }
732 
733 DpmDaqController::DpmDaqController(boost::asio::io_context& io_context,
734  DaqContext context,
735  std::shared_ptr<ObservableStatus> status,
736  std::shared_ptr<ObservableEventLog> event_log,
737  std::shared_ptr<DpmClient> dpm_client)
738  : CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
739  , m_liveness(std::make_shared<bool>(true))
740  , m_dpm_client(std::move(dpm_client))
741  , m_logger(log4cplus::Logger::getInstance("daq.ocm.controller")) {
742  assert(m_dpm_client);
743  UpdateStateContext();
744 
745  // Connect slot to status signal which mirrors DAQ status of *this* from DPM
746  //
747  // Note: weak_from_this cannot be used in constructor as the reference count is not yet
748  // incremented by the holding shared_ptr. For this reason the m_liveness is used instead.
749  m_status_connection = m_dpm_client->ConnectStatusSignal(
750  [id = GetId(), weak = std::weak_ptr<bool>(m_liveness), this](Status const& status) {
751  if (id != status.id) {
752  // Update for another DAQ
753  return;
754  }
755  auto lock = weak.lock();
756  if (!lock) {
757  // Abandoned
758  LOG4CPLUS_DEBUG("daq", "DpmDaqController is abandoned: " << this);
759  return;
760  }
761  // Assign status of DAQ from DPM.
762  LOG4CPLUS_DEBUG(m_logger, *this << ": Assigning new DAQ status from DPM: " << status);
763  GetStatusRef() = status;
764  UpdateStateContext();
765  });
766 }
767 
768 boost::future<State> DpmDaqController::StartAsync() {
769  return boost::make_exceptional_future<State>(
770  std::runtime_error(fmt::format("StartAsync() is invalid in state: {}", GetState())));
771 }
772 
773 boost::future<Status> DpmDaqController::StopAsync(ErrorPolicy) {
774  return boost::make_exceptional_future<Status>(
775  std::runtime_error(fmt::format("StopAsync() is invalid in state: {}", GetState())));
776 }
777 
778 boost::future<State>
779 DpmDaqController::AwaitAsync(std::vector<std::string>, std::chrono::milliseconds) {
780  return boost::make_exceptional_future<State>(std::runtime_error(
781  fmt::format("AwaitAsync() with sources is invalid in state: {}", GetState())));
782 }
783 
785  throw std::runtime_error(fmt::format("UpdateKeywords() is invalid in state: {}", GetState()));
786 }
787 
789  return GetStatusRef().GetState();
790 }
791 
792 boost::future<State> DpmDaqController::ScheduleMergeAsync() {
793  LOG4CPLUS_TRACE(m_logger, *this << ": DpmDaqController::ScheduleMergeAsync");
794 
795  // If we are in NotScheduled and no request in-flight already: Send request to DPM to merge.
796  // otherwise fail.
797  auto starting_state = GetState();
798  if (starting_state != State::NotScheduled) {
799  return boost::make_exceptional_future<State>(std::runtime_error(
800  fmt::format("ScheduleMergeAsync() is invalid in state: {}", GetState())));
801  }
802 
803  assert(std::holds_alternative<NotScheduled>(m_state_ctx));
804  auto& ctx = std::get<NotScheduled>(m_state_ctx);
805  if (ctx.schedule_reply_pending) {
806  // A request has already been sent
807  return boost::make_exceptional_future<State>(
808  std::logic_error("ScheduleMergeAsync() a request is already in flight"));
809  }
810  if (!m_dp_spec) {
811  auto dp_spec = MakeDataProductSpecification(GetContext(), m_logger);
812  nlohmann::json j;
813  to_json(j, dp_spec);
814  m_dp_spec = j.dump(2); // indent 2
815  LOG4CPLUS_DEBUG(m_logger, "Created DpSpec:\n" << *m_dp_spec);
816  }
817 
818  return m_dpm_client->ScheduleAsync(*m_dp_spec)
819  .then(GetIoExecutor(),
820  [starting_state, weak = weak_from_this(), this](boost::future<State> f) -> State {
821  auto alert_id = MakeAlertId(alert::REQUEST, "dpm.QueueDaq()");
822  auto shared = weak.lock();
823  try {
824  auto state = f.get();
825  if (shared) {
826  auto& status = GetStatusRef();
828  // Update state only if DAQ has not changed state from starting_state.
829  //
830  // This can e.g. happen if OCM receive published state change data sample
831  // from DPM before receiving command reply.
832  if (GetState() == starting_state) {
833  SetState(state);
834  }
835  status.ClearAlert(alert_id);
836  LOG4CPLUS_INFO(m_logger,
837  fmt::format("{}: Scheduled DAQ successfully", *this));
838  } else {
839  LOG4CPLUS_DEBUG("daq", "DpmDaqController is abandoned: " << this);
840  }
841  return state;
842  } catch (elt::mal::TimeoutException const&) {
843  // Timeout errors does not result in error flag being set.
844  if (shared) {
845  LOG4CPLUS_INFO(m_logger,
846  fmt::format("{}: Schedule DAQ failed with timeout "
847  "(no error flag set for this condition)",
848  *this));
849  }
850  throw;
851  } catch (std::exception const& e) {
852  if (shared) {
853  auto& status = GetStatusRef();
855  status.SetAlert(MakeAlert(
856  alert_id,
857  fmt::format("Failed to schedule DAQ for merging: {}", e.what())));
858  status.SetError(true);
859  LOG4CPLUS_ERROR(m_logger, fmt::format("{}: Scheduled DAQ failed", *this));
860  }
861  throw;
862  }
863  });
864 }
865 
866 boost::future<Status> DpmDaqController::AbortAsync(ErrorPolicy policy) {
867  LOG4CPLUS_TRACE(m_logger, *this << ": DpmDaqController::AbortAsync");
868 
869  // 1. If we are sure DAQ has not been scheduled we can abort immediately.
870  auto state = GetState();
871  if (state == State::NotScheduled) {
872  assert(std::holds_alternative<NotScheduled>(m_state_ctx));
873  if (!std::get<NotScheduled>(m_state_ctx).schedule_reply_pending) {
874  SetState(State::Aborted);
875  return boost::make_ready_future<Status>(*GetStatus());
876  }
877  }
878 
879  // 2. Otherwise we must delegate to DPM to abort (this is also why we are not setting state
880  // AbortingMerging as we cannot know until we communiate with DPM).
881  return m_dpm_client->AbortAsync(GetId()).then(
882  GetIoExecutor(), [policy, weak = weak_from_this(), this](boost::future<State> f) -> Status {
883  auto shared = weak.lock();
884  if (!shared) {
885  // Abandonded
886  LOG4CPLUS_DEBUG("daq", "DpmDaqController is abandoned: " << this);
887  throw boost::enable_current_exception(std::runtime_error("Operation aborted"));
888  }
889  try {
890  auto result = f.get();
891  SetState(result);
892 
893  return *GetStatus();
894  } catch (...) {
895  // Even if DPM failed we mark it as aborted if forced
896  if (policy == ErrorPolicy::Tolerant) {
897  LOG4CPLUS_ERROR(m_logger,
898  *this
899  << ": Request to abort DAQ to DPM returned with "
900  "error. ErrorPolicy::Tolerant is used so DAQ is marked "
901  "as aborted anyway");
902  SetState(State::Aborted, true);
903  return *GetStatus();
904  }
905  throw;
906  }
907  });
908 }
909 
910 void DpmDaqController::UpdateStateContext() {
911  auto state = GetState();
912  switch (state) {
913  case State::NotScheduled:
914  m_state_ctx = NotScheduled{false};
915  return;
916  default:
917  m_state_ctx = std::monostate();
918  };
919 }
920 
921 void DpmDaqController::SetState(State state, std::optional<bool> error) {
922  auto prev = GetState();
923  GetStatusRef().SetState(state, error);
924  if (prev != state) {
925  UpdateStateContext();
926  }
927 }
928 
929 } // namespace daq
Contains declaration for the AbortAsync operation.
Contains declaration for the AwaitPrimAsync operation.
Implements common behaviour of OcmDaqController and DpmDaqController.
ObservableEventLog & GetEventLogRef() noexcept
DaqContext const & GetContext() const DAQ_NOEXCEPT override
ObservableStatus & GetStatusRef() noexcept
std::shared_ptr< ObservableStatus > GetStatus() DAQ_NOEXCEPT override
bool GetErrorFlag() const DAQ_NOEXCEPT override
std::shared_ptr< ObservableEventLog > GetEventLog() DAQ_NOEXCEPT override
boost::signals2::connection ConnectContext(ContextSignal::slot_type const &slot) override
Connect observer that is invoked when context is modified.
DaqContext & GetContextMut() noexcept
std::string const & GetId() const DAQ_NOEXCEPT override
rad::IoExecutor & GetIoExecutor() noexcept
CommonDaqController(boost::asio::io_context &io_context, DaqContext context, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log)
auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController > override
Create instance for the DPM phase of the DAQ process.
auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController > override
Create instance for the OCM phase of the DAQ process.
DaqControllerFactoryDefault(boost::asio::io_context &io_ctx, elt::mal::Mal &m_mal, std::shared_ptr< DpmClient > dpm_client)
Controls the execution of a single data acquisition that ultimately results in a set of FITS keywords an...
Data acquisition sources.
Definition: source.hpp:184
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Attempts to abort Data Acquisition.
void UpdateKeywords(fits::KeywordVector const &keywords) override
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
State GetState() const DAQ_NOEXCEPT override
boost::future< Status > StopAsync(ErrorPolicy policy) override
boost::future< State > StartAsync() override
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:140
metadaqif::MetaDaqAsync RrClient
Definition: source.hpp:142
void AddEvent(EventLog::EventType event)
Records that a file has been produced for this data acquisition.
Definition: eventLog.cpp:56
Defer signal changes until later time.
Definition: status.hpp:177
State GetState() const noexcept
Definition: status.cpp:215
void SetState(State s, std::optional< bool > error=std::nullopt) noexcept
Set state of data acquisition.
Definition: status.cpp:223
friend class DeferSignal
Definition: status.hpp:324
boost::future< State > StartAsync() override
Starts the data acquisition.
constexpr log4cplus::Logger const & GetLogger() const noexcept
std::vector< Source< PrimSource > > m_prim_sources
Note: Consider vector immutable!
State GetState() const DAQ_NOEXCEPT override
op::AsyncOpParams MakeParams(op::AlertState &)
Constructs the parameters used for asynchronous operations.
static std::shared_ptr< OcmDaqController > Create(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations operations)
Construct object.
StateVariant MakeState(State s) const noexcept
std::optional< std::variant< gsl::not_null< Source< PrimSource > * >, gsl::not_null< Source< MetaSource > * > > > FindSource(std::string_view source_id)
log4cplus::Logger m_logger
void UpdateKeywords(fits::KeywordVector const &keywords) override
Updates (replace or add) list of keywords.
std::shared_ptr< PendingReplies > m_pending_replies
std::variant< NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted > StateVariant
OcmAsyncOperations m_async_ops
std::vector< Source< MetaSource > > m_meta_sources
Note: Consider vector immutable!
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
OcmDaqController(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations ops)
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
Awaits that data acquisition stops or aborts.
op::AwaitOpParams MakeAwaitParams(op::AlertState &)
void SetErrorFlag(bool error) noexcept
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Aborts the data acquisition.
void SetState(StateVariant &&s) noexcept
boost::future< Status > StopAsync(ErrorPolicy policy) override
Stops the data acquisition.
Simple class that allows you to keep track of how many replies are pending.
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:96
recif::RecCmdsAsync RrClient
Definition: source.hpp:98
#define DAQ_NOEXCEPT
Definition: config.hpp:16
Contains data structure for FITS keywords.
daq::DpmClient
Contains error related declarations for DAQ.
Contains declarations for the helper functions to initiate operations.
constexpr std::string_view REQUEST
Request.
Definition: status.hpp:31
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
void MergeAlerts(ObservableStatus &dest, AlertState &src)
Merge alerts.
daqif::DaqStatus & operator<<(daqif::DaqStatus &status, daq::Status const &rhs)
Convert daq::Status -> daqif::DaqStatus by populating from rhs.
Definition: conversion.cpp:18
AlertId MakeAlertId(std::string_view category, std::string key)
Definition: status.cpp:49
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:63
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
Definition: daqContext.cpp:29
void AddDpParts(DaqContext &ctx, std::vector< DpPart > const &parts)
Definition: daqContext.cpp:36
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
State
Observable states of the data acquisition process.
Definition: state.hpp:39
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Aborted
Data acquisition has been aborted by user.
@ Stopping
Transitional state between Acquiring and Stopped.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
NLOHMANN_JSON_SERIALIZE_ENUM(State, { {State::NotStarted, "NotStarted"}, {State::Starting, "Starting"}, {State::Acquiring, "Acquiring"}, {State::Stopping, "Stopping"}, {State::Stopped, "Stopped"}, {State::NotScheduled, "NotScheduled"}, {State::Scheduled, "Scheduled"}, {State::Transferring, "Transferring"}, {State::Merging, "Merging"}, {State::Releasing, "Releasing"}, {State::AbortingAcquiring, "AbortingAcquiring"}, {State::AbortingMerging, "AbortingMerging"}, {State::Aborted, "Aborted"}, {State::Completed, "Completed"}, }) void to_json(nlohmann::json &j, Alert const &p)
Definition: json.cpp:48
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Creates a Data Product Specification as serialized JSON from the provided DaqContext.
Definition: makeDpSpec.cpp:266
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:39
Utility class that represents a result and an error.
Definition: utility.hpp:17
Definition: main.cpp:23
Contains declaration for DaqController.
Contains declaration for the StartAsync operation.
Contains declaration for the StopAsync operation.
Event related to an action being requested or performed.
Definition: eventLog.hpp:56
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
OCM Async operations.
std::function< boost::future< Result< void > >(ErrorPolicy, op::AsyncOpParams)> abort
bool IsValid() const noexcept
std::function< boost::future< void >(op::AsyncOpParams)> start
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
OcmAsyncOperations()
Default constructs object with standard async operations.
std::function< boost::future< Result< DpParts > >(ErrorPolicy, op::AsyncOpParams)> stop
Simple class that holds the source and associated state.
Definition: source.hpp:29
Non-observable status object that stores the status of data acquisition.
Definition: status.hpp:124
Parameters required for each async operation.
Await-specific parameters that are not provided with AsyncOpParams.
auto const & transform