7 #include <fmt/format.h>
8 #include <fmt/ostream.h>
9 #include <log4cplus/loggingmacros.h>
10 #include <mal/MalException.hpp>
11 #include <mal/rr/qos/ReplyTime.hpp>
12 #include <nlohmann/json.hpp>
26 DaqSources MakeSources(mal::Mal&
mal, DaqContext
const& ctx) {
27 std::vector<PrimSource> psources;
28 std::vector<MetaSource> msources;
30 psources.reserve(ctx.prim_sources.size());
31 for (
auto const& raw : ctx.prim_sources) {
32 psources.emplace_back(
36 {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(5))},
40 msources.reserve(ctx.meta_sources.size());
41 for (
auto const& raw : ctx.meta_sources) {
42 msources.emplace_back(
46 {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(5))},
50 return DaqSources(std::move(psources), std::move(msources));
54 inline constexpr
bool always_false_v =
false;
56 template <
class Sources>
57 bool AllInState(Sources sources,
State state) {
59 sources.begin(), sources.end(), [=](
auto const& s) ->
bool { return s.state == state; });
65 : start([](auto par) {
return daq::op::InitiateOperation<daq::op::StartAsync>(par); })
67 return daq::op::InitiateOperation<op::AbortAsync>(policy, std::move(par));
70 return daq::op::InitiateOperation<op::StopAsync>(policy, std::move(par));
72 , await_prim([](
auto par) {
73 return daq::op::InitiateAbortableOperation<op::AwaitPrimAsync>(std::move(par));
83 std::shared_ptr<DpmClient> dpm_client)
84 : m_io_ctx(io_ctx), m_mal(
mal), m_async_ops(), m_dpm_client(std::move(dpm_client)) {
89 std::shared_ptr<ObservableStatus> status,
90 std::shared_ptr<ObservableEventLog> event_log)
91 -> std::shared_ptr<DaqController> {
92 DaqSources sources = MakeSources(m_mal, daq_ctx);
102 std::shared_ptr<ObservableStatus> status,
103 std::shared_ptr<ObservableEventLog> event_log)
104 -> std::shared_ptr<DaqController> {
105 return std::make_shared<DpmDaqController>(
106 m_io_ctx, std::move(daq_ctx), std::move(status), std::move(event_log), m_dpm_client);
110 os <<
"DaqController(id='" <<
daq.GetId() <<
"', state=" <<
daq.GetState() <<
")";
116 std::shared_ptr<ObservableStatus> status,
117 std::shared_ptr<ObservableEventLog> event_log)
118 : m_io_ctx(io_context)
119 , m_executor(m_io_ctx)
120 , m_context(std::move(context))
121 , m_status(std::move(status))
122 , m_event_log(std::move(event_log)) {
140 return m_status->GetId();
144 return m_status->GetError();
151 boost::signals2::connection
153 return m_sig_context.connect(slot);
156 std::shared_ptr<OcmDaqController>
160 std::shared_ptr<ObservableStatus> status,
161 std::shared_ptr<ObservableEventLog> event_log,
165 LOG4CPLUS_TRACE(
"daq", fmt::format(
"OcmDaqController::Create"));
170 std::move(event_log),
177 std::shared_ptr<ObservableStatus> status,
178 std::shared_ptr<ObservableEventLog> event_log,
180 :
CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
182 , m_state(
MakeState(GetStatusRef().GetState()))
183 , m_prim_sources(MakeSources<
PrimSource>(sources.GetPrimarySources()))
184 , m_meta_sources(MakeSources<
MetaSource>(sources.GetMetadataSources()))
185 , m_async_ops(std::move(ops))
187 , m_logger(log4cplus::Logger::getInstance(
"daq.ocm.controller")) {
189 throw boost::enable_current_exception(
190 std::invalid_argument(
"Data acquisition id mismatch between DaqContext "
191 "and ObservableStatus"));
195 throw boost::enable_current_exception(std::invalid_argument(
"No data sources provided"));
199 throw boost::enable_current_exception(
200 std::invalid_argument(
"OcmAsyncOperations is invalid"));
207 [&](
auto const& var) {
208 using T = std::decay_t<decltype(var)>;
209 if constexpr (std::is_same_v<T, NotStarted>) {
211 }
else if constexpr (std::is_same_v<T, Starting>) {
213 }
else if constexpr (std::is_same_v<T, Acquiring>) {
215 }
else if constexpr (std::is_same_v<T, Stopping>) {
217 }
else if constexpr (std::is_same_v<T, Stopped>) {
219 }
else if constexpr (std::is_same_v<T, Aborting>) {
221 }
else if constexpr (std::is_same_v<T, Aborted>) {
224 static_assert(always_false_v<T>,
"non-exhaustive visitor!");
254 LOG4CPLUS_FATAL(m_logger, fmt::format(
"Invalid state provided: '{}'", s));
259 GetStatusRef().SetError(
error);
265 GetStatusRef().SetState(GetState());
302 if (!std::holds_alternative<NotStarted>(
m_state)) {
303 return boost::make_exceptional_future<State>(
304 std::runtime_error(
"Data acquisition is already started"));
311 auto alerts = std::make_unique<op::AlertState>();
314 [alerts = std::move(alerts),
315 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
316 boost::future<void> f) {
317 LOG4CPLUS_INFO(
daq->GetLogger(),
318 fmt::format(
"{}: StartAsync: Completed {}",
320 f.has_value() ?
"successfully" :
"with error"));
326 if (f.has_exception()) {
327 daq->SetErrorFlag(
true);
331 daq->InitiateAwaitPrimarySources();
334 daq->SetErrorFlag(
false);
336 return daq->GetState();
344 if (std::holds_alternative<Stopped>(
m_state)) {
345 return boost::make_exceptional_future<Status>(
346 std::runtime_error(
"Data acquisition already stopped"));
349 if (std::holds_alternative<Aborted>(
m_state)) {
350 return boost::make_exceptional_future<Status>(
351 std::runtime_error(
"Data acquisition already aborted"));
354 if (!std::holds_alternative<Acquiring>(
m_state)) {
355 return boost::make_exceptional_future<Status>(
356 std::runtime_error(
"Cannot stop a data acquisition that is not Acquiring"));
365 auto alerts = std::make_unique<op::AlertState>();
368 [alerts = std::move(alerts),
369 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
377 fmt::format(
"{}: StopAsync: Data acquisition modified by other commands. "
378 "Do nothing else (errors are ignored). ",
380 throw std::runtime_error(fmt::format(
381 "Stop command could not be completed because Data Acquisitions state was "
382 "modified in the meantime (current state: {})",
385 LOG4CPLUS_INFO(
daq->GetLogger(),
386 fmt::format(
"{}: StopAsync: Completed {}",
388 f.has_value() ?
"successfully" :
"with error"));
392 if (f.has_exception()) {
393 daq->SetErrorFlag(true);
397 auto result = f.get();
399 daq->EmitContextSignal();
403 daq->SetErrorFlag(result.error);
405 return daq->GetStatus()->GetStatus();
413 if (std::holds_alternative<NotStarted>(
m_state)) {
414 LOG4CPLUS_INFO(
m_logger, fmt::format(
"{}: Aborting not started data acquisition", *
this));
419 if (std::holds_alternative<Stopped>(
m_state) || std::holds_alternative<Aborted>(
m_state)) {
420 return boost::make_exceptional_future<Status>(
421 std::runtime_error(
"Data acquisition already stopped or aborted"));
426 auto alerts = std::make_unique<op::AlertState>();
429 [alerts = std::move(alerts),
430 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
437 fmt::format(
"{}: AbortAsync: Data acquisition already aborted. "
438 "Do nothing else (errors are ignored). ",
442 return daq->GetStatus()->GetStatus();
444 LOG4CPLUS_DEBUG(
daq->GetLogger(),
445 fmt::format(
"{}: AbortAsync: Completed, updating DAQ status and "
446 "set reply remaining. Has fatal error={}",
454 if (f.has_exception()) {
457 fmt::format(
"{}: AbortAsync: Completed with fatal error.", *daq));
459 daq->SetErrorFlag(true);
462 auto result = f.get();
463 daq->SetErrorFlag(result.error);
467 LOG4CPLUS_INFO(
daq->GetLogger(),
468 fmt::format(
"{}: AbortAsync: Completed successfully.", *
daq));
469 return daq->GetStatus()->GetStatus();
474 return boost::make_exceptional_future<State>(std::runtime_error(
475 fmt::format(
"ScheduleMergeAsync() is invalid in state: {}",
GetState())));
480 fmt::format(
"DaqController::UpdateKeywords(<omitted>)"),
483 if (std::holds_alternative<Stopped>(
m_state) || std::holds_alternative<Aborted>(
m_state)) {
484 throw boost::enable_current_exception(
485 std::runtime_error(
"Data acquisition already stopped or aborted"));
496 fmt::format(
"DaqController::AwaitAsync({}, {} ms)",
497 sources.empty() ?
"all primary sources" :
"a user defined list of sources",
502 std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
505 if (!sources.empty()) {
506 for (
auto const& source_id : sources) {
509 await_on.emplace_back(*source_var);
511 return boost::make_exceptional_future<State>(
512 std::invalid_argument(fmt::format(
"Source with id='{}' not found", source_id)));
518 await_on.emplace_back(&source);
524 auto condition = [sources = await_on]() {
525 return std::all_of(sources.begin(), sources.end(), [](
auto var) {
528 return v->GetState() == State::Aborted || v->GetState() == State::Stopped;
536 return boost::make_ready_future<State>(GetState());
542 auto promise = std::make_shared<boost::promise<State>>();
544 if (timeout > std::chrono::milliseconds(0)) {
545 auto timer = std::make_unique<boost::asio::steady_timer>(GetIoCtx(), timeout);
546 timer->async_wait([promise,
547 timer_ptr = timer.get(),
548 daq_weak = std::weak_ptr<OcmDaqController>(
549 std::static_pointer_cast<OcmDaqController>(shared_from_this()))](
550 boost::system::error_code ec) {
553 auto daq = daq_weak.lock();
558 fmt::format(
"{}: AsyncWait: Operation abandoned before completing.",
562 promise->set_exception(DaqOperationAborted(
""));
569 fmt::format(
"{}: AsyncWait: Operation timed out before completing.",
573 daq->m_timers.begin(),
575 [timer_ptr](std::unique_ptr<boost::asio::steady_timer>& val) {
576 return val.get() == timer_ptr;
578 daq->m_timers.end());
581 promise->set_exception(DaqOperationTimeout(
""));
586 m_timers.emplace_back(std::move(timer));
589 auto listener = [condition,
591 daq_weak = std::weak_ptr<OcmDaqController>(
592 std::static_pointer_cast<OcmDaqController>(shared_from_this()))](
State,
594 auto daq = daq_weak.lock();
596 LOG4CPLUS_WARN(
"daq",
"OcmDaqController deleted before await condition was fulfulled");
602 LOG4CPLUS_INFO(
daq->m_logger,
603 fmt::format(
"{}: AwaitAsync: Await condition fulfilled", *
daq));
605 promise->set_value(
daq->GetState());
614 for (
auto& source : await_on) {
616 [
daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
620 fmt::format(
"{}: AsyncWait: Attaching listener on source '{}'.", *
daq, *s));
622 using SlotType =
typename std::remove_reference<
623 decltype(*s.get())>::type::StateSignal::slot_type;
624 s->ConnectStateListener(SlotType(std::move(listener)).track_foreign(
daq));
628 return promise->get_future();
631 std::optional<std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
632 OcmDaqController::FindSource(std::string_view source_id) {
635 std::find_if(m_prim_sources.begin(), m_prim_sources.end(), [source_id](
auto& source) {
636 return source.GetSource().GetName() == source_id;
638 if (it != m_prim_sources.end()) {
639 return gsl::not_null<Source<PrimSource>*>(&(*it));
644 std::find_if(m_meta_sources.begin(), m_meta_sources.end(), [source_id](
auto& source) {
645 return source.GetSource().GetName() == source_id;
647 if (it != m_meta_sources.end()) {
648 return gsl::not_null<Source<MetaSource>*>(&(*it));
656 template <
class SourceType>
657 std::vector<Source<SourceType>> OcmDaqController::MakeSources(std::vector<SourceType> sources) {
659 std::vector<Source<SourceType>> dest;
660 dest.reserve(sources.size());
662 std::make_move_iterator(sources.begin()),
663 std::make_move_iterator(sources.end()),
664 std::back_inserter(dest),
665 [](SourceType&& s) ->
Source<SourceType> { return Source<SourceType>{std::move(s)}; });
669 void OcmDaqController::InitiateAwaitPrimarySources() {
670 AddEvent<ActionEvent>(GetId(),
671 "DaqController::InitiateAwaitPrimarySources(): Initiating",
672 GetStatusRef().GetStatus());
673 if (m_prim_sources.empty()) {
676 fmt::format(
"{}: InitiateAwaitPrimarySources: No primary sources to monitor.", *
this));
679 LOG4CPLUS_DEBUG(m_logger,
680 fmt::format(
"{}: InitiateAwaitPrimarySources: Starting operation.", *
this));
681 auto alerts = std::make_unique<op::AlertState>();
682 auto [future, abort] = m_async_ops.await_prim(MakeAwaitParams(*alerts.get()));
683 m_abort_await_primary_sources = abort;
687 [alerts = std::move(alerts),
688 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
691 auto result = fut.get();
692 LOG4CPLUS_DEBUG(
daq->m_logger,
693 fmt::format(
"{}: InitiateAwaitPrimarySources: Adding {} files from "
696 result.result.size()));
701 daq->EmitContextSignal();
704 if (!std::holds_alternative<Acquiring>(
daq->m_state)) {
708 "{}: InitiateAwaitPrimarySources: "
709 "AwaitAsync completed but another operation has already transitioned "
710 "DAQ from Acquiring so automatic stop will not be performed.",
716 "DaqController::InitiateAwaitPrimarySources(): "
717 "Primary sources completed. Performing automatic stop of metadata sources",
718 daq->GetStatusRef().GetStatus());
729 daq->StopAsync(ErrorPolicy::Strict);
733 DpmDaqController::DpmDaqController(boost::asio::io_context& io_context,
735 std::shared_ptr<ObservableStatus> status,
736 std::shared_ptr<ObservableEventLog> event_log,
737 std::shared_ptr<DpmClient> dpm_client)
738 :
CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
739 , m_liveness(std::make_shared<bool>(true))
740 , m_dpm_client(std::move(dpm_client))
741 , m_logger(log4cplus::Logger::getInstance(
"daq.ocm.controller")) {
742 assert(m_dpm_client);
743 UpdateStateContext();
749 m_status_connection = m_dpm_client->ConnectStatusSignal(
750 [
id =
GetId(), weak = std::weak_ptr<bool>(m_liveness),
this](
Status const& status) {
751 if (
id != status.id) {
755 auto lock = weak.lock();
758 LOG4CPLUS_DEBUG(
"daq",
"DpmDaqController is abandoned: " <<
this);
762 LOG4CPLUS_DEBUG(m_logger, *
this <<
": Assigning new DAQ status from DPM: " << status);
764 UpdateStateContext();
769 return boost::make_exceptional_future<State>(
770 std::runtime_error(fmt::format(
"StartAsync() is invalid in state: {}",
GetState())));
774 return boost::make_exceptional_future<Status>(
775 std::runtime_error(fmt::format(
"StopAsync() is invalid in state: {}",
GetState())));
780 return boost::make_exceptional_future<State>(std::runtime_error(
781 fmt::format(
"AwaitAsync() with sources is invalid in state: {}",
GetState())));
785 throw std::runtime_error(fmt::format(
"UpdateKeywords() is invalid in state: {}",
GetState()));
793 LOG4CPLUS_TRACE(m_logger, *
this <<
": DpmDaqController::ScheduleMergeAsync");
799 return boost::make_exceptional_future<State>(std::runtime_error(
800 fmt::format(
"ScheduleMergeAsync() is invalid in state: {}",
GetState())));
803 assert(std::holds_alternative<NotScheduled>(m_state_ctx));
804 auto& ctx = std::get<NotScheduled>(m_state_ctx);
805 if (ctx.schedule_reply_pending) {
807 return boost::make_exceptional_future<State>(
808 std::logic_error(
"ScheduleMergeAsync() a request is already in flight"));
814 m_dp_spec = j.dump(2);
815 LOG4CPLUS_DEBUG(m_logger,
"Created DpSpec:\n" << *m_dp_spec);
818 return m_dpm_client->ScheduleAsync(*m_dp_spec)
820 [starting_state, weak = weak_from_this(),
this](boost::future<State> f) ->
State {
822 auto shared = weak.lock();
824 auto state = f.get();
835 status.ClearAlert(alert_id);
836 LOG4CPLUS_INFO(m_logger,
837 fmt::format(
"{}: Scheduled DAQ successfully", *
this));
839 LOG4CPLUS_DEBUG(
"daq",
"DpmDaqController is abandoned: " <<
this);
842 }
catch (elt::mal::TimeoutException
const&) {
845 LOG4CPLUS_INFO(m_logger,
846 fmt::format(
"{}: Schedule DAQ failed with timeout "
847 "(no error flag set for this condition)",
851 }
catch (std::exception
const& e) {
857 fmt::format(
"Failed to schedule DAQ for merging: {}", e.what())));
858 status.SetError(
true);
859 LOG4CPLUS_ERROR(m_logger, fmt::format(
"{}: Scheduled DAQ failed", *
this));
867 LOG4CPLUS_TRACE(m_logger, *
this <<
": DpmDaqController::AbortAsync");
872 assert(std::holds_alternative<NotScheduled>(m_state_ctx));
873 if (!std::get<NotScheduled>(m_state_ctx).schedule_reply_pending) {
875 return boost::make_ready_future<Status>(*
GetStatus());
881 return m_dpm_client->AbortAsync(
GetId()).then(
882 GetIoExecutor(), [policy, weak = weak_from_this(),
this](boost::future<State> f) ->
Status {
883 auto shared = weak.lock();
886 LOG4CPLUS_DEBUG(
"daq",
"DpmDaqController is abandoned: " <<
this);
887 throw boost::enable_current_exception(std::runtime_error(
"Operation aborted"));
890 auto result = f.get();
897 LOG4CPLUS_ERROR(m_logger,
899 <<
": Request to abort DAQ to DPM returned with "
900 "error. ErrorPolicy::Tolerant is used so DAQ is marked "
901 "as aborted anyway");
910 void DpmDaqController::UpdateStateContext() {
917 m_state_ctx = std::monostate();
921 void DpmDaqController::SetState(
State state, std::optional<bool>
error) {
925 UpdateStateContext();
Contains declaration for the AbortAsync operation.
Contains declaration for the AwaitPrimAsync operation.
Implements common behaviour of OcmDaqController and DpmDaqController.
ObservableEventLog & GetEventLogRef() noexcept
DaqContext const & GetContext() const DAQ_NOEXCEPT override
ObservableStatus & GetStatusRef() noexcept
std::shared_ptr< ObservableStatus > GetStatus() DAQ_NOEXCEPT override
bool GetErrorFlag() const DAQ_NOEXCEPT override
std::shared_ptr< ObservableEventLog > GetEventLog() DAQ_NOEXCEPT override
boost::signals2::connection ConnectContext(ContextSignal::slot_type const &slot) override
Connect observer that is invoked when context is modified.
DaqContext & GetContextMut() noexcept
std::string const & GetId() const DAQ_NOEXCEPT override
rad::IoExecutor & GetIoExecutor() noexcept
CommonDaqController(boost::asio::io_context &io_context, DaqContext context, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log)
auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController > override
Create instance for the DPM phase of the DAQ process.
auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController > override
Create instance for the OCM phase of the DAQ process.
DaqControllerFactoryDefault(boost::asio::io_context &io_ctx, elt::mal::Mal &m_mal, std::shared_ptr< DpmClient > dpm_client)
Controls the execution of single data acquisition that ultimately result in a set of FITS keywords an...
Data acquisition sources.
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Attempts to abort Data Acquisition.
void UpdateKeywords(fits::KeywordVector const &keywords) override
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
State GetState() const DAQ_NOEXCEPT override
boost::future< Status > StopAsync(ErrorPolicy policy) override
boost::future< State > StartAsync() override
void AddEvent(EventLog::EventType event)
Records that a file has been produced for this data acquisition.
Defer signal changes until later time.
State GetState() const noexcept
void SetState(State s, std::optional< bool > error=std::nullopt) noexcept
Set state of data acquisition.
boost::future< State > StartAsync() override
Starts the data acquisition.
constexpr log4cplus::Logger const & GetLogger() const noexcept
std::vector< Source< PrimSource > > m_prim_sources
Note: Consider vector immutable!
State GetState() const DAQ_NOEXCEPT override
op::AsyncOpParams MakeParams(op::AlertState &)
Constructs the parameters used for asynchronous operations.
static std::shared_ptr< OcmDaqController > Create(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations operations)
Construct object.
StateVariant MakeState(State s) const noexcept
std::optional< std::variant< gsl::not_null< Source< PrimSource > * >, gsl::not_null< Source< MetaSource > * > > > FindSource(std::string_view source_id)
log4cplus::Logger m_logger
void UpdateKeywords(fits::KeywordVector const &keywords) override
Updates (replace or add) list of keywords.
std::shared_ptr< PendingReplies > m_pending_replies
std::variant< NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted > StateVariant
OcmAsyncOperations m_async_ops
std::vector< Source< MetaSource > > m_meta_sources
Note: Consider vector immutable!
boost::future< State > ScheduleMergeAsync() override
Schedules DAQ for merging by sending request to DPM.
OcmDaqController(boost::asio::io_context &io_context, DaqContext context, DaqSources const &sources, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log, OcmAsyncOperations ops)
boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout) override
Awaits that data acquisition stops or aborts.
void AddInitialKeywords()
op::AwaitOpParams MakeAwaitParams(op::AlertState &)
void SetErrorFlag(bool error) noexcept
boost::future< Status > AbortAsync(ErrorPolicy policy) override
Aborts the data acquisition.
void SetState(StateVariant &&s) noexcept
boost::future< Status > StopAsync(ErrorPolicy policy) override
Stops the data acquisition.
Simple class that allows you to keep track of how many replies are pending.
Keeps relevant state to be able to communicate with a primary data source.
recif::RecCmdsAsync RrClient
Contains data structure for FITS keywords.
Contains error related declarations for DAQ.
Contains declarations for the helper functions to initiate operations.
constexpr std::string_view REQUEST
Request.
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
void MergeAlerts(ObservableStatus &dest, AlertState &src)
Merge alerts.
daqif::DaqStatus & operator<<(daqif::DaqStatus &status, daq::Status const &rhs)
Convert daq::Status -> daqif::DaqStatus by populating from rhs.
AlertId MakeAlertId(std::string_view category, std::string key)
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
void AddDpParts(DaqContext &ctx, std::vector< DpPart > const &parts)
ErrorPolicy
Error policy supported by certain operations.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
State
Observable states of the data acquisition process.
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Aborted
Data acquisition has been aborted by user.
@ Stopping
Transitional state between Acquiring and Stopped.
@ Acquiring
All data sources have reported data acquisition is in progress.
@ Stopped
All data sources have reported they have stopped acquiring data.
@ Starting
Transitional state between NotStarted and Acquiring when sources have not begun acquiring data yet.
@ AbortingAcquiring
Transitional state for aborting during acquisition.
@ NotStarted
Initial state of data acquisition.
NLOHMANN_JSON_SERIALIZE_ENUM(State, { {State::NotStarted, "NotStarted"}, {State::Starting, "Starting"}, {State::Acquiring, "Acquiring"}, {State::Stopping, "Stopping"}, {State::Stopped, "Stopped"}, {State::NotScheduled, "NotScheduled"}, {State::Scheduled, "Scheduled"}, {State::Transferring, "Transferring"}, {State::Merging, "Merging"}, {State::Releasing, "Releasing"}, {State::AbortingAcquiring, "AbortingAcquiring"}, {State::AbortingMerging, "AbortingMerging"}, {State::Aborted, "Aborted"}, {State::Completed, "Completed"}, }) void to_json(nlohmann void to_json(nlohmann::json &j, Alert const &p)
json::DpSpec MakeDataProductSpecification(DaqContext const &ctx, log4cplus::Logger &logger)
Creates a Data Product Specification as serialized JSON from the provided DaqContext.
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Utility class that represents a result and an error.
Contains declaration for for DaqController.
Contains declaration for the StartAsync operation.
Contains declaration for the StopAsync operation.
Event related to an action being requested or performed.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
std::function< boost::future< Result< void > >ErrorPolicy, op::AsyncOpParams)> abort
bool IsValid() const noexcept
std::function< boost::future< void >op::AsyncOpParams)> start
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
OcmAsyncOperations()
Default constructs object with standard async operations.
std::function< boost::future< Result< DpParts > >ErrorPolicy, op::AsyncOpParams)> stop
Simple class that holds the source and associated state.
Non observable status object that keeps stores status of data acquisition.
Parameters required for each async operation.
Await specific parameters that is not provided with AsyncOpParams.