8 #ifndef OCF_DAQ_DAQ_CONTROLLER_HPP_
9 #define OCF_DAQ_DAQ_CONTROLLER_HPP_
19 #include <Metadaqif.hpp>
20 #include <boost/asio/io_context.hpp>
21 #include <boost/asio/steady_timer.hpp>
22 #include <boost/thread/future.hpp>
23 #include <gsl/pointers>
24 #include <log4cplus/logger.h>
25 #include <nlohmann/json.hpp>
52 using AwaitReturnType = std::pair<boost::future<Result<DpParts>>, std::function<bool()>>;
65 std::function<boost::future<
void>(op::AsyncOpParams)>
start;
83 std::shared_ptr<ObservableStatus> status,
84 std::shared_ptr<ObservableEventLog> event_log)
85 -> std::shared_ptr<DaqController> = 0;
90 std::shared_ptr<ObservableStatus> status,
91 std::shared_ptr<ObservableEventLog> event_log)
92 -> std::shared_ptr<DaqController> = 0;
101 elt::mal::Mal& m_mal,
102 std::shared_ptr<DpmClient> dpm_client);
104 std::shared_ptr<ObservableStatus> status,
105 std::shared_ptr<ObservableEventLog> event_log)
106 -> std::shared_ptr<DaqController>
override;
108 std::shared_ptr<ObservableStatus> status,
109 std::shared_ptr<ObservableEventLog> event_log)
110 -> std::shared_ptr<DaqController>
override;
113 boost::asio::io_context& m_io_ctx;
114 elt::mal::Mal& m_mal;
116 std::shared_ptr<DpmClient> m_dpm_client;
285 virtual boost::future<State>
286 AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout) = 0;
324 virtual boost::signals2::connection ConnectContext(
ContextSignal::slot_type const& slot) = 0;
337 std::shared_ptr<ObservableStatus> status,
338 std::shared_ptr<ObservableEventLog> event_log);
340 std::shared_ptr<ObservableStatus> GetStatus()
DAQ_NOEXCEPT override;
341 std::shared_ptr<ObservableStatus const> GetStatus()
const DAQ_NOEXCEPT override;
342 std::shared_ptr<ObservableEventLog> GetEventLog()
DAQ_NOEXCEPT override;
346 boost::signals2::connection ConnectContext(ContextSignal::slot_type
const& slot)
override;
349 template <
class T,
class... Args>
351 m_event_log->EmplaceEvent<T>(std::forward<Args>(args)...);
363 return *m_event_log.get();
366 return *m_status.get();
369 return *m_status.get();
372 m_sig_context(m_context);
376 boost::asio::io_context& m_io_ctx;
379 ContextSignal m_sig_context;
380 std::shared_ptr<ObservableStatus> m_status;
381 std::shared_ptr<ObservableEventLog> m_event_log;
407 static std::shared_ptr<OcmDaqController> Create(boost::asio::io_context& io_context,
410 std::shared_ptr<ObservableStatus> status,
411 std::shared_ptr<ObservableEventLog> event_log,
414 boost::future<State> StartAsync()
override;
415 boost::future<Status> StopAsync(
ErrorPolicy policy)
override;
416 boost::future<Status> AbortAsync(
ErrorPolicy policy)
override;
417 boost::future<State> ScheduleMergeAsync()
override;
420 AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout)
override;
427 constexpr log4cplus::Logger
const&
GetLogger()
const noexcept;
439 std::variant<NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted>;
449 std::shared_ptr<ObservableStatus> status,
450 std::shared_ptr<ObservableEventLog> event_log,
472 void InitiateAwaitPrimarySources();
474 void AddInitialKeywords();
476 void SetErrorFlag(
bool error) noexcept;
480 std::variant<gsl::not_null<Source<PrimSource>*>, gsl::not_null<Source<MetaSource>*>>>
481 FindSource(std::string_view source_id);
484 template <
class SourceType>
485 std::vector<Source<SourceType>> MakeSources(std::vector<SourceType> sources);
494 std::vector<std::unique_ptr<boost::asio::steady_timer>>
m_timers;
536 std::shared_ptr<ObservableStatus> status,
537 std::shared_ptr<ObservableEventLog> event_log,
538 std::shared_ptr<DpmClient> dpm_client);
540 boost::future<State> ScheduleMergeAsync()
override;
545 boost::future<State> StartAsync()
override;
549 boost::future<Status> StopAsync(
ErrorPolicy policy)
override;
561 AwaitAsync(std::vector<std::string> sources, std::chrono::milliseconds timeout)
override;
573 boost::future<Status> AbortAsync(
ErrorPolicy policy)
override;
585 bool schedule_reply_pending;
591 void UpdateStateContext();
592 void SetState(
State state, std::optional<bool>
error = std::nullopt);
594 using StateVariant = std::variant<std::monostate, NotScheduled>;
599 std::shared_ptr<bool> m_liveness;
600 std::shared_ptr<DpmClient> m_dpm_client;
604 boost::signals2::scoped_connection m_status_connection;
608 std::optional<std::string> m_dp_spec;
609 StateVariant m_state_ctx;
610 log4cplus::Logger m_logger;
Implements common behaviour of OcmDaqController and DpmDaqController.
ObservableEventLog & GetEventLogRef() noexcept
void AddEvent(Args &&... args)
ObservableStatus & GetStatusRef() noexcept
boost::asio::io_context & GetIoCtx() noexcept
ObservableStatus const & GetStatusRef() const noexcept
DaqContext & GetContextMut() noexcept
rad::IoExecutor & GetIoExecutor() noexcept
Default factory producing "real" implementations.
Abstract factory for DaqControllers.
virtual auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the DPM phase of the DAQ process.
virtual auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the OCM phase of the DAQ process.
Controls the execution of a single data acquisition that ultimately results in a set of FITS keywords an...
virtual boost::future< Status > StopAsync(ErrorPolicy policy)=0
Stops the data acquisition.
virtual boost::future< Status > AbortAsync(ErrorPolicy policy)=0
Aborts the data acquisition.
virtual boost::future< State > AwaitAsync(std::vector< std::string > sources, std::chrono::milliseconds timeout)=0
Awaits that data acquisition stops or aborts.
virtual boost::future< State > ScheduleMergeAsync()=0
Schedules DAQ for merging by sending request to DPM.
virtual void UpdateKeywords(fits::KeywordVector const &keywords)=0
Updates (replaces or adds) the list of keywords.
virtual ~DaqController()=default
virtual boost::future< State > StartAsync()=0
Starts the data acquisition.
boost::signals2::signal< void(DaqContext const &)> ContextSignal
virtual State GetState() const DAQ_NOEXCEPT=0
Data acquisition sources.
Implements behaviour from the state NotScheduled to Completed.
Stores data acquisition status and allows subscription to status changes.
Stores data acquisition status and allows subscription to status changes.
Implements daq::DaqController for states responsible to be executed by OCM.
std::vector< Source< PrimSource > > m_prim_sources
Note: Consider vector immutable!
log4cplus::Logger m_logger
std::vector< std::unique_ptr< boost::asio::steady_timer > > m_timers
std::shared_ptr< PendingReplies > m_pending_replies
std::variant< NotStarted, Starting, Acquiring, Stopping, Stopped, Aborting, Aborted > StateVariant
OcmAsyncOperations m_async_ops
std::vector< Source< MetaSource > > m_meta_sources
Note: Consider vector immutable!
std::function< bool()> m_abort_await_primary_sources
If DaqController is awaiting the completion of primary data sources this function will hold the abort...
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Contains declaration of daq::Context.
Contains declaration for DpPart.
Contains error related declarations for DAQ.
Contains declaration for EventLog, ObservableEventLog and related events.
Contains declarations for the helper functions to initiate operations.
Declares daq::State and related functions.
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
std::vector< DpPart > DpParts
ErrorPolicy
Error policy supported by certain operations.
State
Observable states of the data acquisition process.
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
Utility class that represents a result and an error.
log4cplus::Logger & GetLogger()
Config class header file.
Contains declaration for classes related to pending replies.
Declarations for daq::Source and related classes.
Contains declaration for Status and ObservableStatus.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
std::function< boost::future< Result< void > >(ErrorPolicy, op::AsyncOpParams)> abort
OcmAsyncOperations & operator=(OcmAsyncOperations const &)=default
bool IsValid() const noexcept
OcmAsyncOperations & operator=(OcmAsyncOperations &&)=default
std::function< boost::future< void >(op::AsyncOpParams)> start
std::function< AwaitReturnType(op::AwaitOpParams)> await_prim
OcmAsyncOperations()
Default constructs object with standard async operations.
std::function< boost::future< Result< DpParts > >(ErrorPolicy, op::AsyncOpParams)> stop
OcmAsyncOperations(OcmAsyncOperations &&)=default
std::pair< boost::future< Result< DpParts > >, std::function< bool()> > AwaitReturnType
OcmAsyncOperations(OcmAsyncOperations const &)=default
Parameters required for each async operation.
Await-specific parameters that are not provided with AsyncOpParams.