8 #ifndef OCM_DAQ_MANAGER_HPP_
9 #define OCM_DAQ_MANAGER_HPP_
13 #include <string_view>
16 #include <boost/asio/deadline_timer.hpp>
17 #include <boost/thread/future.hpp>
18 #include <log4cplus/logger.h>
68 std::chrono::system_clock::time_point* out =
nullptr);
81 template <
class Observer>
83 return m_signal.connect(std::move(o));
153 virtual std::string
MakeDaqId(std::chrono::system_clock::time_point* time =
nullptr)
const = 0;
222 virtual boost::future<Result<Status>>
261 std::shared_ptr<ObservableEventLog> event_log,
263 log4cplus::Logger
const& logger);
270 std::string
MakeDaqId(std::chrono::system_clock::time_point* time =
nullptr)
const override;
272 bool HaveDaq(std::string_view
id, std::string_view file_id = {})
const noexcept
override;
282 boost::future<Result<Status>>
293 Daq(std::string id_arg,
294 std::shared_ptr<DaqController> controller_arg,
295 boost::signals2::connection conn_status_arg,
296 boost::signals2::connection conn_context_arg) noexcept;
299 std::shared_ptr<DaqController> controller;
301 boost::signals2::scoped_connection conn_status;
302 boost::signals2::scoped_connection conn_context;
308 using Func = std::function<bool()>;
309 OpAbortFunc(Func&& f);
310 OpAbortFunc(OpAbortFunc&&) =
default;
311 OpAbortFunc& operator=(OpAbortFunc&&) =
default;
313 uint64_t GetId()
const noexcept;
314 bool Abort() noexcept;
317 static uint64_t NextId();
319 std::function<bool()> m_func;
322 enum class Store { Yes, No };
339 void AddDaq(std::shared_ptr<DaqController>
const&
daq, Store store = Store::Yes);
347 void RemoveDaq(std::string_view
id);
348 void ArchiveDaq(std::string
const&
id);
352 void StoreActiveDaqs()
const;
355 void RemoveAbortFunc(uint64_t
id) noexcept;
356 DaqController const* FindDaq(std::string_view
id)
const noexcept;
359 DaqController const& FindDaqOrThrow(std::string_view
id)
const;
364 void MoveToMergePhase(std::string_view
id);
372 void ScheduleDaqsAsync();
374 std::shared_ptr<bool> m_alive_token;
378 std::shared_ptr<ObservableEventLog> m_event_log;
383 std::vector<Daq> m_daq_controllers;
386 std::vector<OpAbortFunc> m_abort_funcs;
387 log4cplus::Logger m_logger;
390 std::optional<boost::asio::deadline_timer> m_schedule_retry;
Abstract factory for DaqControllers.
Controls the execution of a single data acquisition that ultimately results in a set of FITS keywords an...
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
Status GetStatus(std::string_view id) const override
Get status.
boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State, std::chrono::milliseconds timeout) override
Await DAQ state.
bool HaveDaq(std::string_view id, std::string_view file_id={}) const noexcept override
Query existing data acquisition by id and optional file_id.
ManagerImpl(rad::IoExecutor &executor, ManagerParams params, Workspace &workspace, std::shared_ptr< ObservableEventLog > event_log, DaqControllerFactory &daq_factory, log4cplus::Logger const &logger)
std::vector< std::shared_ptr< DaqController const > > GetDaqControllers() override
std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const override
Creates a new unique identifier based on the instrument id and current time.
void UpdateKeywords(std::string_view id, fits::KeywordVector const &keywords) override
Update FITS keywords for DaqController identified by id.
boost::future< State > StartDaqAsync(DaqContext ctx) override
Start DaqController identified by id.
boost::future< Status > StopDaqAsync(std::string_view id, ErrorPolicy policy) override
Stop DaqController identified by id.
boost::future< Status > AbortDaqAsync(std::string_view id, ErrorPolicy policy) override
Abort DaqController identified by id.
StatusSignal & GetStatusSignal() override
Manager owns DaqController and FitsController (active data acquisitions) instances and multiplexes re...
virtual boost::future< Result< Status > > AwaitDaqStateAsync(std::string_view id, State state, std::chrono::milliseconds timeout)=0
Await DAQ state.
boost::signals2::signal< void(ObservableStatus const &)> Signal
virtual boost::future< State > StartDaqAsync(DaqContext ctx)=0
Start DaqController identified by id.
virtual StatusSignal & GetStatusSignal()=0
virtual boost::future< Status > StopDaqAsync(std::string_view id, ErrorPolicy policy)=0
Stop DaqController identified by id.
virtual Status GetStatus(std::string_view id) const =0
Get status.
virtual void UpdateKeywords(std::string_view id, fits::KeywordVector const &keywords)=0
Update FITS keywords for DaqController identified by id.
virtual std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const =0
Creates a new unique identifier based on the instrument id and current time.
virtual void RestoreFromWorkspace()=0
Restore from state stored in workspace.
virtual bool HaveDaq(std::string_view id, std::string_view file_id={}) const DAQ_NOEXCEPT=0
Query existing data acquisition by id and optional file_id.
virtual boost::future< Status > AbortDaqAsync(std::string_view id, ErrorPolicy policy)=0
Abort DaqController identified by id.
virtual std::vector< std::shared_ptr< DaqController const > > GetDaqControllers()=0
Stores data acquisition status and allows subscription to status changes.
boost::signals2::connection ConnectObserver(Observer o)
boost::signals2::signal< void(ObservableStatus const &)> SignalType
void Signal(ObservableStatus const &status)
Interface to interact with DPM workspace.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Contains declaration of daq::Context.
Contains error related declarations for DAQ.
Contains declaration for EventLog, ObservableEventLog and related events.
Declares daq::State and related functions.
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
ErrorPolicy
Error policy supported by certain operations.
std::string instrument_id
Instrument identifier.
State
Observable states of the data acquisition process.
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state, after which it is automatically considered abandoned and will be archiv...
Configuration parameters directly related to the manager.
Config class header file.
Contains declaration for Status and ObservableStatus.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Non-observable status object that stores the status of a data acquisition.
Declaration of utilities.