7 #include <fmt/format.h>
8 #include <fmt/ostream.h>
9 #include <log4cplus/loggingmacros.h>
10 #include <mal/MalException.hpp>
11 #include <mal/rr/qos/ReplyTime.hpp>
12 #include <nlohmann/json.hpp>
26 DaqSources MakeSources(mal::Mal&
mal, DaqContext
const& ctx) {
27 std::vector<PrimSource> psources;
28 std::vector<MetaSource> msources;
30 psources.reserve(ctx.prim_sources.size());
31 for (
auto const& raw : ctx.prim_sources) {
32 psources.emplace_back(
36 {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(5))},
40 msources.reserve(ctx.meta_sources.size());
41 for (
auto const& raw : ctx.meta_sources) {
42 msources.emplace_back(
46 {std::make_shared<elt::mal::rr::qos::ReplyTime>(std::chrono::seconds(5))},
50 return DaqSources(std::move(psources), std::move(msources));
54 inline constexpr
bool always_false_v =
false;
56 template <
class Sources>
57 bool AllInState(Sources sources,
State state) {
59 sources.begin(), sources.end(), [=](
auto const& s) ->
bool { return s.state == state; });
65 : start([](auto par) {
return daq::op::InitiateOperation<daq::op::StartAsync>(par); })
67 return daq::op::InitiateOperation<op::AbortAsync>(policy, std::move(par));
70 return daq::op::InitiateOperation<op::StopAsync>(policy, std::move(par));
72 , await_prim([](
auto par) {
73 return daq::op::InitiateAbortableOperation<op::AwaitPrimAsync>(std::move(par));
83 std::shared_ptr<DpmClient> dpm_client)
84 : m_io_ctx(io_ctx), m_mal(
mal), m_async_ops(), m_dpm_client(std::move(dpm_client)) {
89 std::shared_ptr<ObservableStatus> status,
90 std::shared_ptr<ObservableEventLog> event_log)
91 -> std::shared_ptr<DaqController> {
92 DaqSources sources = MakeSources(m_mal, daq_ctx);
102 std::shared_ptr<ObservableStatus> status,
103 std::shared_ptr<ObservableEventLog> event_log)
104 -> std::shared_ptr<DaqController> {
105 return std::make_shared<DpmDaqController>(
106 m_io_ctx, std::move(daq_ctx), std::move(status), std::move(event_log), m_dpm_client);
110 os <<
"DaqController(id='" <<
daq.GetId() <<
"', state=" <<
daq.GetState() <<
")";
116 std::shared_ptr<ObservableStatus> status,
117 std::shared_ptr<ObservableEventLog> event_log)
118 : m_io_ctx(io_context)
119 , m_executor(m_io_ctx)
120 , m_context(std::move(context))
121 , m_status(std::move(status))
122 , m_event_log(std::move(event_log)) {
140 return m_status->GetId();
144 return m_status->GetError();
151 boost::signals2::connection
153 return m_sig_context.connect(slot);
156 std::shared_ptr<OcmDaqController>
160 std::shared_ptr<ObservableStatus> status,
161 std::shared_ptr<ObservableEventLog> event_log,
165 LOG4CPLUS_TRACE(
"daq", fmt::format(
"OcmDaqController::Create"));
170 std::move(event_log),
177 std::shared_ptr<ObservableStatus> status,
178 std::shared_ptr<ObservableEventLog> event_log,
180 :
CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
182 , m_state(
MakeState(GetStatusRef().GetState()))
183 , m_prim_sources(MakeSources<
PrimSource>(sources.GetPrimarySources()))
184 , m_meta_sources(MakeSources<
MetaSource>(sources.GetMetadataSources()))
185 , m_async_ops(std::move(ops))
187 , m_logger(log4cplus::Logger::getInstance(
"daq")) {
189 throw boost::enable_current_exception(
190 std::invalid_argument(
"Data acquisition id mismatch between DaqContext "
191 "and ObservableStatus"));
195 throw boost::enable_current_exception(std::invalid_argument(
"No data sources provided"));
199 throw boost::enable_current_exception(
200 std::invalid_argument(
"OcmAsyncOperations is invalid"));
207 [&](
auto const& var) {
208 using T = std::decay_t<decltype(var)>;
209 if constexpr (std::is_same_v<T, NotStarted>) {
210 s = State::NotStarted;
211 }
else if constexpr (std::is_same_v<T, Starting>) {
213 }
else if constexpr (std::is_same_v<T, Acquiring>) {
214 s = State::Acquiring;
215 }
else if constexpr (std::is_same_v<T, Stopping>) {
217 }
else if constexpr (std::is_same_v<T, Stopped>) {
219 }
else if constexpr (std::is_same_v<T, Aborting>) {
220 s = State::AbortingAcquiring;
221 }
else if constexpr (std::is_same_v<T, Aborted>) {
224 static_assert(always_false_v<T>,
"non-exhaustive visitor!");
237 case State::NotStarted:
239 case State::Starting:
241 case State::Acquiring:
243 case State::Stopping:
247 case State::AbortingAcquiring:
254 LOG4CPLUS_FATAL(m_logger, fmt::format(
"Invalid state provided: '{}'", s));
259 GetStatusRef().SetError(
error);
265 GetStatusRef().SetState(GetState());
302 if (!std::holds_alternative<NotStarted>(
m_state)) {
303 return boost::make_exceptional_future<State>(
304 std::runtime_error(
"Data acquisition is already started"));
311 auto alerts = std::make_unique<op::AlertState>();
314 [alerts = std::move(alerts),
315 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
316 boost::future<void> f) {
317 LOG4CPLUS_INFO(
daq->GetLogger(),
318 fmt::format(
"{}: StartAsync: Completed {}",
320 f.has_value() ?
"successfully" :
"with error"));
326 if (f.has_exception()) {
327 daq->SetErrorFlag(
true);
331 daq->InitiateAwaitPrimarySources();
334 daq->SetErrorFlag(
false);
336 return daq->GetState();
344 if (std::holds_alternative<Stopped>(
m_state)) {
345 return boost::make_exceptional_future<Status>(
346 std::runtime_error(
"Data acquisition already stopped"));
349 if (std::holds_alternative<Aborted>(
m_state)) {
350 return boost::make_exceptional_future<Status>(
351 std::runtime_error(
"Data acquisition already aborted"));
354 if (!std::holds_alternative<Acquiring>(
m_state)) {
355 return boost::make_exceptional_future<Status>(
356 std::runtime_error(
"Cannot stop a data acquisition that is not Acquiring"));
365 auto alerts = std::make_unique<op::AlertState>();
368 [alerts = std::move(alerts),
369 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
371 if (
daq->GetState() != State::Stopping) {
377 fmt::format(
"{}: StopAsync: Data acquisition modified by other commands. "
378 "Do nothing else (errors are ignored). ",
380 throw std::runtime_error(fmt::format(
381 "Stop command could not be completed because Data Acquisitions state was "
382 "modified in the meantime (current state: {})",
385 LOG4CPLUS_INFO(
daq->GetLogger(),
386 fmt::format(
"{}: StopAsync: Completed {}",
388 f.has_value() ?
"successfully" :
"with error"));
392 if (f.has_exception()) {
393 daq->SetErrorFlag(true);
397 auto result = f.get();
399 daq->EmitContextSignal();
403 daq->SetErrorFlag(result.error);
405 return daq->GetStatus()->GetStatus();
413 if (std::holds_alternative<NotStarted>(
m_state)) {
414 LOG4CPLUS_INFO(
m_logger, fmt::format(
"{}: Aborting not started data acquisition", *
this));
419 if (std::holds_alternative<Stopped>(
m_state) || std::holds_alternative<Aborted>(
m_state)) {
420 return boost::make_exceptional_future<Status>(
421 std::runtime_error(
"Data acquisition already stopped or aborted"));
426 auto alerts = std::make_unique<op::AlertState>();
429 [alerts = std::move(alerts),
430 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
432 if (
daq->GetState() == State::Aborted) {
437 fmt::format(
"{}: AbortAsync: Data acquisition already aborted. "
438 "Do nothing else (errors are ignored). ",
442 return daq->GetStatus()->GetStatus();
444 LOG4CPLUS_DEBUG(
daq->GetLogger(),
445 fmt::format(
"{}: AbortAsync: Completed, updating DAQ status and "
446 "set reply remaining. Has fatal error={}",
454 if (f.has_exception()) {
457 fmt::format(
"{}: AbortAsync: Completed with fatal error.", *daq));
459 daq->SetErrorFlag(true);
462 auto result = f.get();
463 daq->SetErrorFlag(result.error);
467 LOG4CPLUS_INFO(
daq->GetLogger(),
468 fmt::format(
"{}: AbortAsync: Completed successfully.", *
daq));
469 return daq->GetStatus()->GetStatus();
474 return boost::make_exceptional_future<State>(std::runtime_error(
475 fmt::format(
"ScheduleMergeAsync() is invalid in state: {}",
GetState())));
480 fmt::format(
"DaqController::UpdateKeywords(<omitted>)"),
483 if (std::holds_alternative<Stopped>(
m_state) || std::holds_alternative<Aborted>(
m_state)) {
484 throw boost::enable_current_exception(
485 std::runtime_error(
"Data acquisition already stopped or aborted"));
496 fmt::format(
"DaqController::AwaitAsync({}, {} ms)",
497 sources.empty() ?
"all primary sources" :
"a user defined list of sources",
502 std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
505 if (!sources.empty()) {
506 for (
auto const& source_id : sources) {
509 await_on.emplace_back(*source_var);
511 return boost::make_exceptional_future<State>(
512 std::invalid_argument(fmt::format(
"Source with id='{}' not found", source_id)));
518 await_on.emplace_back(&source);
524 auto condition = [sources = await_on]() {
525 return std::all_of(sources.begin(), sources.end(), [](
auto var) {
528 return v->GetState() == State::Aborted || v->GetState() == State::Stopped;
536 return boost::make_ready_future<State>(GetState());
542 auto promise = std::make_shared<boost::promise<State>>();
544 if (timeout > std::chrono::milliseconds(0)) {
545 auto timer = std::make_unique<boost::asio::steady_timer>(GetIoCtx(), timeout);
546 timer->async_wait([promise,
547 timer_ptr = timer.get(),
548 daq_weak = std::weak_ptr<OcmDaqController>(
549 std::static_pointer_cast<OcmDaqController>(shared_from_this()))](
550 boost::system::error_code ec) {
553 auto daq = daq_weak.lock();
558 fmt::format(
"{}: AsyncWait: Operation abandoned before completing.",
562 promise->set_exception(DaqOperationAborted(
""));
569 fmt::format(
"{}: AsyncWait: Operation timed out before completing.",
573 daq->m_timers.begin(),
575 [timer_ptr](std::unique_ptr<boost::asio::steady_timer>& val) {
576 return val.get() == timer_ptr;
578 daq->m_timers.end());
581 promise->set_exception(DaqOperationTimeout(
""));
586 m_timers.emplace_back(std::move(timer));
589 auto listener = [condition,
591 daq_weak = std::weak_ptr<OcmDaqController>(
592 std::static_pointer_cast<OcmDaqController>(shared_from_this()))](
State,
594 auto daq = daq_weak.lock();
596 LOG4CPLUS_WARN(
"daq",
"OcmDaqController deleted before await condition was fulfulled");
602 LOG4CPLUS_INFO(
daq->m_logger,
603 fmt::format(
"{}: AwaitAsync: Await condition fulfilled", *
daq));
605 promise->set_value(
daq->GetState());
614 for (
auto& source : await_on) {
616 [
daq = std::static_pointer_cast<OcmDaqController>(shared_from_this()),
620 fmt::format(
"{}: AsyncWait: Attaching listener on source '{}'.", *
daq, *s));
622 using SlotType =
typename std::remove_reference<
623 decltype(*s.get())>::type::StateSignal::slot_type;
624 s->ConnectStateListener(SlotType(std::move(listener)).track_foreign(
daq));
628 return promise->get_future();
631 std::optional<std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
632 OcmDaqController::FindSource(std::string_view source_id) {
635 std::find_if(m_prim_sources.begin(), m_prim_sources.end(), [source_id](
auto& source) {
636 return source.GetSource().GetName() == source_id;
638 if (it != m_prim_sources.end()) {
639 return gsl::not_null<Source<PrimSource>*>(&(*it));
644 std::find_if(m_meta_sources.begin(), m_meta_sources.end(), [source_id](
auto& source) {
645 return source.GetSource().GetName() == source_id;
647 if (it != m_meta_sources.end()) {
648 return gsl::not_null<Source<MetaSource>*>(&(*it));
656 template <
class SourceType>
657 std::vector<Source<SourceType>> OcmDaqController::MakeSources(std::vector<SourceType> sources) {
659 std::vector<Source<SourceType>> dest;
660 dest.reserve(sources.size());
662 std::make_move_iterator(sources.begin()),
663 std::make_move_iterator(sources.end()),
664 std::back_inserter(dest),
665 [](SourceType&& s) ->
Source<SourceType> { return Source<SourceType>{std::move(s)}; });
669 void OcmDaqController::InitiateAwaitPrimarySources() {
670 AddEvent<ActionEvent>(GetId(),
671 "DaqController::InitiateAwaitPrimarySources(): Initiating",
672 GetStatusRef().GetStatus());
673 if (m_prim_sources.empty()) {
676 fmt::format(
"{}: InitiateAwaitPrimarySources: No primary sources to monitor.", *
this));
679 LOG4CPLUS_DEBUG(m_logger,
680 fmt::format(
"{}: InitiateAwaitPrimarySources: Starting operation.", *
this));
681 auto alerts = std::make_unique<op::AlertState>();
682 auto [future, abort] = m_async_ops.await_prim(MakeAwaitParams(*alerts.get()));
683 m_abort_await_primary_sources = abort;
687 [alerts = std::move(alerts),
688 daq = std::static_pointer_cast<OcmDaqController>(shared_from_this())](
691 auto result = fut.get();
692 LOG4CPLUS_DEBUG(
daq->m_logger,
693 fmt::format(
"{}: InitiateAwaitPrimarySources: Adding {} files from "
696 result.result.size()));
701 daq->EmitContextSignal();
704 if (!std::holds_alternative<Acquiring>(
daq->m_state)) {
708 "{}: InitiateAwaitPrimarySources: "
709 "AwaitAsync completed but another operation has already transitioned "
710 "DAQ from Acquiring so automatic stop will not be performed.",
716 "DaqController::InitiateAwaitPrimarySources(): "
717 "Primary sources completed. Performing automatic stop of metadata sources",
718 daq->GetStatusRef().GetStatus());
729 daq->StopAsync(ErrorPolicy::Strict);
733 DpmDaqController::DpmDaqController(boost::asio::io_context& io_context,
735 std::shared_ptr<ObservableStatus> status,
736 std::shared_ptr<ObservableEventLog> event_log,
737 std::shared_ptr<DpmClient> dpm_client)
738 :
CommonDaqController(io_context, std::move(context), std::move(status), std::move(event_log))
739 , m_liveness(std::make_shared<bool>(true))
740 , m_dpm_client(std::move(dpm_client))
741 , m_logger(log4cplus::Logger::getInstance(
"daq.ocm")) {
742 assert(m_dpm_client);
743 UpdateStateContext();
749 m_status_connection = m_dpm_client->ConnectStatusSignal(
750 [
id =
GetId(), weak = std::weak_ptr<bool>(m_liveness),
this](
Status const& status) {
751 if (
id != status.
id) {
755 auto lock = weak.lock();
758 LOG4CPLUS_DEBUG(
"daq",
"DpmDaqController is abandoned: " <<
this);
762 LOG4CPLUS_DEBUG(m_logger, *
this <<
": Assigning new DAQ status from DPM: " << status);
764 UpdateStateContext();
769 return boost::make_exceptional_future<State>(
770 std::runtime_error(fmt::format(
"StartAsync() is invalid in state: {}",
GetState())));
774 return boost::make_exceptional_future<Status>(
775 std::runtime_error(fmt::format(
"StopAsync() is invalid in state: {}",
GetState())));
780 return boost::make_exceptional_future<State>(std::runtime_error(
781 fmt::format(
"AwaitAsync() with sources is invalid in state: {}",
GetState())));
785 throw std::runtime_error(fmt::format(
"UpdateKeywords() is invalid in state: {}",
GetState()));
793 LOG4CPLUS_TRACE(m_logger, *
this <<
": DpmDaqController::ScheduleMergeAsync");
798 if (starting_state != State::NotScheduled) {
799 return boost::make_exceptional_future<State>(std::runtime_error(
800 fmt::format(
"ScheduleMergeAsync() is invalid in state: {}",
GetState())));
803 assert(std::holds_alternative<NotScheduled>(m_state_ctx));
804 auto& ctx = std::get<NotScheduled>(m_state_ctx);
805 if (ctx.schedule_reply_pending) {
807 return boost::make_exceptional_future<State>(
808 std::logic_error(
"ScheduleMergeAsync() a request is already in flight"));
812 m_dp_spec =
json.dump(2);
815 return m_dpm_client->ScheduleAsync(*m_dp_spec)
817 [starting_state, weak = weak_from_this(),
this](boost::future<State> f) ->
State {
818 auto shared = weak.lock();
820 auto state = f.get();
829 LOG4CPLUS_INFO(m_logger,
830 fmt::format(
"{}: Scheduled DAQ successfully", *
this));
832 LOG4CPLUS_DEBUG(
"daq",
"DpmDaqController is abandoned: " <<
this);
835 }
catch (elt::mal::TimeoutException
const&) {
838 LOG4CPLUS_INFO(m_logger,
839 fmt::format(
"{}: Schedule DAQ failed with timeout "
840 "(no error flag set for this condition)",
847 LOG4CPLUS_ERROR(m_logger, fmt::format(
"{}: Scheduled DAQ failed", *
this));
855 LOG4CPLUS_TRACE(m_logger, *
this <<
": DpmDaqController::AbortAsync");
859 if (state == State::NotScheduled) {
860 assert(std::holds_alternative<NotScheduled>(m_state_ctx));
861 if (!std::get<NotScheduled>(m_state_ctx).schedule_reply_pending) {
862 SetState(State::Aborted);
863 return boost::make_ready_future<Status>(*
GetStatus());
869 return m_dpm_client->AbortAsync(
GetId()).then(
870 GetIoExecutor(), [policy, weak = weak_from_this(),
this](boost::future<State> f) ->
Status {
871 auto shared = weak.lock();
874 LOG4CPLUS_DEBUG(
"daq",
"DpmDaqController is abandoned: " <<
this);
875 throw boost::enable_current_exception(std::runtime_error(
"Operation aborted"));
878 auto result = f.get();
885 LOG4CPLUS_ERROR(m_logger,
887 <<
": Request to abort DAQ to DPM returned with "
888 "error. ErrorPolicy::Tolerant is used so DAQ is marked "
889 "as aborted anyway");
890 SetState(State::Aborted,
true);
898 void DpmDaqController::UpdateStateContext() {
901 case State::NotScheduled:
902 m_state_ctx = NotScheduled{
false};
905 m_state_ctx = std::monostate();
909 void DpmDaqController::SetState(
State state, std::optional<bool>
error) {
913 UpdateStateContext();
928 auto sources = nlohmann::json::array();
933 sources.push_back(nlohmann::json::object({{
"type",
"fitsKeywords"},
935 {
"keywords", std::move(kws)}})
945 std::optional<nlohmann::json> target;
946 std::optional<std::string> target_source_name;
949 1 == std::count_if(ctx.
results.begin(),
952 return part.origin == source;
954 auto it = std::find_if(ctx.
results.begin(),
957 return part.origin == source;
959 assert(it != ctx.
results.end());
960 if (std::holds_alternative<std::string>(it->info)) {
964 auto const& path = std::get<std::string>(it->info);
966 LOG4CPLUS_DEBUG(logger,
967 fmt::format(
"{}: Heuristics resulted in using the file "
968 "{} from {} as *in-place* merge target.",
971 *target_source_name));
972 target = nlohmann::json::object(
973 {{
"type",
"fitsFile"}, {
"sourceName", it->origin}, {
"origin", path}});
978 if (target_source_name && *target_source_name == r.
origin) {
982 if (std::holds_alternative<fits::KeywordVector>(r.
info)) {
985 sources.push_back(nlohmann::json::object(
986 {{
"type",
"fitsKeywords"}, {
"sourceName", r.
origin}, {
"keywords", std::move(kws)}})
989 }
else if (std::holds_alternative<std::string>(r.
info)) {
990 auto const& path = std::get<std::string>(r.
info);
991 sources.push_back(nlohmann::json::object(
992 {{
"type",
"fitsFile"}, {
"sourceName", r.
origin}, {
"origin", path}})
997 json[
"sources"] = std::move(sources);
1002 json[
"target"][
"source"] = *target;