ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
daqController.cpp
Go to the documentation of this file.
1 #include <daq/daqController.hpp>
2 
3 #include <iostream>
4 #include <stdexcept>
5 #include <type_traits>
6 
7 #include <fmt/format.h>
8 #include <fmt/ostream.h>
9 #include <log4cplus/loggingmacros.h>
10 #include <mal/MalException.hpp>
11 #include <mal/rr/qos/ReplyTime.hpp>
12 #include <nlohmann/json.hpp>
13 
14 #include <daq/dpmClient.hpp>
15 #include <daq/error.hpp>
16 #include <daq/fits/json.hpp>
17 #include <daq/op/abort.hpp>
18 #include <daq/op/awaitPrim.hpp>
19 #include <daq/op/initiate.hpp>
20 #include <daq/op/start.hpp>
21 #include <daq/op/stop.hpp>
22 
23 namespace daq {
24 namespace {
25 
26 DaqSources MakeSources(mal::Mal& mal, DaqContext const& ctx) {
27  std::vector<PrimSource> psources;
28  std::vector<MetaSource> msources;
29 
30  psources.reserve(ctx.prim_sources.size());
31  for (auto const& raw : ctx.prim_sources) {
32  psources.emplace_back(
33  raw.name,
34  mal.getClient<daq::PrimSource::RrClient>(
35  mal::Uri(raw.rr_uri),
36  {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(5))},
37  {}));
38  }
39 
40  msources.reserve(ctx.meta_sources.size());
41  for (auto const& raw : ctx.meta_sources) {
42  msources.emplace_back(
43  raw.name,
44  mal.getClient<daq::MetaSource::RrClient>(
45  mal::Uri(raw.rr_uri),
46  {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(5))},
47  {}));
48  }
49 
50  return DaqSources(std::move(psources), std::move(msources));
51 }
52 
53 template <class>
54 inline constexpr bool always_false_v = false; // NOLINT
55 
56 template <class Sources>
57 bool AllInState(Sources sources, State state) {
58  return std::all_of(
59  sources.begin(), sources.end(), [=](auto const& s) -> bool { return s.state == state; });
60 }
61 
62 } // namespace
63 
65  : start([](auto par) { return daq::op::InitiateOperation<daq::op::StartAsync>(par); })
66  , abort([](ErrorPolicy policy, auto par) {
67  return daq::op::InitiateOperation<op::AbortAsync>(policy, std::move(par));
68  })
69  , stop([](ErrorPolicy policy, auto par) {
70  return daq::op::InitiateOperation<op::StopAsync>(policy, std::move(par));
71  })
72  , await_prim([](auto par) {
73  return daq::op::InitiateAbortableOperation<op::AwaitPrimAsync>(std::move(par));
74  }) {
75 }
76 
77 bool OcmAsyncOperations::IsValid() const noexcept {
78  return start && stop && abort && await_prim;
79 }
80 
82  elt::mal::Mal& mal,
83  std::shared_ptr<DpmClient> dpm_client)
84  : m_io_ctx(io_ctx), m_mal(mal), m_async_ops(), m_dpm_client(std::move(dpm_client)) {
85  assert(m_dpm_client);
86 }
87 
89  std::shared_ptr<ObservableStatus> status,
90  std::shared_ptr<ObservableEventLog> event_log)
91  -> std::shared_ptr<DaqController> {
92  DaqSources sources = MakeSources(m_mal, daq_ctx);
93  return OcmDaqController::Create(m_io_ctx,
94  std::move(daq_ctx),
95  sources,
96  std::move(status),
97  std::move(event_log),
98  m_async_ops);
99 }
100 
102  std::shared_ptr<ObservableStatus> status,
103  std::shared_ptr<ObservableEventLog> event_log)
104  -> std::shared_ptr<DaqController> {
105  return std::make_shared<DpmDaqController>(
106  m_io_ctx, std::move(daq_ctx), std::move(status), std::move(event_log), m_dpm_client);
107 }
108 
109 std::ostream& operator<<(std::ostream& os, DaqController const& daq) {
110  os << "DaqController(id='" << daq.GetId() << "', state=" << daq.GetState() << ")";
111  return os;
112 }
113 
114 CommonDaqController::CommonDaqController(boost::asio::io_context& io_context,
115  DaqContext context,
116  std::shared_ptr<ObservableStatus> status,
117  std::shared_ptr<ObservableEventLog> event_log)
118  : m_io_ctx(io_context)
119  , m_executor(m_io_ctx)
120  , m_context(std::move(context))
121  , m_status(std::move(status))
122  , m_event_log(std::move(event_log)) {
123  assert(m_status);
124  assert(m_event_log);
125 }
126 
127 std::shared_ptr<ObservableStatus> CommonDaqController::GetStatus() DAQ_NOEXCEPT {
128  return m_status;
129 }
130 
131 std::shared_ptr<ObservableStatus const> CommonDaqController::GetStatus() const DAQ_NOEXCEPT {
132  return m_status;
133 }
134 
135 std::shared_ptr<ObservableEventLog> CommonDaqController::GetEventLog() DAQ_NOEXCEPT {
136  return m_event_log;
137 }
138 
139 std::string const& CommonDaqController::GetId() const DAQ_NOEXCEPT {
140  return m_status->GetId();
141 }
142 
144  return m_status->GetError();
145 }
146 
148  return m_context;
149 }
150 
151 boost::signals2::connection
152 CommonDaqController::ConnectContext(ContextSignal::slot_type const& slot) {
153  return m_sig_context.connect(slot);
154 }
155 
156 std::shared_ptr<OcmDaqController>
157 OcmDaqController::Create(boost::asio::io_context& io_context,
158  DaqContext context,
159  DaqSources const& sources,
160  std::shared_ptr<ObservableStatus> status,
161  std::shared_ptr<ObservableEventLog> event_log,
162  OcmAsyncOperations ops) {
163  // note: make_shared doesn't work since constructor is protected,
164  // to protect against non-shared ownership.
165  LOG4CPLUS_TRACE("daq", fmt::format("OcmDaqController::Create"));
166  return std::shared_ptr<OcmDaqController>(new OcmDaqController(io_context,
167  std::move(context),
168  sources,
169  std::move(status),
170  std::move(event_log),
171  std::move(ops)));
172 }
173 
174 OcmDaqController::OcmDaqController(boost::asio::io_context& io_context,
175  DaqContext context,
176  DaqSources const& sources,
177  std::shared_ptr<ObservableStatus> status,
178  std::shared_ptr<ObservableEventLog> event_log,
179  OcmAsyncOperations ops)
180  : CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
181 
182  , m_state(MakeState(GetStatusRef().GetState()))
183  , m_prim_sources(MakeSources<PrimSource>(sources.GetPrimarySources()))
184  , m_meta_sources(MakeSources<MetaSource>(sources.GetMetadataSources()))
185  , m_async_ops(std::move(ops))
186  , m_pending_replies(PendingReplies::Create())
187  , m_logger(log4cplus::Logger::getInstance("daq")) {
188  if (GetContext().id != GetStatusRef().GetId()) {
189  throw boost::enable_current_exception(
190  std::invalid_argument("Data acquisition id mismatch between DaqContext "
191  "and ObservableStatus"));
192  }
193 
194  if (m_prim_sources.empty() && m_meta_sources.empty()) {
195  throw boost::enable_current_exception(std::invalid_argument("No data sources provided"));
196  }
197 
198  if (!m_async_ops.IsValid()) {
199  throw boost::enable_current_exception(
200  std::invalid_argument("OcmAsyncOperations is invalid"));
201  }
202 }
203 
205  State s;
206  std::visit(
207  [&](auto const& var) {
208  using T = std::decay_t<decltype(var)>;
209  if constexpr (std::is_same_v<T, NotStarted>) {
210  s = State::NotStarted;
211  } else if constexpr (std::is_same_v<T, Starting>) {
212  s = State::Starting;
213  } else if constexpr (std::is_same_v<T, Acquiring>) {
214  s = State::Acquiring;
215  } else if constexpr (std::is_same_v<T, Stopping>) {
216  s = State::Stopping;
217  } else if constexpr (std::is_same_v<T, Stopped>) {
218  s = State::Stopped;
219  } else if constexpr (std::is_same_v<T, Aborting>) {
220  s = State::AbortingAcquiring;
221  } else if constexpr (std::is_same_v<T, Aborted>) {
222  s = State::Aborted;
223  } else {
224  static_assert(always_false_v<T>, "non-exhaustive visitor!");
225  }
226  },
227  m_state);
228  return s;
229 }
230 
231 constexpr log4cplus::Logger const& OcmDaqController::GetLogger() const noexcept {
232  return m_logger;
233 }
234 
236  switch (s) {
237  case State::NotStarted:
238  return StateVariant(NotStarted());
239  case State::Starting:
240  return StateVariant(Starting());
241  case State::Acquiring:
242  return StateVariant(Acquiring());
243  case State::Stopping:
244  return StateVariant(Stopping());
245  case State::Stopped:
246  return StateVariant(Stopped());
247  case State::AbortingAcquiring:
248  return StateVariant(Aborting());
249  case State::Aborted:
250  return StateVariant(Aborted());
251  default:
252  break;
253  };
254  LOG4CPLUS_FATAL(m_logger, fmt::format("Invalid state provided: '{}'", s));
255  std::terminate();
256 }
257 
259  GetStatusRef().SetError(error);
260 }
261 
263  m_state = s;
264  // Publish changes
265  GetStatusRef().SetState(GetState());
266 }
267 
270  GetEventLogRef(),
271  alerts,
272  GetIoExecutor(),
273  m_logger,
274  GetId(),
275  *m_pending_replies.get(),
278 }
279 
282  GetEventLogRef(),
283  alerts,
284  GetIoExecutor(),
285  m_logger,
286  GetId(),
287  *m_pending_replies.get(),
290  GetContext().await_interval);
291 }
292 
294  // Nothing yet.
295 }
296 
297 boost::future<State> OcmDaqController::StartAsync() {
299  ActionEvent(GetId(), "DaqController::StartAsync()", GetStatusRef().GetStatus()));
300 
301  // Make sure we're not already started.
302  if (!std::holds_alternative<NotStarted>(m_state)) {
303  return boost::make_exceptional_future<State>(
304  std::runtime_error("Data acquisition is already started"));
305  }
306 
307  SetState(Starting{});
308 
310 
311  auto alerts = std::make_unique<op::AlertState>();
312  return m_async_ops.start(MakeParams(*alerts.get()))
313  .then(GetIoExecutor(),
314  [alerts = std::move(alerts),
315  daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
316  boost::future<void> f) {
317  LOG4CPLUS_INFO(daq->GetLogger(),
318  fmt::format("{}: StartAsync: Completed {}",
319  *daq,
320  f.has_value() ? "successfully" : "with error"));
321  // Merge alerts from async op
322  auto defer = ObservableStatus::DeferSignal(&daq->GetStatusRef());
323  op::MergeAlerts(daq->GetStatusRef(), *alerts);
324 
325  // Re-raise any exception
326  if (f.has_exception()) {
327  daq->SetErrorFlag(true);
328  (void)f.get();
329  }
330  // Await completion if there are primary data sources
331  daq->InitiateAwaitPrimarySources();
332 
333  // No exception -> all done! update and return Acquiring state
334  daq->SetErrorFlag(false);
335  daq->SetState(Acquiring{});
336  return daq->GetState();
337  });
338 }
339 
340 boost::future<Status> OcmDaqController::StopAsync(ErrorPolicy policy) {
342  ActionEvent(GetId(), "DaqController::StopAsync()", GetStatusRef().GetStatus()));
343 
344  if (std::holds_alternative<Stopped>(m_state)) {
345  return boost::make_exceptional_future<Status>(
346  std::runtime_error("Data acquisition already stopped"));
347  }
348 
349  if (std::holds_alternative<Aborted>(m_state)) {
350  return boost::make_exceptional_future<Status>(
351  std::runtime_error("Data acquisition already aborted"));
352  }
353 
354  if (!std::holds_alternative<Acquiring>(m_state)) {
355  return boost::make_exceptional_future<Status>(
356  std::runtime_error("Cannot stop a data acquisition that is not Acquiring"));
357  }
358 
359  // @todo If we're in Starting this will potentially mess up assumption that we're
360  // in Starting. Revisit to remove this assumption?
361  // @todo: Store produced files
362  // m_status.AddFiles(reply.getFiles());
363  SetState(Stopping{});
364 
365  auto alerts = std::make_unique<op::AlertState>();
366  return m_async_ops.stop(policy, MakeParams(*alerts.get()))
367  .then(GetIoExecutor(),
368  [alerts = std::move(alerts),
369  daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
370  boost::future<Result<DpParts>> f) mutable -> Status {
371  if (daq->GetState() != State::Stopping) {
372  // It can happen that a request to stop was superseded and completed before
373  // reply was received. In this case we treat it as a failure as we cannot
374  // determine if post conditions are met.
375  LOG4CPLUS_WARN(
376  daq->GetLogger(),
377  fmt::format("{}: StopAsync: Data acquisition modified by other commands. "
378  "Do nothing else (errors are ignored). ",
379  *daq));
380  throw std::runtime_error(fmt::format(
381  "Stop command could not be completed because Data Acquisitions state was "
382  "modified in the meantime (current state: {})",
383  daq->GetState()));
384  }
385  LOG4CPLUS_INFO(daq->GetLogger(),
386  fmt::format("{}: StopAsync: Completed {}",
387  *daq,
388  f.has_value() ? "successfully" : "with error"));
389  auto defer = ObservableStatus::DeferSignal(&daq->GetStatusRef());
390  op::MergeAlerts(daq->GetStatusRef(), *alerts);
391 
392  if (f.has_exception()) {
393  daq->SetErrorFlag(true);
394  (void)f.get(); // Throw to propagate any error
395  }
396 
397  auto result = f.get();
398  AddDpParts(daq->GetContextMut(), result.result);
399  daq->EmitContextSignal();
400 
401  // If there were no exceptions we're all done.
402  // Update and return Stopped state
403  daq->SetErrorFlag(result.error);
404  daq->SetState(Stopped{});
405  return daq->GetStatus()->GetStatus();
406  });
407 }
408 
409 boost::future<Status> OcmDaqController::AbortAsync(ErrorPolicy policy) {
411  GetId(), fmt::format("DaqController::AbortAsync({})", policy), GetStatusRef().GetStatus()));
412 
413  if (std::holds_alternative<NotStarted>(m_state)) {
414  LOG4CPLUS_INFO(m_logger, fmt::format("{}: Aborting not started data acquisition", *this));
415  SetState(Aborted{});
416  return boost::make_ready_future<Status>(GetStatus()->GetStatus());
417  }
418 
419  if (std::holds_alternative<Stopped>(m_state) || std::holds_alternative<Aborted>(m_state)) {
420  return boost::make_exceptional_future<Status>(
421  std::runtime_error("Data acquisition already stopped or aborted"));
422  }
423 
424  SetState(Aborting{});
425 
426  auto alerts = std::make_unique<op::AlertState>();
427  return m_async_ops.abort(policy, MakeParams(*alerts.get()))
428  .then(GetIoExecutor(),
429  [alerts = std::move(alerts),
430  daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
431  boost::future<Result<void>> f) -> Status {
432  if (daq->GetState() == State::Aborted) {
433  // It can happen that a request to abort was superseded and
434  // finalized before reply was received.
435  LOG4CPLUS_INFO(
436  daq->GetLogger(),
437  fmt::format("{}: AbortAsync: Data acquisition already aborted. "
438  "Do nothing else (errors are ignored). ",
439  *daq));
440  // @todo: Should throw instead as the command was superseeded by
441  // other command.
442  return daq->GetStatus()->GetStatus();
443  }
444  LOG4CPLUS_DEBUG(daq->GetLogger(),
445  fmt::format("{}: AbortAsync: Completed, updating DAQ status and "
446  "set reply remaining. Has fatal error={}",
447  *daq,
448  f.has_exception()));
449  //
450  // Merge alerts from saync op
451  auto defer = ObservableStatus::DeferSignal(&daq->GetStatusRef());
452  op::MergeAlerts(daq->GetStatusRef(), *alerts);
453 
454  if (f.has_exception()) {
455  LOG4CPLUS_ERROR(
456  daq->GetLogger(),
457  fmt::format("{}: AbortAsync: Completed with fatal error.", *daq));
458 
459  daq->SetErrorFlag(true);
460  f.get(); // throw to propagate
461  }
462  auto result = f.get();
463  daq->SetErrorFlag(result.error);
464 
465  // Success
466  daq->SetState(Aborted{});
467  LOG4CPLUS_INFO(daq->GetLogger(),
468  fmt::format("{}: AbortAsync: Completed successfully.", *daq));
469  return daq->GetStatus()->GetStatus();
470  });
471 }
472 
473 boost::future<State> OcmDaqController::ScheduleMergeAsync() {
474  return boost::make_exceptional_future<State>(std::runtime_error(
475  fmt::format("ScheduleMergeAsync() is invalid in state: {}", GetState())));
476 }
477 
480  fmt::format("DaqController::UpdateKeywords(<omitted>)"),
481  GetStatusRef().GetStatus()));
482 
483  if (std::holds_alternative<Stopped>(m_state) || std::holds_alternative<Aborted>(m_state)) {
484  throw boost::enable_current_exception(
485  std::runtime_error("Data acquisition already stopped or aborted"));
486  }
487 
488  daq::UpdateKeywords(GetContextMut(), keywords);
490 }
491 
492 boost::future<State>
493 OcmDaqController::AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout) {
495  GetId(),
496  fmt::format("DaqController::AwaitAsync({}, {} ms)",
497  sources.empty() ? "all primary sources" : "a user defined list of sources",
498  timeout.count()),
499  GetStatusRef().GetStatus()));
500 
501  std::vector<
502  std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
503  await_on;
504 
505  if (!sources.empty()) {
506  for (auto const& source_id : sources) {
507  auto source_var = FindSource(source_id);
508  if (source_var) {
509  await_on.emplace_back(*source_var);
510  } else {
511  return boost::make_exceptional_future<State>(
512  std::invalid_argument(fmt::format("Source with id='{}' not found", source_id)));
513  }
514  }
515  } else {
516  // Use all primary sources by default
517  for (auto& source : m_prim_sources) {
518  await_on.emplace_back(&source);
519  }
520  }
521 
522  // note that condition references sources in OcmDaqController and should not be invoked
523  // unless OcmDaqController is alive.
524  auto condition = [sources = await_on]() {
525  return std::all_of(sources.begin(), sources.end(), [](auto var) {
526  return std::visit(
527  [](auto v) {
528  return v->GetState() == State::Aborted || v->GetState() == State::Stopped;
529  },
530  var);
531  });
532  };
533 
534  // Test if condition is already satified
535  if (condition()) {
536  return boost::make_ready_future<State>(GetState());
537  }
538 
539  // Wait is not already satisfied, attach state listeners to all sources.
540  // daq is captured with a weak ptr to avoid keeping OcmDaqController alive if no state changes
541  // occur on monitored components.
542  auto promise = std::make_shared<boost::promise<State>>();
543 
544  if (timeout > std::chrono::milliseconds(0)) {
545  auto timer = std::make_unique<boost::asio::steady_timer>(GetIoCtx(), timeout);
546  timer->async_wait([promise,
547  timer_ptr = timer.get(),
548  daq_weak = std::weak_ptr<OcmDaqController>(
549  std::static_pointer_cast<OcmDaqController>(shared_from_this()))](
550  boost::system::error_code ec) {
551  // Promise might already be fulfilled, there's no way to check if it is though.
552  try {
553  auto daq = daq_weak.lock();
554  if (ec) {
555  if (daq) {
556  LOG4CPLUS_DEBUG(
557  daq->m_logger,
558  fmt::format("{}: AsyncWait: Operation abandoned before completing.",
559  *daq));
560  }
561  // Timer deleted or was cancelled, set exception in promise
562  promise->set_exception(DaqOperationAborted(""));
563  } else {
564  // Normal timeout
565  // For this case we also want to delete the timer itself
566  if (daq) {
567  LOG4CPLUS_DEBUG(
568  daq->m_logger,
569  fmt::format("{}: AsyncWait: Operation timed out before completing.",
570  *daq));
571  daq->m_timers.erase(
572  std::remove_if(
573  daq->m_timers.begin(),
574  daq->m_timers.end(),
575  [timer_ptr](std::unique_ptr<boost::asio::steady_timer>& val) {
576  return val.get() == timer_ptr;
577  }),
578  daq->m_timers.end());
579  }
580 
581  promise->set_exception(DaqOperationTimeout(""));
582  }
583  } catch (...) {
584  }
585  });
586  m_timers.emplace_back(std::move(timer));
587  }
588 
589  auto listener = [condition,
590  promise,
591  daq_weak = std::weak_ptr<OcmDaqController>(
592  std::static_pointer_cast<OcmDaqController>(shared_from_this()))](State,
593  bool) {
594  auto daq = daq_weak.lock();
595  if (!daq) {
596  LOG4CPLUS_WARN("daq", "OcmDaqController deleted before await condition was fulfulled");
597  // this async op was abandoned. Do nothing.
598  return;
599  }
600 
601  if (condition()) {
602  LOG4CPLUS_INFO(daq->m_logger,
603  fmt::format("{}: AwaitAsync: Await condition fulfilled", *daq));
604  try {
605  promise->set_value(daq->GetState());
606  } catch (...) {
607  // Exception might be thrown because promise is already fulfilled, which is
608  // expected to happen.
609  }
610  }
611  };
612 
613  // Connect listeners to sources that should be awaited.
614  for (auto& source : await_on) {
615  std::visit(
616  [daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
617  listener](auto s) {
618  LOG4CPLUS_DEBUG(
619  daq->m_logger,
620  fmt::format("{}: AsyncWait: Attaching listener on source '{}'.", *daq, *s));
621  // Use automatic connection disconnect if daq is destroyed. (i.e. track_foreign).
622  using SlotType = typename std::remove_reference<
623  decltype(*s.get())>::type::StateSignal::slot_type;
624  s->ConnectStateListener(SlotType(std::move(listener)).track_foreign(daq));
625  },
626  source);
627  }
628  return promise->get_future();
629 }
630 
631 std::optional<std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
632 OcmDaqController::FindSource(std::string_view source_id) {
633  {
634  auto it =
635  std::find_if(m_prim_sources.begin(), m_prim_sources.end(), [source_id](auto& source) {
636  return source.GetSource().GetName() == source_id;
637  });
638  if (it != m_prim_sources.end()) {
639  return gsl::not_null<Source<PrimSource>*>(&(*it));
640  }
641  }
642  {
643  auto it =
644  std::find_if(m_meta_sources.begin(), m_meta_sources.end(), [source_id](auto& source) {
645  return source.GetSource().GetName() == source_id;
646  });
647  if (it != m_meta_sources.end()) {
648  return gsl::not_null<Source<MetaSource>*>(&(*it));
649  }
650  }
651 
652  // Not found
653  return {};
654 }
655 
656 template <class SourceType>
657 std::vector<Source<SourceType>> OcmDaqController::MakeSources(std::vector<SourceType> sources) {
658  //@todo: Check for duplicates
659  std::vector<Source<SourceType>> dest;
660  dest.reserve(sources.size());
661  std::transform(
662  std::make_move_iterator(sources.begin()),
663  std::make_move_iterator(sources.end()),
664  std::back_inserter(dest),
665  [](SourceType&& s) -> Source<SourceType> { return Source<SourceType>{std::move(s)}; });
666  return dest;
667 }
668 
669 void OcmDaqController::InitiateAwaitPrimarySources() {
670  AddEvent<ActionEvent>(GetId(),
671  "DaqController::InitiateAwaitPrimarySources(): Initiating",
672  GetStatusRef().GetStatus());
673  if (m_prim_sources.empty()) {
674  LOG4CPLUS_DEBUG(
675  m_logger,
676  fmt::format("{}: InitiateAwaitPrimarySources: No primary sources to monitor.", *this));
677  return;
678  }
679  LOG4CPLUS_DEBUG(m_logger,
680  fmt::format("{}: InitiateAwaitPrimarySources: Starting operation.", *this));
681  auto alerts = std::make_unique<op::AlertState>();
682  auto [future, abort] = m_async_ops.await_prim(MakeAwaitParams(*alerts.get()));
683  m_abort_await_primary_sources = abort;
684  // Set up continuation that stops data acquisition automatically
685  future.then(
686  GetIoExecutor(),
687  [alerts = std::move(alerts),
688  daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
689  boost::future<Result<DpParts>> fut) {
690  // Add FITS files from the stopped sources
691  auto result = fut.get();
692  LOG4CPLUS_DEBUG(daq->m_logger,
693  fmt::format("{}: InitiateAwaitPrimarySources: Adding {} files from "
694  "primary sources",
695  *daq,
696  result.result.size()));
697  // @todo Confirm if we want to add parts even though state has transitioned past
698  // Acquiring (which may mean manual stop and that parts have already been added)
699  // @todo: How to treat errors from await?
700  daq::AddDpParts(daq->GetContextMut(), result.result);
701  daq->EmitContextSignal();
702 
703  // If daq is still acquiring we stop using strict error policy, otherwise do nothing
704  if (!std::holds_alternative<Acquiring>(daq->m_state)) {
705  LOG4CPLUS_DEBUG(
706  daq->m_logger,
707  fmt::format(
708  "{}: InitiateAwaitPrimarySources: "
709  "AwaitAsync completed but another operation has already transitioned "
710  "DAQ from Acquiring so automatic stop will not be performed.",
711  *daq));
712  return;
713  }
714  daq->AddEvent<ActionEvent>(
715  daq->GetId(),
716  "DaqController::InitiateAwaitPrimarySources(): "
717  "Primary sources completed. Performing automatic stop of metadata sources",
718  daq->GetStatusRef().GetStatus());
719 
720  // Merge alerts from async op
721  auto defer = ObservableStatus::DeferSignal(&daq->GetStatusRef());
722  op::MergeAlerts(daq->GetStatusRef(), *alerts);
723 
724  // No continuation is necessary. If an error occurs that information is published
725  // and user needs to intervene.
726  // If a separate topic is created for errors only it may be published with a
727  // continuation attached to the result of StopAsync (or more likely implemented inside
728  // StopAsync).
729  daq->StopAsync(ErrorPolicy::Strict);
730  });
731 }
732 
733 DpmDaqController::DpmDaqController(boost::asio::io_context& io_context,
734  DaqContext context,
735  std::shared_ptr<ObservableStatus> status,
736  std::shared_ptr<ObservableEventLog> event_log,
737  std::shared_ptr<DpmClient> dpm_client)
738  : CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
739  , m_liveness(std::make_shared<bool>(true))
740  , m_dpm_client(std::move(dpm_client))
741  , m_logger(log4cplus::Logger::getInstance("daq.ocm")) {
742  assert(m_dpm_client);
743  UpdateStateContext();
744 
745  // Connect slot to status signal which mirrors DAQ status of *this* from DPM
746  //
747  // Note: weak_from_this cannot be used in constructor as the reference count is not yet
748  // incremented by the holding shared_ptr. For this reason the m_liveness is used instead.
749  m_status_connection = m_dpm_client->ConnectStatusSignal(
750  [id = GetId(), weak = std::weak_ptr<bool>(m_liveness), this](Status const& status) {
751  if (id != status.id) {
752  // Update for another DAQ
753  return;
754  }
755  auto lock = weak.lock();
756  if (!lock) {
757  // Abandoned
758  LOG4CPLUS_DEBUG("daq", "DpmDaqController is abandoned: " << this);
759  return;
760  }
761  // Assign status of DAQ from DPM.
762  LOG4CPLUS_DEBUG(m_logger, *this << ": Assigning new DAQ status from DPM: " << status);
763  GetStatusRef() = status;
764  UpdateStateContext();
765  });
766 }
767 
768 boost::future<State> DpmDaqController::StartAsync() {
769  return boost::make_exceptional_future<State>(
770  std::runtime_error(fmt::format("StartAsync() is invalid in state: {}", GetState())));
771 }
772 
773 boost::future<Status> DpmDaqController::StopAsync(ErrorPolicy) {
774  return boost::make_exceptional_future<Status>(
775  std::runtime_error(fmt::format("StopAsync() is invalid in state: {}", GetState())));
776 }
777 
778 boost::future<State>
779 DpmDaqController::AwaitAsync(std::vector<std::string>, std::chrono::milliseconds) {
780  return boost::make_exceptional_future<State>(std::runtime_error(
781  fmt::format("AwaitAsync() with sources is invalid in state: {}", GetState())));
782 }
783 
785  throw std::runtime_error(fmt::format("UpdateKeywords() is invalid in state: {}", GetState()));
786 }
787 
789  return GetStatusRef().GetState();
790 }
791 
792 boost::future<State> DpmDaqController::ScheduleMergeAsync() {
793  LOG4CPLUS_TRACE(m_logger, *this << ": DpmDaqController::ScheduleMergeAsync");
794 
795  // If we are in NotScheduled and no request in-flight already: Send request to DPM to merge.
796  // otherwise fail.
797  auto starting_state = GetState();
798  if (starting_state != State::NotScheduled) {
799  return boost::make_exceptional_future<State>(std::runtime_error(
800  fmt::format("ScheduleMergeAsync() is invalid in state: {}", GetState())));
801  }
802 
803  assert(std::holds_alternative<NotScheduled>(m_state_ctx));
804  auto& ctx = std::get<NotScheduled>(m_state_ctx);
805  if (ctx.schedule_reply_pending) {
806  // A request has already been sent
807  return boost::make_exceptional_future<State>(
808  std::logic_error("ScheduleMergeAsync() a request is already in flight"));
809  }
810  if (!m_dp_spec) {
811  auto json = CreateDataProductSpecification(GetContext(), m_logger);
812  m_dp_spec = json.dump(2); // indent 2
813  }
814 
815  return m_dpm_client->ScheduleAsync(*m_dp_spec)
816  .then(GetIoExecutor(),
817  [starting_state, weak = weak_from_this(), this](boost::future<State> f) -> State {
818  auto shared = weak.lock();
819  try {
820  auto state = f.get();
821  if (shared) {
822  // Update state only if DAQ has not changed state from starting_state.
823  //
824  // This can e.g. happen if OCM receive published state change data sample
825  // from DPM before receiving command reply.
826  if (GetState() == starting_state) {
827  SetState(state);
828  }
829  LOG4CPLUS_INFO(m_logger,
830  fmt::format("{}: Scheduled DAQ successfully", *this));
831  } else {
832  LOG4CPLUS_DEBUG("daq", "DpmDaqController is abandoned: " << this);
833  }
834  return state;
835  } catch (elt::mal::TimeoutException const&) {
836  // Timeout errors does not result in error flag being set.
837  if (shared) {
838  LOG4CPLUS_INFO(m_logger,
839  fmt::format("{}: Schedule DAQ failed with timeout "
840  "(no error flag set for this condition)",
841  *this));
842  }
843  throw;
844  } catch (...) {
845  if (shared) {
846  GetStatusRef().SetError(true);
847  LOG4CPLUS_ERROR(m_logger, fmt::format("{}: Scheduled DAQ failed", *this));
848  }
849  throw;
850  }
851  });
852 }
853 
854 boost::future<Status> DpmDaqController::AbortAsync(ErrorPolicy policy) {
855  LOG4CPLUS_TRACE(m_logger, *this << ": DpmDaqController::AbortAsync");
856 
857  // 1. If we are sure DAQ has not been scheduled we can abort immediately.
858  auto state = GetState();
859  if (state == State::NotScheduled) {
860  assert(std::holds_alternative<NotScheduled>(m_state_ctx));
861  if (!std::get<NotScheduled>(m_state_ctx).schedule_reply_pending) {
862  SetState(State::Aborted);
863  return boost::make_ready_future<Status>(*GetStatus());
864  }
865  }
866 
867  // 2. Otherwise we must delegate to DPM to abort (this is also why we are not setting state
868  // AbortingMerging as we cannot know until we communiate with DPM).
869  return m_dpm_client->AbortAsync(GetId()).then(
870  GetIoExecutor(), [policy, weak = weak_from_this(), this](boost::future<State> f) -> Status {
871  auto shared = weak.lock();
872  if (!shared) {
873  // Abandonded
874  LOG4CPLUS_DEBUG("daq", "DpmDaqController is abandoned: " << this);
875  throw boost::enable_current_exception(std::runtime_error("Operation aborted"));
876  }
877  try {
878  auto result = f.get();
879  SetState(result);
880 
881  return *GetStatus();
882  } catch (...) {
883  // Even if DPM failed we mark it as aborted if forced
884  if (policy == ErrorPolicy::Tolerant) {
885  LOG4CPLUS_ERROR(m_logger,
886  *this
887  << ": Request to abort DAQ to DPM returned with "
888  "error. ErrorPolicy::Tolerant is used so DAQ is marked "
889  "as aborted anyway");
890  SetState(State::Aborted, true);
891  return *GetStatus();
892  }
893  throw;
894  }
895  });
896 }
897 
898 void DpmDaqController::UpdateStateContext() {
899  auto state = GetState();
900  switch (state) {
901  case State::NotScheduled:
902  m_state_ctx = NotScheduled{false};
903  return;
904  default:
905  m_state_ctx = std::monostate();
906  };
907 }
908 
909 void DpmDaqController::SetState(State state, std::optional<bool> error) {
910  auto prev = GetState();
911  GetStatusRef().SetState(state, error);
912  if (prev != state) {
913  UpdateStateContext();
914  }
915 }
916 
917 nlohmann::json CreateDataProductSpecification(DaqContext const& ctx, log4cplus::Logger& logger) {
918  // @todo: At this time it is only a simplified version of the full capability
919  // of the Data Product Specification without support for :
920  // - in-place target
921  // - keyword rules
922  //
923  // Current simplified source order:
924  // 1. OCM keywords (ctx.keywords)
925  // 2. Order of ctx.results
927  json["id"] = ctx.id;
928  auto sources = nlohmann::json::array();
929 
930  if (!ctx.keywords.empty()) {
931  // OCM keywords
932  auto kws = fits::SerializeJsonKeywords(ctx.keywords);
933  sources.push_back(nlohmann::json::object({{"type", "fitsKeywords"},
934  {"sourceName", ctx.process_name},
935  {"keywords", std::move(kws)}})
936 
937  );
938  }
939 
940  // Temporary heuristics until OCM interface allows the explicit specification:
941  // If
942  // - number of primary sources == 1
943  // - number of files from that primary == 1
944  // then we automatically designate it the *in-place* target.
945  std::optional<nlohmann::json> target;
946  std::optional<std::string> target_source_name;
947 
948  if (1 == ctx.prim_sources.size() &&
949  1 == std::count_if(ctx.results.begin(),
950  ctx.results.end(),
951  [source = ctx.prim_sources[0].name](DpPart const& part) {
952  return part.origin == source;
953  })) {
954  auto it = std::find_if(ctx.results.begin(),
955  ctx.results.end(),
956  [source = ctx.prim_sources[0].name](DpPart const& part) {
957  return part.origin == source;
958  });
959  assert(it != ctx.results.end());
960  if (std::holds_alternative<std::string>(it->info)) {
961  // At this point we have:
962  // 1 primary source
963  // 1 output fits file
964  auto const& path = std::get<std::string>(it->info);
965  target_source_name = ctx.prim_sources[0].name;
966  LOG4CPLUS_DEBUG(logger,
967  fmt::format("{}: Heuristics resulted in using the file "
968  "{} from {} as *in-place* merge target.",
969  ctx.id,
970  path,
971  *target_source_name));
972  target = nlohmann::json::object(
973  {{"type", "fitsFile"}, {"sourceName", it->origin}, {"origin", path}});
974  }
975  }
976 
977  for (DpPart const& r : ctx.results) {
978  if (target_source_name && *target_source_name == r.origin) {
979  // Skip target file
980  continue;
981  }
982  if (std::holds_alternative<fits::KeywordVector>(r.info)) {
983  // keywords
984  auto kws = fits::SerializeJsonKeywords(std::get<fits::KeywordVector>(r.info));
985  sources.push_back(nlohmann::json::object(
986  {{"type", "fitsKeywords"}, {"sourceName", r.origin}, {"keywords", std::move(kws)}})
987 
988  );
989  } else if (std::holds_alternative<std::string>(r.info)) {
990  auto const& path = std::get<std::string>(r.info);
991  sources.push_back(nlohmann::json::object(
992  {{"type", "fitsFile"}, {"sourceName", r.origin}, {"origin", path}})
993 
994  );
995  }
996  }
997  json["sources"] = std::move(sources);
998  json["target"] =
999  nlohmann::json::object({{"fileId", ctx.file_id}, {"filePrefix", ctx.dp_name_prefix}});
1000 
1001  if (target) {
1002  json["target"]["source"] = *target;
1003  }
1004  return json;
1005 }
1006 
1007 } // namespace daq
daq::DpmDaqController::StartAsync
boost::future< State > StartAsync() override
Definition: daqController.cpp:768
daq::OcmDaqController::m_prim_sources
std::vector< Source< PrimSource > > m_prim_sources
Note: Consider vector immutable!
Definition: daqController.hpp:488
daq::DaqContext::prim_sources
std::vector< Source > prim_sources
Definition: daqContext.hpp:80
abort.hpp
Contains declaration for the AbortAsync operation.
dpmClient.hpp
daq::DpmClient
initiate.hpp
Contains declarations for the helper functions to initiate operations.
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:39
daq::OcmDaqController::Create
static std::shared_ptr< OcmDaqController > Create(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations operations)
Construct object.
Definition: daqController.cpp:157
json.hpp
Contains data structure for FITS keywords.
daq::OcmDaqController::Stopping
Definition: daqController.hpp:432
daq::CommonDaqController
Implements common behaviour of OcmDaqController and DpmDaqController.
Definition: daqController.hpp:332
daq::CommonDaqController::GetStatusRef
ObservableStatus & GetStatusRef() noexcept
Definition: daqController.hpp:364
daq::OcmAsyncOperations::start
std::function< boost::future< void >op::AsyncOpParams)> start
Definition: daqController.hpp:64
daq::DpmDaqController::AbortAsync
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Attempts to abort Data Acquisition.
Definition: daqController.cpp:854
daq::OcmDaqController::StartAsync
boost::future< State > StartAsync() override
Starts the data acquisition.
Definition: daqController.cpp:297
start.hpp
Contains declaration for the StartAsync operation.
daq::MetaSource::RrClient
metadaqif::MetaDaqAsync RrClient
Definition: source.hpp:142
daq::CommonDaqController::GetEventLogRef
ObservableEventLog & GetEventLogRef() noexcept
Definition: daqController.hpp:361
daq::OcmAsyncOperations::IsValid
bool IsValid() const noexcept
Definition: daqController.cpp:77
stop.hpp
Contains declaration for the StopAsync operation.
daq::OcmDaqController::m_pending_replies
std::shared_ptr< PendingReplies > m_pending_replies
Definition: daqController.hpp:492
daq::OcmDaqController::OcmDaqController
OcmDaqController(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations ops)
Definition: daqController.cpp:174
daq::OcmDaqController::AwaitAsync
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
Awaits that data acquisition stops or aborts.
Definition: daqController.cpp:493
daq::PendingReplies
Simple class that allows you to keep track of how many replies are pending.
Definition: pendingReplies.hpp:58
daq::OcmDaqController::m_meta_sources
std::vector< Source< MetaSource > > m_meta_sources
Note: Consider vector immutable!
Definition: daqController.hpp:489
daq::ObservableStatus::GetState
State GetState() const noexcept
Definition: status.cpp:184
daq::OcmAsyncOperations::stop
std::function< boost::future< Result< DpParts > >ErrorPolicy, op::AsyncOpParams)> stop
Definition: daqController.hpp:66
daq::DaqContext::id
std::string id
DAQ identfier, possibly provided by user.
Definition: daqContext.hpp:64
daq::Source
Simple class that holds the source and associated state.
Definition: source.hpp:29
daq::ObservableStatus::DeferSignal
Defer signal changes until later time.
Definition: status.hpp:173
daq::op::AlertState
Definition: asyncOpParams.hpp:18
daq::DaqControllerFactoryDefault::MakeOcmPhase
auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController > override
Create instance for the OCM phase of the DAQ process.
Definition: daqController.cpp:88
DAQ_NOEXCEPT
#define DAQ_NOEXCEPT
Definition: config.hpp:16
daq::CommonDaqController::CommonDaqController
CommonDaqController(boost::asio::io_context &io_context, DaqContext context, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log)
Definition: daqController.cpp:114
daq::DpmDaqController::AwaitAsync
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
Definition: daqController.cpp:779
daq::OcmDaqController::Starting
Definition: daqController.hpp:430
daq::CommonDaqController::ConnectContext
boost::signals2::connection ConnectContext(ContextSignal::slot_type const &slot) override
Connect observer that is invoked when context is modified.
Definition: daqController.cpp:152
daq::CommonDaqController::EmitContextSignal
void EmitContextSignal()
Definition: daqController.hpp:370
daq::op::AwaitOpParams
Await specific parameters that is not provided with AsyncOpParams.
Definition: asyncOpParams.hpp:114
daq
Definition: asyncProcess.cpp:15
daq::DaqContext::results
DpParts results
Results from Data Acquisition (FITS files and keywords).
Definition: daqContext.hpp:106
daq::OcmAsyncOperations
OCM Async operations.
Definition: daqController.hpp:49
daq::OcmAsyncOperations::await_prim
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
Definition: daqController.hpp:67
daq::CommonDaqController::GetErrorFlag
bool GetErrorFlag() const DAQ_NOEXCEPT override
Definition: daqController.cpp:143
daq::ObservableStatus::SetError
void SetError(bool error) noexcept
Set error flag for data acquisition.
Definition: status.cpp:205
daq::DaqControllerFactoryDefault::DaqControllerFactoryDefault
DaqControllerFactoryDefault(boost::asio::io_context &io_ctx, elt::mal::Mal &m_mal, std::shared_ptr< DpmClient > dpm_client)
Definition: daqController.cpp:81
daq::DaqContext
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:48
daq::PrimSource::RrClient
recif::RecCmdsAsync RrClient
Definition: source.hpp:98
daq::OcmDaqController::UpdateKeywords
void UpdateKeywords(fits::KeywordVector const &keywords) override
Updates (replace or add) list of keywords.
Definition: daqController.cpp:478
daq::DaqSources
Data acquisition sources.
Definition: source.hpp:184
daq::OcmDaqController::FindSource
std::optional< std::variant< gsl::not_null< Source< PrimSource > * >, gsl::not_null< Source< MetaSource > * > > > FindSource(std::string_view source_id)
Definition: daqController.cpp:632
daq::OcmDaqController::GetLogger
constexpr log4cplus::Logger const & GetLogger() const noexcept
Definition: daqController.cpp:231
daq::OcmDaqController::ScheduleMergeAsync
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
Definition: daqController.cpp:473
daq::operator<<
daqif::DaqStatus & operator<<(daqif::DaqStatus &status, daq::Status const &rhs)
Convert daq::Status -> daqif::DaqStatus by populating from rhs.
Definition: conversion.cpp:18
daq::OcmDaqController::SetState
void SetState(StateVariant &&s) noexcept
Definition: daqController.cpp:262
daq::OcmDaqController::MakeAwaitParams
op::AwaitOpParams MakeAwaitParams(op::AlertState &)
Definition: daqController.cpp:280
daq::OcmDaqController::Aborted
Definition: daqController.hpp:435
daq::OcmDaqController::MakeParams
op::AsyncOpParams MakeParams(op::AlertState &)
Constructs the parameters used for asynchronous operations.
Definition: daqController.cpp:268
daq::DpmDaqController::UpdateKeywords
void UpdateKeywords(fits::KeywordVector const &keywords) override
Definition: daqController.cpp:784
daq::ObservableStatus::DeferSignal
friend class DeferSignal
Definition: status.hpp:320
daq::Result
Utility class that represents a result and an error.
Definition: utility.hpp:17
daqController.hpp
Contains declaration for for DaqController.
daq::CommonDaqController::GetId
std::string const & GetId() const DAQ_NOEXCEPT override
Definition: daqController.cpp:139
daq::DpmDaqController::GetState
State GetState() const DAQ_NOEXCEPT override
Definition: daqController.cpp:788
daq::DpPart::info
std::variant< std::string, fits::KeywordVector > info
Holds a std::string path [[user]@host:]path or FITS keywords.
Definition: dpPart.hpp:46
daq::OcmDaqController::m_logger
log4cplus::Logger m_logger
Definition: daqController.hpp:502
daq::CommonDaqController::GetEventLog
std::shared_ptr< ObservableEventLog > GetEventLog() DAQ_NOEXCEPT override
Definition: daqController.cpp:135
daq::OcmDaqController::MakeState
StateVariant MakeState(State s) const noexcept
Definition: daqController.cpp:235
daq::OcmDaqController::Acquiring
Definition: daqController.hpp:431
daq::OcmDaqController::StopAsync
boost::future< Status > StopAsync(ErrorPolicy policy) override
Stops the data acquisition.
Definition: daqController.cpp:340
daq::Status::id
std::string id
Definition: status.hpp:136
daq::CreateDataProductSpecification
nlohmann::json CreateDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Creates a Data Product Specification as serialized JSON from the provided DaqContext.
Definition: daqController.cpp:917
daq::DaqContext::keywords
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
Definition: daqContext.hpp:91
daq::CommonDaqController::GetContext
DaqContext const & GetContext() const DAQ_NOEXCEPT override
Definition: daqController.cpp:147
daq::OcmDaqController::Stopped
Definition: daqController.hpp:433
daq::OcmDaqController::GetState
State GetState() const DAQ_NOEXCEPT override
Definition: daqController.cpp:204
daq::OcmDaqController::AbortAsync
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Aborts the data acquisition.
Definition: daqController.cpp:409
daq::OcmAsyncOperations::abort
std::function< boost::future< Result< void > >ErrorPolicy, op::AsyncOpParams)> abort
Definition: daqController.hpp:65
awaitPrim.hpp
Contains declaration for the AwaitPrimAsync operation.
daq::CommonDaqController::GetStatus
std::shared_ptr< ObservableStatus > GetStatus() DAQ_NOEXCEPT override
Definition: daqController.cpp:127
daq::op::MergeAlerts
void MergeAlerts(ObservableStatus &dest, AlertState &src)
Merge alerts.
Definition: asyncOpParams.hpp:43
daq::Result< void >
Definition: utility.hpp:23
daq::DaqContext::file_id
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:69
daq::MetaSource
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:140
daq::OcmAsyncOperations::OcmAsyncOperations
OcmAsyncOperations()
Default constructs object with standard async operations.
Definition: daqController.cpp:64
daq::DpPart::origin
std::string origin
Component/process origin of the file.
Definition: dpPart.hpp:41
daq::Status
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:120
daq::CommonDaqController::GetContextMut
DaqContext & GetContextMut() noexcept
Definition: daqController.hpp:358
daq::fits::KeywordVector
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
daq::DaqControllerFactoryDefault::MakeDpmPhase
auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController > override
Create instance for the DPM phase of the DAQ process.
Definition: daqController.cpp:101
daq::MakeState
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:63
daq::op::AsyncOpParams
Parameters required for each async operation.
Definition: asyncOpParams.hpp:67
daq::OcmDaqController::StateVariant
std::variant< NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted > StateVariant
Definition: daqController.hpp:438
daq::DpmDaqController::ScheduleMergeAsync
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
Definition: daqController.cpp:792
daq::OcmDaqController::NotStarted
Definition: daqController.hpp:429
daq::OcmDaqController::SetErrorFlag
void SetErrorFlag(bool error) noexcept
Definition: daqController.cpp:258
daq::DaqController
Controls the execution of single data acquisition that ultimately result in a set of FITS keywords an...
Definition: daqController.hpp:214
daq::DaqContext::dp_name_prefix
std::string dp_name_prefix
Data product file name prefix.
Definition: daqContext.hpp:79
daq::OcmDaqController::AddInitialKeywords
void AddInitialKeywords()
Definition: daqController.cpp:293
daq::OcmDaqController::m_async_ops
OcmAsyncOperations m_async_ops
Definition: daqController.hpp:491
daq::PrimSource
Keeps relevant state to be able to communicate with a primary data source.
Definition: source.hpp:96
daq::OcmDaqController::Aborting
Definition: daqController.hpp:434
elt::mal
Definition: dpmClient.hpp:25
daq::DpPart
Provides information of the location and origin of a FITS file or keywords produced by a data acquisi...
Definition: dpPart.hpp:26
daq::ErrorPolicy
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
daq::fits::SerializeJsonKeywords
nlohmann::json SerializeJsonKeywords(std::vector< KeywordVariant > const &keywords)
SerializeJsons keyword to JSON.
Definition: json.cpp:200
daq::AddDpParts
void AddDpParts(DaqContext &ctx, std::vector< DpPart > const &parts)
Definition: daqContext.cpp:35
daq::ObservableStatus::SetState
void SetState(State s, std::optional< bool > error=std::nullopt) noexcept
Set state of data acquisition.
Definition: status.cpp:192
daq::UpdateKeywords
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
Definition: daqContext.cpp:28
daq::ErrorPolicy::Strict
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
daq::OcmDaqController::m_state
StateVariant m_state
Definition: daqController.hpp:486
error
Definition: main.cpp:23
daq::DaqContext::process_name
std::string process_name
User defined process name.
Definition: daqContext.hpp:74
daq::ObservableEventLog::AddEvent
void AddEvent(EventLog::EventType event)
Records that a file has been produced for this data acquisition.
Definition: eventLog.cpp:54
daq::CommonDaqController::GetIoExecutor
rad::IoExecutor & GetIoExecutor() noexcept
Definition: daqController.hpp:355
error.hpp
Contains error related declarations for DAQ.
daq::DpmDaqController::StopAsync
boost::future< Status > StopAsync(ErrorPolicy policy) override
Definition: daqController.cpp:773
daq::json
nlohmann::json json
Definition: json.cpp:20
daq::ActionEvent
Event related to an action being requested or performed.
Definition: eventLog.hpp:56