14 #include <fmt/format.h>
15 #include <fmt/ostream.h>
16 #include <log4cplus/loggingmacros.h>
17 #include <mal/Mal.hpp>
31 std::chrono::system_clock::time_point creation_time) {
32 auto now = std::chrono::system_clock::now();
37 if (full.state == daqif::StateAcquiring) {
40 if (full.state == daqif::StateMerging) {
48 std::chrono::system_clock::time_point* tp) {
49 using time_point = std::chrono::system_clock::time_point;
50 using duration = std::chrono::system_clock::duration;
51 using seconds = std::chrono::seconds;
52 using microseconds = std::chrono::microseconds;
59 struct timeval jitter_tv {
62 if (gettimeofday(&tv,
nullptr) != 0) {
64 throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
69 timeradd(&tv, &jitter_tv, &res);
73 time_t time = tv.tv_sec;
74 if (gmtime_r(&time, &tm_time) ==
nullptr) {
76 throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
79 char time_str[31] = {0};
80 int n = snprintf(&time_str[0], 7,
"%.5s.", instrument_id);
82 strftime(&time_str[n], 20,
"%Y-%m-%dT%H:%M:%S", &tm_time);
84 snprintf(&frac_str[0], 5,
".%.3d",
static_cast<int>(tv.tv_usec / 1000.0));
86 strncpy(&time_str[n + 19], &frac_str[0], 4);
90 std::chrono::duration_cast<duration>(seconds(tv.tv_sec) + microseconds(tv.tv_usec)));
92 return std::string(time_str, n + 23);
98 std::shared_ptr<ObservableEventLog> event_log,
100 log4cplus::Logger
const& logger)
101 : m_alive_token(std::make_shared<bool>())
102 , m_executor(executor)
103 , m_params(std::move(params))
104 , m_workspace(workspace)
105 , m_event_log(std::move(event_log))
106 , m_daq_factory(daq_factory)
112 for (
auto& op : m_abort_funcs) {
115 }
catch (std::exception
const& e) {
116 LOG4CPLUS_WARN(m_logger,
117 fmt::format(
"ManagerImpl::~ManagerImpl: Error when aborting "
126 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Starting");
129 decltype(ids) pruned;
131 for (
auto const&
id : ids) {
133 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Loading state for DAQ " <<
id);
138 if (
IsStale(m_params, status.state, context.creation_time)) {
139 LOG4CPLUS_INFO(m_logger,
140 "RestoreFromWorkspace: DAQ " << status <<
" is stale -> archiving");
147 if (full.state == daqif::StateAcquiring) {
148 LOG4CPLUS_INFO(m_logger,
149 "RestoreFromWorkspace: Restoring OCM phase DAQ: " << status);
151 std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
153 AddDaq(
daq, Store::No);
155 pruned.push_back(
id);
156 }
else if (full.state == daqif::StateMerging) {
157 LOG4CPLUS_INFO(m_logger,
158 "RestoreFromWorkspace: Restoring DPM phase DAQ: " << status);
160 std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
162 AddDaq(
daq, Store::No);
164 pruned.push_back(
id);
168 "RestoreFromWorkspace: Skipping loading DAQ in unsupported state: " << status);
172 LOG4CPLUS_ERROR(m_logger,
173 "RestoreFromWorkspace: Loading state for DAQ "
174 <<
id <<
" failed (ignoring): " << r);
179 LOG4CPLUS_ERROR(m_logger,
180 "RestoreFromWorkspace: Failed to archive DAQ " <<
id
184 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Skipping " <<
id);
191 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Successfully completed");
193 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Failed");
194 std::throw_with_nested(std::runtime_error(
"Failed to restore from workspace"));
198 for (
unsigned jitter = 0;; ++jitter) {
200 if (!
HaveDaq(id_candidate, id_candidate)) {
209 std::find_if(m_daq_controllers.begin(), m_daq_controllers.end(), [=](
auto const&
daq) {
212 return daq.id == id ||
213 (!file_id.empty() && file_id == daq.controller->GetContext().file_id);
215 if (it != m_daq_controllers.end()) {
216 LOG4CPLUS_DEBUG(m_logger,
217 "Manager: Found conflicting DAQ: id="
218 <<
id <<
", file_id=" << file_id <<
" with existing: id=" << it->id
219 <<
", file_id=" << it->controller->GetContext().file_id);
225 void ManagerImpl::AddInitialKeywords(
DaqContext& ctx) {
228 kws.emplace_back(std::in_place_type<fits::ValueKeyword>,
"ORIGIN", m_params.
origin);
229 kws.emplace_back(std::in_place_type<fits::ValueKeyword>,
"INSTRUME", m_params.
instrument_id);
240 void ManagerImpl::AddDaq(std::shared_ptr<DaqController>
const&
daq, Store store) {
242 LOG4CPLUS_INFO(m_logger,
"Manager: AddDaq: Attempting to add DAQ " <<
daq->GetId());
243 if (
daq->GetId().empty()) {
244 throw boost::enable_current_exception(std::invalid_argument(
"DaqController has empty id!"));
246 if (
daq->GetContext().file_id.empty()) {
247 throw boost::enable_current_exception(
248 std::invalid_argument(
"DaqController has empty file_id"));
251 throw boost::enable_current_exception(
252 std::invalid_argument(
"DaqController with same id already exists"));
255 if (store == Store::Yes) {
261 m_daq_controllers.emplace_back(
264 daq->GetStatus()->ConnectObserver([alive = std::weak_ptr<bool>(m_alive_token),
265 prev_state =
daq->GetState(),
266 this](ObservableStatus
const& status)
mutable {
267 if (alive.expired()) {
268 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
272 if (!IsFinalState(prev_state)) {
276 fmt::format(
"DAQ transitioned to a final state -> archiving: {} (prev {})",
279 m_workspace.StoreStatus(status);
280 m_executor.submit([alive = alive, id = status.GetId(), this] {
281 if (alive.expired()) {
282 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
289 m_workspace.StoreStatus(status);
297 m_executor.submit([alive = alive, id = status.GetId(), this] {
298 if (alive.expired()) {
299 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
302 MoveToMergePhase(id);
308 this->m_status_signal.Signal(status);
310 prev_state = status.GetState();
313 [alive = std::weak_ptr<bool>(m_alive_token),
this](DaqContext
const& ctx) {
314 if (alive.expired()) {
315 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
318 m_workspace.StoreContext(ctx);
321 if (store == Store::Yes) {
327 m_status_signal.Signal(*
daq->GetStatus());
329 if (
daq->GetState() == State::NotScheduled) {
334 void ManagerImpl::RemoveDaq(std::string_view
id) {
335 auto it = std::find_if(m_daq_controllers.begin(),
336 m_daq_controllers.end(),
337 [
id](
auto const&
daq) { return daq.id == id; });
338 if (it == m_daq_controllers.end()) {
339 throw boost::enable_current_exception(
340 std::invalid_argument(fmt::format(
"Remove DAQ failed - no id found: {}",
id)));
342 m_daq_controllers.erase(it);
345 void ManagerImpl::ArchiveDaq(std::string
const&
id) {
346 LOG4CPLUS_INFO(m_logger, fmt::format(
"Moving DAQ to archive: id={}",
id));
348 m_workspace.ArchiveDaq(
id);
351 LOG4CPLUS_INFO(m_logger, fmt::format(
"Moving DAQ to archive done: id={}",
id));
354 void ManagerImpl::StoreActiveDaqs()
const {
355 LOG4CPLUS_INFO(m_logger,
"StoreActiveDaqs()");
357 std::vector<std::string> daqs;
358 for (Daq
const&
daq : m_daq_controllers) {
359 assert(
daq.controller);
361 daqs.push_back(
daq.id);
364 m_workspace.StoreList(daqs);
367 Status ManagerImpl::GetStatus(std::string_view
id)
const {
368 auto&
daq = FindDaqOrThrow(
id);
369 return daq.GetStatus()->GetStatus();
372 ManagerImpl::Daq::Daq(std::string id_arg,
373 std::shared_ptr<DaqController> controller_arg,
374 boost::signals2::connection conn_status_arg,
375 boost::signals2::connection conn_context_arg) noexcept
376 : id(std::move(id_arg))
377 , controller(std::move(controller_arg))
378 , conn_status(std::move(conn_status_arg))
379 , conn_context(std::move(conn_context_arg)) {
382 DaqController
const* ManagerImpl::FindDaq(std::string_view
id)
const noexcept {
383 return const_cast<ManagerImpl*
>(
this)->FindDaq(
id);
386 DaqController* ManagerImpl::FindDaq(std::string_view
id) noexcept {
387 auto it = std::find_if(m_daq_controllers.begin(),
388 m_daq_controllers.end(),
389 [
id](
auto const&
daq) { return daq.id == id; });
390 if (it != m_daq_controllers.end()) {
391 return it->controller.get();
396 DaqController& ManagerImpl::FindDaqOrThrow(std::string_view
id) {
397 auto daq_ptr = FindDaq(
id);
400 throw boost::enable_current_exception(std::invalid_argument(
401 fmt::format(
"DaqController with id '{}' does not exist", std::string(
id))));
406 DaqController
const& ManagerImpl::FindDaqOrThrow(std::string_view
id)
const {
407 return const_cast<ManagerImpl*
>(
this)->FindDaqOrThrow(
id);
410 void ManagerImpl::MoveToMergePhase(std::string_view
id) {
411 auto*
daq = FindDaq(
id);
413 LOG4CPLUS_WARN(m_logger, fmt::format(
"Daq requested to move does not exist: id={}",
id));
416 LOG4CPLUS_INFO(m_logger, fmt::format(
"Moving DAQ to merge-phase: id={}",
id));
418 auto ctx =
daq->GetContext();
419 auto status =
daq->GetStatus();
420 auto event_log =
daq->GetEventLog();
426 status->SetState(State::NotScheduled);
429 m_daq_factory.MakeDpmPhase(std::move(ctx), std::move(status), std::move(event_log));
433 boost::future<State> ManagerImpl::StartDaqAsync(
DaqContext ctx) {
435 AddInitialKeywords(ctx);
439 auto daq = m_daq_factory.MakeOcmPhase(
441 std::make_shared<ObservableStatus>(std::move(
id), std::move(file_id)),
445 return daq->StartAsync()
447 [&,
daq](boost::future<State> f) -> boost::future<State> {
448 if (f.has_exception()) {
451 return daq->AbortAsync(ErrorPolicy::Tolerant)
453 [f = std::move(f)](boost::future<Status>) mutable -> State {
457 __builtin_unreachable();
460 return boost::make_ready_future<State>(f.get());
465 return boost::make_exceptional_future<State>();
469 boost::future<Status> ManagerImpl::StopDaqAsync(std::string_view
id,
ErrorPolicy policy) {
471 return FindDaqOrThrow(
id).StopAsync(policy);
473 return boost::make_exceptional_future<Status>();
477 boost::future<Status> ManagerImpl::AbortDaqAsync(std::string_view
id,
ErrorPolicy policy) {
479 return FindDaqOrThrow(
id).AbortAsync(policy);
481 return boost::make_exceptional_future<Status>();
485 boost::future<Result<Status>> ManagerImpl::AwaitDaqStateAsync(std::string_view
id,
487 std::chrono::milliseconds timeout) {
489 auto&
daq = FindDaqOrThrow(
id);
490 auto status =
daq.GetStatus();
492 LOG4CPLUS_INFO(m_logger,
493 fmt::format(
"{}: Await condition already fulfilled.", *status));
495 return boost::make_ready_future<Result<Status>>({
false, *status});
498 auto logger = log4cplus::Logger::getInstance(m_logger.getName() +
".awaitstate");
499 auto [fut, abort] = op::InitiateAbortableOperation<op::AwaitStateAsync>(
500 m_executor.get_io_context(), status, state, timeout, logger);
502 auto& ref = m_abort_funcs.emplace_back(std::move(abort));
503 LOG4CPLUS_DEBUG(
"daq.manager",
504 fmt::format(
"op::AwaitStateAsync initiated. id={}", ref.GetId()));
507 [
this,
id = ref.GetId(), alive = std::weak_ptr<bool>(m_alive_token)](
auto res) {
508 LOG4CPLUS_DEBUG(
"daq.manager",
509 fmt::format(
"op::AwaitStateAsync completed. id={}", id));
512 auto is_alive = !alive.expired();
521 return boost::make_exceptional_future<Result<Status>>();
526 return FindDaqOrThrow(
id).UpdateKeywords(keywords);
530 return m_status_signal;
533 std::vector<std::shared_ptr<DaqController const>> ManagerImpl::GetDaqControllers() {
534 std::vector<std::shared_ptr<DaqController const>> controllers;
535 controllers.reserve(m_daq_controllers.size());
537 m_daq_controllers.end(),
538 std::back_inserter(controllers),
539 [](
auto const&
daq) { return daq.controller; });
543 void ManagerImpl::RemoveAbortFunc(uint64_t
id) noexcept {
545 m_abort_funcs.erase(std::remove_if(m_abort_funcs.begin(),
547 [
id](
auto const& obj) { return id == obj.GetId(); }),
548 m_abort_funcs.end());
553 ManagerImpl::OpAbortFunc::OpAbortFunc(OpAbortFunc::Func&& func)
554 : m_id(NextId()), m_func(std::move(func)) {
558 uint64_t ManagerImpl::OpAbortFunc::GetId() const noexcept {
562 bool ManagerImpl::OpAbortFunc::Abort() noexcept {
566 uint64_t ManagerImpl::OpAbortFunc::NextId() {
567 static uint64_t next_id = 0;
571 void ManagerImpl::ScheduleDaqsAsync() {
572 LOG4CPLUS_TRACE(m_logger,
"ScheduleDaqAsync()");
575 m_schedule_retry.reset();
577 for (
auto&
daq : m_daq_controllers) {
581 daq.controller->ScheduleMergeAsync().then(
582 m_executor, [
id =
daq.id,
this](boost::future<State> reply) {
583 if (!reply.has_exception()) {
586 fmt::format(
"ScheduleDaqAsyn: Successfully scheduled DAQ '{}'", id));
592 fmt::format(
"ScheduleDaqAsyn: Failed to schedule DAQ '{}', will retry in 60s",
596 if (m_schedule_retry) {
601 boost::posix_time::seconds(60));
602 m_schedule_retry->async_wait([
this](boost::system::error_code
const&
error) {
Contains declaration for the AwaitStateAsync operation.
Abstract factory for DaqControllers.
virtual auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the DPM phase of the DAQ process.
virtual auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the OCM phase of the DAQ process.
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
bool HaveDaq(std::string_view id, std::string_view file_id={}) const noexcept override
Query existing data acquisition by id and optional file_id.
ManagerImpl(rad::IoExecutor &executor, ManagerParams params, Workspace &workspace, std::shared_ptr< ObservableEventLog > event_log, DaqControllerFactory &daq_factory, log4cplus::Logger const &logger)
std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const override
Creates a new unique identifier based on the instrument id and current time.
Interface to interact with DPM workspace.
virtual auto LoadList() const -> std::vector< std::string >=0
virtual void ArchiveDaq(std::string const &id)=0
Archives the specified DAQ without deleting any files, typically by moving its files to a specific location...
virtual auto LoadContext(std::string const &id) const -> DaqContext=0
Get file name of the data product specification stored in StoreSpecification()
virtual void StoreList(std::vector< std::string > const &queue) const =0
virtual void StoreStatus(Status const &status) const =0
virtual auto LoadStatus(std::string const &id) const -> Status=0
virtual void StoreContext(DaqContext const &context) const =0
Get file name of the data product specification stored in StoreSpecification()
Adapter object intended to be used in contexts without direct access to the output-stream object.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
daq::Workspace interface and implementation declaration
Contains support functions for daqif.
Contains declarations for the helper functions to initiate operations.
Declaration of daq::Manager
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
void UpdateKeywords(KeywordVector &to, KeywordVector const &from, ConflictPolicy policy=ConflictPolicy::Replace)
Updates the keyword vector `to` with the keywords from the vector `from`.
@ Skip
Skip keyword that conflicts.
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
bool IsStale(ManagerParams const &params, State state, std::chrono::system_clock::time_point creation_time)
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
bool IsSubsequentState(State state1, State state2) noexcept
Compares states and returns whether state1 occurs after state2.
ErrorPolicy
Error policy supported by certain operations.
std::string instrument_id
Instrument identifier.
State
Observable states of the data acquisition process.
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Stopped
All data sources have reported they have stopped acquiring data.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Configuration parameters directly related to the manager.
Contains declaration for DaqController.
Contains declaration for Status and ObservableStatus.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
std::string file_id
Data Product FileId as specified by OLAS ICD.
std::string id
DAQ identifier, possibly provided by user.
Non-observable status object that stores the status of a data acquisition.