14 #include <fmt/format.h>
15 #include <fmt/ostream.h>
16 #include <log4cplus/loggingmacros.h>
17 #include <mal/Mal.hpp>
31 std::chrono::system_clock::time_point creation_time) {
32 auto now = std::chrono::system_clock::now();
37 if (full.state == daqif::StateAcquiring) {
40 if (full.state == daqif::StateMerging) {
48 std::chrono::system_clock::time_point* tp) {
49 using time_point = std::chrono::system_clock::time_point;
50 using duration = std::chrono::system_clock::duration;
51 using seconds = std::chrono::seconds;
52 using microseconds = std::chrono::microseconds;
59 struct timeval jitter_tv {
62 if (gettimeofday(&tv,
nullptr) != 0) {
64 throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
69 timeradd(&tv, &jitter_tv, &res);
73 time_t time = tv.tv_sec;
74 if (gmtime_r(&time, &tm_time) ==
nullptr) {
76 throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
79 char time_str[31] = {0};
80 int n = snprintf(&time_str[0], 7,
"%.5s.", instrument_id);
82 strftime(&time_str[n], 20,
"%Y-%m-%dT%H:%M:%S", &tm_time);
84 snprintf(&frac_str[0], 5,
".%.3d",
static_cast<int>(tv.tv_usec / 1000.0));
86 strncpy(&time_str[n + 19], &frac_str[0], 4);
90 std::chrono::duration_cast<duration>(seconds(tv.tv_sec) + microseconds(tv.tv_usec)));
92 return std::string(time_str, n + 23);
98 std::shared_ptr<ObservableEventLog> event_log,
100 : m_alive_token(std::make_shared<bool>())
101 , m_executor(executor)
102 , m_params(std::move(params))
103 , m_workspace(workspace)
104 , m_event_log(std::move(event_log))
105 , m_daq_factory(daq_factory)
106 , m_logger(log4cplus::Logger::getInstance(
"daq.manager")) {
111 for (
auto& op : m_abort_funcs) {
114 }
catch (std::exception
const& e) {
115 LOG4CPLUS_WARN(m_logger,
116 fmt::format(
"ManagerImpl::~ManagerImpl: Error when aborting "
125 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Starting");
128 decltype(ids) pruned;
130 for (
auto const&
id : ids) {
132 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Loading state for DAQ " <<
id);
137 if (
IsStale(m_params, status.state, context.creation_time)) {
138 LOG4CPLUS_INFO(m_logger,
139 "RestoreFromWorkspace: DAQ " << status <<
" is stale -> archiving");
146 if (full.state == daqif::StateAcquiring) {
147 LOG4CPLUS_INFO(m_logger,
148 "RestoreFromWorkspace: Restoring OCM phase DAQ: " << status);
150 std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
152 AddDaq(
daq, Store::No);
154 pruned.push_back(
id);
155 }
else if (full.state == daqif::StateMerging) {
156 LOG4CPLUS_INFO(m_logger,
157 "RestoreFromWorkspace: Restoring DPM phase DAQ: " << status);
159 std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
161 AddDaq(
daq, Store::No);
163 pruned.push_back(
id);
167 "RestoreFromWorkspace: Skipping loading DAQ in unsupported state: " << status);
171 LOG4CPLUS_ERROR(m_logger,
172 "RestoreFromWorkspace: Loading state for DAQ "
173 <<
id <<
" failed (ignoring): " << r);
178 LOG4CPLUS_ERROR(m_logger,
179 "RestoreFromWorkspace: Failed to archive DAQ " <<
id
183 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Skipping " <<
id);
190 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Successfully completed");
192 LOG4CPLUS_INFO(m_logger,
"RestoreFromWorkspace: Failed");
193 std::throw_with_nested(std::runtime_error(
"Failed to restore from workspace"));
197 for (
unsigned jitter = 0;; ++jitter) {
199 if (!
HaveDaq(id_candidate, id_candidate)) {
208 std::find_if(m_daq_controllers.begin(), m_daq_controllers.end(), [=](
auto const&
daq) {
211 return daq.id == id ||
212 (!file_id.empty() && file_id == daq.controller->GetContext().file_id);
214 if (it != m_daq_controllers.end()) {
215 LOG4CPLUS_DEBUG(m_logger,
216 "Manager: Found conflicting DAQ: id="
217 <<
id <<
", file_id=" << file_id <<
" with existing: id=" << it->id
218 <<
", file_id=" << it->controller->GetContext().file_id);
// Appends the initial FITS keywords to `kws`: two entries constructed in place
// as fits::ValueKeyword, "ORIGIN" and "INSTRUME", whose values are read from
// m_params.origin and m_params.instrument_id.
// NOTE(review): the declaration of `kws` (original lines 225-226) is not part
// of this excerpt; presumably it refers to a keyword list held by `ctx` - confirm.
224 void ManagerImpl::AddInitialKeywords(
DaqContext& ctx) {
227 kws.emplace_back(std::in_place_type<fits::ValueKeyword>,
"ORIGIN", m_params.
origin);
228 kws.emplace_back(std::in_place_type<fits::ValueKeyword>,
"INSTRUME", m_params.
instrument_id);
// Registers a DAQ controller with the manager and connects its observers.
//
// Steps visible in this excerpt (several original lines are absent):
//  - throws std::invalid_argument (wrapped with boost::enable_current_exception)
//    when the controller id is empty, when its context file_id is empty, and
//    for the "same id already exists" case (the condition guarding that throw,
//    original line 249, is not shown);
//  - appends an entry to m_daq_controllers;
//  - connects a status observer and a context observer (see comments below);
//  - signals m_status_signal with the controller's current status.
// `store == Store::Yes` gates two sections whose bodies are not shown here, and
// the body of the final State::NotScheduled check is not shown either.
239 void ManagerImpl::AddDaq(std::shared_ptr<DaqController>
daq, Store store) {
241 LOG4CPLUS_INFO(m_logger,
"Manager: AddDaq: Attempting to add DAQ " <<
daq->GetId());
242 if (
daq->GetId().empty()) {
243 throw boost::enable_current_exception(std::invalid_argument(
"DaqController has empty id!"));
245 if (
daq->GetContext().file_id.empty()) {
246 throw boost::enable_current_exception(
247 std::invalid_argument(
"DaqController has empty file_id"));
250 throw boost::enable_current_exception(
251 std::invalid_argument(
"DaqController with same id already exists"));
254 if (store == Store::Yes) {
260 m_daq_controllers.emplace_back(
// Status observer. It captures `this` together with a weak_ptr to
// m_alive_token, and checks the token before using any member. `prev_state`
// starts as the controller's current state and is updated at the end of each
// callback, which is why the lambda is `mutable`.
263 daq->GetStatus()->ConnectObserver([alive = std::weak_ptr<bool>(m_alive_token),
264 prev_state =
daq->GetState(),
265 this](ObservableStatus
const& status)
mutable {
// Expired token: log through the named logger "daq" instead of m_logger.
// What follows the log call (original lines 268-270) is not shown.
266 if (alive.expired()) {
267 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
// Taken when the previously observed state was not final; the log text below
// describes this as the transition into a final state. The enclosing
// condition is not shown in this excerpt.
271 if (!IsFinalState(prev_state)) {
275 fmt::format(
"DAQ transitioned to a final state -> archiving: {} (prev {})",
278 m_workspace.StoreStatus(status);
// Deferred task on m_executor; it re-checks the alive token because it runs
// later. The action taken after the check (original lines 282-287) is not
// shown - presumably ArchiveDaq(id), TODO confirm.
279 m_executor.submit([alive = alive, id = status.GetId(), this] {
280 if (alive.expired()) {
281 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
288 m_workspace.StoreStatus(status);
// Entering State::Stopped from any other state: queue MoveToMergePhase(id)
// on the executor, again guarded by the alive token.
291 if (prev_state != State::Stopped && status.GetState() == State::Stopped) {
296 m_executor.submit([alive = alive, id = status.GetId(), this] {
297 if (alive.expired()) {
298 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
301 MoveToMergePhase(id);
// Forward the update through m_status_signal, then remember the state for
// the next invocation of this observer.
307 this->m_status_signal.Signal(status);
309 prev_state = status.GetState();
// Context observer: after the alive-token check it writes the updated
// context to the workspace.
312 [alive = std::weak_ptr<bool>(m_alive_token),
this](DaqContext
const& ctx) {
313 if (alive.expired()) {
314 LOG4CPLUS_INFO(
"daq",
"Manager has expired");
317 m_workspace.StoreContext(ctx);
320 if (store == Store::Yes) {
326 m_status_signal.Signal(*
daq->GetStatus());
328 if (
daq->GetState() == State::NotScheduled) {
// Removes the entry whose `id` member equals `id` from m_daq_controllers.
// The entry is located with a linear std::find_if.
// Throws std::invalid_argument (wrapped with boost::enable_current_exception)
// carrying the requested id when no entry matches.
333 void ManagerImpl::RemoveDaq(std::string_view
id) {
334 auto it = std::find_if(m_daq_controllers.begin(),
335 m_daq_controllers.end(),
336 [
id](
auto const&
daq) { return daq.id == id; });
337 if (it == m_daq_controllers.end()) {
338 throw boost::enable_current_exception(
339 std::invalid_argument(fmt::format(
"Remove DAQ failed - no id found: {}",
id)));
341 m_daq_controllers.erase(it);
// Moves the DAQ identified by `id` to the archive by delegating to
// m_workspace.ArchiveDaq(id), logging at INFO level before and after.
// NOTE(review): original lines 346 and 348-349 are not part of this excerpt,
// so any additional step between the workspace call and the "done" log
// message cannot be described from here.
344 void ManagerImpl::ArchiveDaq(std::string
const&
id) {
345 LOG4CPLUS_INFO(m_logger, fmt::format(
"Moving DAQ to archive: id={}",
id));
347 m_workspace.ArchiveDaq(
id);
350 LOG4CPLUS_INFO(m_logger, fmt::format(
"Moving DAQ to archive done: id={}",
id));
// Collects ids from m_daq_controllers into a vector and hands that list to
// m_workspace.StoreList().
// Each entry is asserted to hold a non-null controller before its id is added.
// NOTE(review): original line 359, between the assert and the push_back, is
// not part of this excerpt; it may filter which entries are stored - confirm.
353 void ManagerImpl::StoreActiveDaqs()
const {
354 LOG4CPLUS_INFO(m_logger,
"StoreActiveDaqs()");
356 std::vector<std::string> daqs;
357 for (Daq
const&
daq : m_daq_controllers) {
358 assert(
daq.controller);
360 daqs.push_back(
daq.id);
363 m_workspace.StoreList(daqs);
// Returns, by value, the Status obtained from the controller registered under
// `id` (controller -> GetStatus() -> GetStatus()).
// The lookup goes through FindDaqOrThrow(), so an unknown id results in the
// exception raised there.
366 Status ManagerImpl::GetStatus(std::string_view
id)
const {
367 auto&
daq = FindDaqOrThrow(
id);
368 return daq.GetStatus()->GetStatus();
// Constructs a Daq record by moving each argument into the matching member:
// the id string, the shared controller pointer and the two boost::signals2
// connections (status and context).
// Declared noexcept; the constructor body itself is not part of this excerpt.
371 ManagerImpl::Daq::Daq(std::string id_arg,
372 std::shared_ptr<DaqController> controller_arg,
373 boost::signals2::connection conn_status_arg,
374 boost::signals2::connection conn_context_arg) noexcept
375 : id(std::move(id_arg))
376 , controller(std::move(controller_arg))
377 , conn_status(std::move(conn_status_arg))
378 , conn_context(std::move(conn_context_arg)) {
// Const overload of FindDaq(): casts away constness of `this` to reuse the
// non-const overload and returns its result as a pointer-to-const.
381 DaqController
const* ManagerImpl::FindDaq(std::string_view
id)
const noexcept {
382 return const_cast<ManagerImpl*
>(
this)->FindDaq(
id);
// Looks up the controller registered under `id` with a linear std::find_if
// over m_daq_controllers and, when found, returns a non-owning raw pointer
// obtained from the stored shared_ptr.
// NOTE(review): the not-found return (original lines 391-393) is not part of
// this excerpt; presumably nullptr, which is what callers test for - confirm.
385 DaqController* ManagerImpl::FindDaq(std::string_view
id) noexcept {
386 auto it = std::find_if(m_daq_controllers.begin(),
387 m_daq_controllers.end(),
388 [
id](
auto const&
daq) { return daq.id == id; });
389 if (it != m_daq_controllers.end()) {
390 return it->controller.get();
// Reference-returning lookup built on FindDaq().
// Throws std::invalid_argument (wrapped with boost::enable_current_exception)
// whose message contains the requested id; `id` is copied into a std::string
// for formatting.
// NOTE(review): the condition guarding the throw and the success return
// (original lines 397-398 and 401-403) are not part of this excerpt.
395 DaqController& ManagerImpl::FindDaqOrThrow(std::string_view
id) {
396 auto daq_ptr = FindDaq(
id);
399 throw boost::enable_current_exception(std::invalid_argument(
400 fmt::format(
"DaqController with id '{}' does not exist", std::string(
id))));
// Const overload of FindDaqOrThrow(): casts away constness of `this` to reuse
// the non-const overload and returns its result as a reference-to-const.
405 DaqController
const& ManagerImpl::FindDaqOrThrow(std::string_view
id)
const {
406 return const_cast<ManagerImpl*
>(
this)->FindDaqOrThrow(
id);
// Moves the DAQ registered under `id` to the merge phase.
//
// Visible steps: look the controller up with FindDaq(); log a warning for the
// "does not exist" case; take the controller's context, status and event log;
// set the status state to State::NotScheduled; and create a DPM-phase
// controller through m_daq_factory.MakeDpmPhase(), moving those three objects
// into it.
// NOTE(review): original lines 411, 413-414, 420-424, 426-427 and 429-430 are
// not part of this excerpt, so the guard around the warning, what happens to
// the existing controller entry, and what is done with the MakeDpmPhase()
// result cannot be described from here.
409 void ManagerImpl::MoveToMergePhase(std::string_view
id) {
410 auto*
daq = FindDaq(
id);
412 LOG4CPLUS_WARN(m_logger, fmt::format(
"Daq requested to move does not exist: id={}",
id));
415 LOG4CPLUS_INFO(m_logger, fmt::format(
"Moving DAQ to merge-phase: id={}",
id));
417 auto ctx =
daq->GetContext();
418 auto status =
daq->GetStatus();
419 auto event_log =
daq->GetEventLog();
425 status->SetState(State::NotScheduled);
428 m_daq_factory.MakeDpmPhase(std::move(ctx), std::move(status), std::move(event_log));
// Creates an OCM-phase controller for `ctx` and starts it asynchronously.
//
// Visible steps: add the initial keywords to `ctx`; build the controller with
// m_daq_factory.MakeOcmPhase(), giving it a new ObservableStatus made from
// ctx.id and ctx.file_id and the shared m_event_log; call StartAsync() and
// attach a continuation. When the start future holds an exception the
// continuation calls AbortAsync(ErrorPolicy::Tolerant) on the controller and
// chains a second continuation that owns the failed future; otherwise it
// returns a ready future with the start result.
// The final make_exceptional_future<State>() is called without arguments, so
// it reports the exception currently being handled; the enclosing try/catch
// is not part of this excerpt.
//
// NOTE(review): `ctx` is passed through std::move() and also read (ctx.id,
// ctx.file_id) inside the same argument list. The evaluation order of
// function arguments is unspecified, so if MakeOcmPhase() takes its first
// parameter by value those reads may see a moved-from `ctx`. The signature of
// MakeOcmPhase() is not visible here - confirm, or copy id/file_id first.
// NOTE(review): the first continuation uses a `[&, daq]` capture. The lines
// showing what it references by reference are not all visible; confirm that
// nothing local to this function is used after it has returned.
432 boost::future<State> ManagerImpl::StartDaqAsync(
DaqContext ctx) {
434 AddInitialKeywords(ctx);
436 auto daq = m_daq_factory.MakeOcmPhase(
437 std::move(ctx), std::make_shared<ObservableStatus>(ctx.
id, ctx.
file_id), m_event_log);
440 return daq->StartAsync()
442 [&,
daq](boost::future<State> f) -> boost::future<State> {
443 if (f.has_exception()) {
446 return daq->AbortAsync(ErrorPolicy::Tolerant)
448 [f = std::move(f)](boost::future<Status>) mutable -> State {
452 __builtin_unreachable();
455 return boost::make_ready_future<State>(f.get());
460 return boost::make_exceptional_future<State>();
// Asks the controller registered under `id` to stop, forwarding `policy`, and
// returns the future produced by its StopAsync().
// The second return builds an exceptional future from the exception currently
// being handled (make_exceptional_future called without arguments).
// NOTE(review): the try/catch that separates the two returns is not part of
// this excerpt; presumably a FindDaqOrThrow() failure is reported through the
// returned future rather than thrown - confirm.
464 boost::future<Status> ManagerImpl::StopDaqAsync(std::string_view
id,
ErrorPolicy policy) {
466 return FindDaqOrThrow(
id).StopAsync(policy);
468 return boost::make_exceptional_future<Status>();
// Asks the controller registered under `id` to abort, forwarding `policy`,
// and returns the future produced by its AbortAsync().
// The second return builds an exceptional future from the exception currently
// being handled (make_exceptional_future called without arguments).
// NOTE(review): the try/catch that separates the two returns is not part of
// this excerpt; presumably a FindDaqOrThrow() failure is reported through the
// returned future rather than thrown - confirm.
472 boost::future<Status> ManagerImpl::AbortDaqAsync(std::string_view
id,
ErrorPolicy policy) {
474 return FindDaqOrThrow(
id).AbortAsync(policy);
476 return boost::make_exceptional_future<Status>();
// Waits, with a timeout, for the DAQ registered under `id` to satisfy a state
// condition and returns a future of Result<Status>.
//
// Visible steps: look the controller up with FindDaqOrThrow() and take its
// status object; on the "already fulfilled" path return a ready future
// holding {false, *status}; otherwise start an abortable op::AwaitStateAsync
// operation on the executor's io_context, store its abort function in
// m_abort_funcs, and attach a continuation that logs completion and checks
// the alive token before doing anything else.
// The final make_exceptional_future<Result<Status>>() is called without
// arguments, so it reports the exception currently being handled.
//
// NOTE(review): the `state` parameter (original line 481), the condition
// guarding the early return (line 486) and the rest of the continuation
// (lines 503-513) are not part of this excerpt. The continuation presumably
// removes the stored abort function by id when the manager is still alive -
// confirm.
480 boost::future<Result<Status>> ManagerImpl::AwaitDaqStateAsync(std::string_view
id,
482 std::chrono::milliseconds timeout) {
484 auto&
daq = FindDaqOrThrow(
id);
485 auto status =
daq.GetStatus();
487 LOG4CPLUS_INFO(m_logger,
488 fmt::format(
"{}: Await condition already fulfilled.", *status));
490 return boost::make_ready_future<Result<Status>>({
false, *status});
492 auto [fut, abort] = op::InitiateAbortableOperation<op::AwaitStateAsync>(
493 m_executor.get_io_context(), status, state, timeout);
// Keep the abort function so the pending operation can be aborted later; the
// element's id is what the continuation below captures.
495 auto& ref = m_abort_funcs.emplace_back(std::move(abort));
496 LOG4CPLUS_DEBUG(
"daq.manager",
497 fmt::format(
"op::AwaitStateAsync initiated. id={}", ref.GetId()));
500 [
this,
id = ref.GetId(), alive = std::weak_ptr<bool>(m_alive_token)](
auto res) {
501 LOG4CPLUS_DEBUG(
"daq.manager",
502 fmt::format(
"op::AwaitStateAsync completed. id={}", id));
505 auto is_alive = !alive.expired();
514 return boost::make_exceptional_future<Result<Status>>();
519 return FindDaqOrThrow(
id).UpdateKeywords(keywords);
523 return m_status_signal;
// Builds a vector with one shared_ptr<DaqController const> per entry of
// m_daq_controllers, in container order.
// Capacity is reserved up front so std::transform fills it without
// reallocating.
// NOTE(review): the return statement (original lines 533-534) is not part of
// this excerpt; presumably `controllers` is returned - confirm.
526 std::vector<std::shared_ptr<DaqController const>> ManagerImpl::GetDaqControllers() {
527 std::vector<std::shared_ptr<DaqController const>> controllers;
528 controllers.reserve(m_daq_controllers.size());
529 std::transform(m_daq_controllers.begin(),
530 m_daq_controllers.end(),
531 std::back_inserter(controllers),
532 [](
auto const&
daq) { return daq.controller; });
// Erases from m_abort_funcs every element whose GetId() equals `id`, using
// std::remove_if followed by erase. Declared noexcept.
// NOTE(review): original line 539 (the second iterator argument of
// std::remove_if) and line 537 are not part of this excerpt.
536 void ManagerImpl::RemoveAbortFunc(uint64_t
id) noexcept {
538 m_abort_funcs.erase(std::remove_if(m_abort_funcs.begin(),
540 [
id](
auto const& obj) { return id == obj.GetId(); }),
541 m_abort_funcs.end());
// Wraps an abort callable: m_id is taken from NextId() and the callable is
// moved into m_func.
546 ManagerImpl::OpAbortFunc::OpAbortFunc(OpAbortFunc::Func&& func)
547 : m_id(NextId()), m_func(std::move(func)) {
// Accessor for the identifier of this abort function.
// NOTE(review): the body is not part of this excerpt; presumably it returns
// the m_id assigned in the constructor - confirm.
551 uint64_t ManagerImpl::OpAbortFunc::GetId() const noexcept {
// Triggers the abort and reports a bool result; declared noexcept.
// NOTE(review): the body is not part of this excerpt; presumably it invokes
// m_func and returns its result - confirm, including what the bool means.
555 bool ManagerImpl::OpAbortFunc::Abort() noexcept {
// Supplies the id used by the OpAbortFunc constructor, backed by a
// function-local static counter that starts at 0.
// NOTE(review): the counter is a plain uint64_t, not an atomic, so concurrent
// calls from several threads would race - confirm that all callers run on a
// single thread. The statement that advances and returns the counter is not
// part of this excerpt.
559 uint64_t ManagerImpl::OpAbortFunc::NextId() {
560 static uint64_t next_id = 0;
564 void ManagerImpl::ScheduleDaqsAsync() {
565 LOG4CPLUS_TRACE(m_logger,
"ScheduleDaqAsync()");
568 m_schedule_retry.reset();
570 for (
auto&
daq : m_daq_controllers) {
571 if (
daq.controller->GetState() != State::NotScheduled) {
574 daq.controller->ScheduleMergeAsync().then(
575 m_executor, [
id =
daq.id,
this](boost::future<State> reply) {
576 if (!reply.has_exception()) {
579 fmt::format(
"ScheduleDaqAsyn: Successfully scheduled DAQ '{}'", id));
585 fmt::format(
"ScheduleDaqAsyn: Failed to schedule DAQ '{}', will retry in 60s",
589 if (m_schedule_retry) {
594 boost::posix_time::seconds(60));
595 m_schedule_retry->async_wait([
this](boost::system::error_code
const&
error) {