ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
manager.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdaq
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Definition of `daq::ManagerImpl` and related utilities.
7  */
8 #include <daq/manager.hpp>
9 
10 #include <algorithm>
11 #include <stdexcept>
12 #include <time.h>
13 
14 #include <fmt/format.h>
15 #include <fmt/ostream.h>
16 #include <log4cplus/loggingmacros.h>
17 #include <mal/Mal.hpp>
18 
19 #include <daq/conversion.hpp>
20 #include <daq/daqController.hpp>
21 #include <daq/error/report.hpp>
22 #include <daq/status.hpp>
23 #include <daq/workspace.hpp>
24 
25 #include <daq/op/awaitState.hpp>
26 #include <daq/op/initiate.hpp>
27 
28 namespace daq {
29 bool IsStale(ManagerParams const& params,
30  State state,
31  std::chrono::system_clock::time_point creation_time) {
32  auto now = std::chrono::system_clock::now();
33  if (IsFinalState(state)) {
34  return true;
35  }
36  auto full = MakeState(state);
37  if (full.state == daqif::StateAcquiring) {
38  return now > creation_time + params.acquiring_stale_age;
39  }
40  if (full.state == daqif::StateMerging) {
41  return now > creation_time + params.merging_stale_age;
42  }
43  return false;
44 }
45 
46 std::string MakeIdCandidate(char const* instrument_id,
47  unsigned jitter,
48  std::chrono::system_clock::time_point* tp) {
49  using time_point = std::chrono::system_clock::time_point;
50  using duration = std::chrono::system_clock::duration;
51  using seconds = std::chrono::seconds;
52  using microseconds = std::chrono::microseconds;
53  // 'KMOS.2017-06-02T16:45:55.701'
54  // olas-id must be <= 5 characters
55  // Format: <olas>-<strfmtime>.<frac>
56  // Size: 1-5 1 20 1 3 = 30
57  // chrono formatting is not provided until C++20
58  struct timeval tv;
59  struct timeval jitter_tv {
60  0, jitter * 1000
61  }; // 1 ms
62  if (gettimeofday(&tv, nullptr) != 0) {
63  // GCOVR_EXCL_START
64  throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
65  // GCOVR_EXCL_STOP
66  }
67  // Add jitter
68  struct timeval res;
69  timeradd(&tv, &jitter_tv, &res);
70  tv = res;
71 
72  struct tm tm_time;
73  time_t time = tv.tv_sec;
74  if (gmtime_r(&time, &tm_time) == nullptr) {
75  // GCOVR_EXCL_START
76  throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
77  // GCOVR_EXCL_STOP
78  }
79  char time_str[31] = {0};
80  int n = snprintf(&time_str[0], 7, "%.5s.", instrument_id);
81  // This part is always 20 characters long
82  strftime(&time_str[n], 20, "%Y-%m-%dT%H:%M:%S", &tm_time);
83  char frac_str[5];
84  snprintf(&frac_str[0], 5, ".%.3d", static_cast<int>(tv.tv_usec / 1000.0));
85  // Append the fractional part
86  strncpy(&time_str[n + 19], &frac_str[0], 4);
87  if (tp != nullptr) {
88  // Store resulting time in out optional out param
89  *tp = time_point(
90  std::chrono::duration_cast<duration>(seconds(tv.tv_sec) + microseconds(tv.tv_usec)));
91  }
92  return std::string(time_str, n + 23);
93 }
94 
96  ManagerParams params,
97  Workspace& workspace,
98  std::shared_ptr<ObservableEventLog> event_log,
99  DaqControllerFactory& daq_factory,
100  log4cplus::Logger const& logger)
101  : m_alive_token(std::make_shared<bool>())
102  , m_executor(executor)
103  , m_params(std::move(params))
104  , m_workspace(workspace)
105  , m_event_log(std::move(event_log))
106  , m_daq_factory(daq_factory)
107  , m_logger(logger) {
108 }
109 
111  // Abort any ongoing operations
112  for (auto& op : m_abort_funcs) {
113  try {
114  op.Abort();
115  } catch (std::exception const& e) {
116  LOG4CPLUS_WARN(m_logger,
117  fmt::format("ManagerImpl::~ManagerImpl: Error when aborting "
118  "operation {}: {}",
119  op.GetId(),
120  e.what()));
121  }
122  }
123 }
124 
126  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Starting");
127  auto ids = m_workspace.LoadList();
128  // New list of pruned DAQs
129  decltype(ids) pruned;
130 
131  for (auto const& id : ids) {
132  try {
133  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Loading state for DAQ " << id);
134  auto context = m_workspace.LoadContext(id);
135  auto status = m_workspace.LoadStatus(id);
136 
137  // Ignore stale DAQs
138  if (IsStale(m_params, status.state, context.creation_time)) {
139  LOG4CPLUS_INFO(m_logger,
140  "RestoreFromWorkspace: DAQ " << status << " is stale -> archiving");
141  m_workspace.ArchiveDaq(id);
142  continue;
143  }
144 
145  auto full = MakeState(status.state);
146  // DAQ should be loaded.
147  if (full.state == daqif::StateAcquiring) {
148  LOG4CPLUS_INFO(m_logger,
149  "RestoreFromWorkspace: Restoring OCM phase DAQ: " << status);
150  auto daq = m_daq_factory.MakeOcmPhase(
151  std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
152  assert(daq);
153  AddDaq(daq, Store::No);
154  // We keep this
155  pruned.push_back(id);
156  } else if (full.state == daqif::StateMerging) {
157  LOG4CPLUS_INFO(m_logger,
158  "RestoreFromWorkspace: Restoring DPM phase DAQ: " << status);
159  auto daq = m_daq_factory.MakeDpmPhase(
160  std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
161  assert(daq);
162  AddDaq(daq, Store::No);
163  // We keep this
164  pruned.push_back(id);
165  } else {
166  LOG4CPLUS_INFO(
167  m_logger,
168  "RestoreFromWorkspace: Skipping loading DAQ in unsupported state: " << status);
169  }
170  } catch (...) {
171  error::NestedExceptionReporter r(std::current_exception());
172  LOG4CPLUS_ERROR(m_logger,
173  "RestoreFromWorkspace: Loading state for DAQ "
174  << id << " failed (ignoring): " << r);
175  try {
176  m_workspace.ArchiveDaq(id);
177  } catch (...) {
178  error::NestedExceptionReporter r(std::current_exception());
179  LOG4CPLUS_ERROR(m_logger,
180  "RestoreFromWorkspace: Failed to archive DAQ " << id
181  << "(ignoring): \n"
182  << r);
183  }
184  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Skipping " << id);
185  }
186  }
187 
188  // Write back pruned list of DAQs
189  m_workspace.StoreList(pruned);
190 
191  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Successfully completed");
192 } catch (...) {
193  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Failed");
194  std::throw_with_nested(std::runtime_error("Failed to restore from workspace"));
195 }
196 
197 std::string ManagerImpl::MakeDaqId(std::chrono::system_clock::time_point* time) const {
198  for (unsigned jitter = 0;; ++jitter) {
199  auto id_candidate = daq::MakeIdCandidate(m_params.instrument_id.c_str(), jitter, time);
200  if (!HaveDaq(id_candidate, id_candidate)) {
201  return id_candidate;
202  }
203  }
204 }
205 
206 bool ManagerImpl::HaveDaq(std::string_view id, std::string_view file_id) const noexcept {
207  assert(!id.empty());
208  auto it =
209  std::find_if(m_daq_controllers.begin(), m_daq_controllers.end(), [=](auto const& daq) {
210  // Return true if daq is equal
211  // Return true if file_id is non-empty and equal
212  return daq.id == id ||
213  (!file_id.empty() && file_id == daq.controller->GetContext().file_id);
214  });
215  if (it != m_daq_controllers.end()) {
216  LOG4CPLUS_DEBUG(m_logger,
217  "Manager: Found conflicting DAQ: id="
218  << id << ", file_id=" << file_id << " with existing: id=" << it->id
219  << ", file_id=" << it->controller->GetContext().file_id);
220  return true;
221  }
222  return false;
223 }
224 
225 void ManagerImpl::AddInitialKeywords(DaqContext& ctx) {
226  // note: ARCFILE and ORIGFILE is added by daqDpmMerge
228  kws.emplace_back(std::in_place_type<fits::ValueKeyword>, "ORIGIN", m_params.origin);
229  kws.emplace_back(std::in_place_type<fits::ValueKeyword>, "INSTRUME", m_params.instrument_id);
230  // @todo:
231  // - TELESCOP
232  // - DATE?
233 
234  // Update so that OCM keywords are added first (DaqContext may already contain keywords from
235  // request).
237  ctx.keywords.swap(kws);
238 }
239 
240 void ManagerImpl::AddDaq(std::shared_ptr<DaqController> const& daq, Store store) {
241  assert(daq);
242  LOG4CPLUS_INFO(m_logger, "Manager: AddDaq: Attempting to add DAQ " << daq->GetId());
243  if (daq->GetId().empty()) {
244  throw boost::enable_current_exception(std::invalid_argument("DaqController has empty id!"));
245  }
246  if (daq->GetContext().file_id.empty()) {
247  throw boost::enable_current_exception(
248  std::invalid_argument("DaqController has empty file_id"));
249  }
250  if (HaveDaq(daq->GetId(), daq->GetContext().file_id)) {
251  throw boost::enable_current_exception(
252  std::invalid_argument("DaqController with same id already exists"));
253  }
254 
255  if (store == Store::Yes) {
256  // Requested to store DAQ (i.e. it was not loaded from workspace)
257  m_workspace.StoreContext(daq->GetContext());
258  m_workspace.StoreStatus(*daq->GetStatus());
259  }
260 
261  m_daq_controllers.emplace_back(
262  daq->GetId(),
263  daq,
264  daq->GetStatus()->ConnectObserver([alive = std::weak_ptr<bool>(m_alive_token),
265  prev_state = daq->GetState(),
266  this](ObservableStatus const& status) mutable {
267  if (alive.expired()) {
268  LOG4CPLUS_INFO("daq", "Manager has expired");
269  return;
270  }
271  if (IsFinalState(status.GetState())) {
272  if (!IsFinalState(prev_state)) {
273  // Transition to final state -> Archive DAQ
274  LOG4CPLUS_INFO(
275  m_logger,
276  fmt::format("DAQ transitioned to a final state -> archiving: {} (prev {})",
277  status,
278  prev_state));
279  m_workspace.StoreStatus(status);
280  m_executor.submit([alive = alive, id = status.GetId(), this] {
281  if (alive.expired()) {
282  LOG4CPLUS_INFO("daq", "Manager has expired");
283  return;
284  }
285  ArchiveDaq(id);
286  });
287  }
288  } else {
289  m_workspace.StoreStatus(status);
290 
291  // Handle handover
292  if (prev_state != State::Stopped && status.GetState() == State::Stopped) {
293  // If DAQ is stopped we need to move it to the merging phase
294  // To be safe we defer the execution.
295  // Manager lives as long as executor so we don't have to
296  // check liveness.
297  m_executor.submit([alive = alive, id = status.GetId(), this] {
298  if (alive.expired()) {
299  LOG4CPLUS_INFO("daq", "Manager has expired");
300  return;
301  }
302  MoveToMergePhase(id);
303  });
304  }
305  }
306 
307  // Signal other observers.
308  this->m_status_signal.Signal(status);
309 
310  prev_state = status.GetState();
311  }),
312  daq->ConnectContext(
313  [alive = std::weak_ptr<bool>(m_alive_token), this](DaqContext const& ctx) {
314  if (alive.expired()) {
315  LOG4CPLUS_INFO("daq", "Manager has expired");
316  return;
317  }
318  m_workspace.StoreContext(ctx);
319  }));
320 
321  if (store == Store::Yes) {
322  // Store daq list
323  StoreActiveDaqs();
324  }
325 
326  // Notify observers that DAQ was added.
327  m_status_signal.Signal(*daq->GetStatus());
328 
329  if (daq->GetState() == State::NotScheduled) {
330  ScheduleDaqsAsync();
331  }
332 }
333 
334 void ManagerImpl::RemoveDaq(std::string_view id) {
335  auto it = std::find_if(m_daq_controllers.begin(),
336  m_daq_controllers.end(),
337  [id](auto const& daq) { return daq.id == id; });
338  if (it == m_daq_controllers.end()) {
339  throw boost::enable_current_exception(
340  std::invalid_argument(fmt::format("Remove DAQ failed - no id found: {}", id)));
341  }
342  m_daq_controllers.erase(it);
343 }
344 
345 void ManagerImpl::ArchiveDaq(std::string const& id) {
346  LOG4CPLUS_INFO(m_logger, fmt::format("Moving DAQ to archive: id={}", id));
347  // Archive persistent storage
348  m_workspace.ArchiveDaq(id);
349 
350  StoreActiveDaqs();
351  LOG4CPLUS_INFO(m_logger, fmt::format("Moving DAQ to archive done: id={}", id));
352 }
353 
354 void ManagerImpl::StoreActiveDaqs() const {
355  LOG4CPLUS_INFO(m_logger, "StoreActiveDaqs()");
356  // And remove from
357  std::vector<std::string> daqs;
358  for (Daq const& daq : m_daq_controllers) {
359  assert(daq.controller);
360  if (!IsFinalState(daq.controller->GetState())) {
361  daqs.push_back(daq.id);
362  }
363  }
364  m_workspace.StoreList(daqs);
365 }
366 
367 Status ManagerImpl::GetStatus(std::string_view id) const {
368  auto& daq = FindDaqOrThrow(id);
369  return daq.GetStatus()->GetStatus();
370 }
371 
372 ManagerImpl::Daq::Daq(std::string id_arg,
373  std::shared_ptr<DaqController> controller_arg,
374  boost::signals2::connection conn_status_arg,
375  boost::signals2::connection conn_context_arg) noexcept
376  : id(std::move(id_arg))
377  , controller(std::move(controller_arg))
378  , conn_status(std::move(conn_status_arg))
379  , conn_context(std::move(conn_context_arg)) {
380 }
381 
382 DaqController const* ManagerImpl::FindDaq(std::string_view id) const noexcept {
383  return const_cast<ManagerImpl*>(this)->FindDaq(id);
384 }
385 
386 DaqController* ManagerImpl::FindDaq(std::string_view id) noexcept {
387  auto it = std::find_if(m_daq_controllers.begin(),
388  m_daq_controllers.end(),
389  [id](auto const& daq) { return daq.id == id; });
390  if (it != m_daq_controllers.end()) {
391  return it->controller.get();
392  }
393  return nullptr;
394 }
395 
396 DaqController& ManagerImpl::FindDaqOrThrow(std::string_view id) {
397  auto daq_ptr = FindDaq(id);
398 
399  if (!daq_ptr) {
400  throw boost::enable_current_exception(std::invalid_argument(
401  fmt::format("DaqController with id '{}' does not exist", std::string(id))));
402  }
403  return *daq_ptr;
404 }
405 
406 DaqController const& ManagerImpl::FindDaqOrThrow(std::string_view id) const {
407  return const_cast<ManagerImpl*>(this)->FindDaqOrThrow(id);
408 }
409 
410 void ManagerImpl::MoveToMergePhase(std::string_view id) {
411  auto* daq = FindDaq(id);
412  if (!daq) {
413  LOG4CPLUS_WARN(m_logger, fmt::format("Daq requested to move does not exist: id={}", id));
414  return;
415  }
416  LOG4CPLUS_INFO(m_logger, fmt::format("Moving DAQ to merge-phase: id={}", id));
417  // Copy state we want to keep.
418  auto ctx = daq->GetContext();
419  auto status = daq->GetStatus();
420  auto event_log = daq->GetEventLog();
421  // Delete old DAQ before creating new
422  // note: this invalidates "daq"
423  RemoveDaq(id);
424 
425  // Manually transition to first state in Merging
426  status->SetState(State::NotScheduled);
427 
428  auto new_daq =
429  m_daq_factory.MakeDpmPhase(std::move(ctx), std::move(status), std::move(event_log));
430  AddDaq(new_daq);
431 }
432 
433 boost::future<State> ManagerImpl::StartDaqAsync(DaqContext ctx) {
434  try {
435  AddInitialKeywords(ctx);
436 
437  auto id = ctx.id;
438  auto file_id = ctx.file_id;
439  auto daq = m_daq_factory.MakeOcmPhase(
440  std::move(ctx),
441  std::make_shared<ObservableStatus>(std::move(id), std::move(file_id)),
442  m_event_log);
443  assert(daq);
444  AddDaq(daq);
445  return daq->StartAsync()
446  .then(m_executor,
447  [&, daq](boost::future<State> f) -> boost::future<State> {
448  if (f.has_exception()) {
449  // Any error during start may lead to partially started acquisition that
450  // we need to abort
451  return daq->AbortAsync(ErrorPolicy::Tolerant)
452  .then(m_executor,
453  [f = std::move(f)](boost::future<Status>) mutable -> State {
454  // We ignore errors from AbortAsync as we can't do anything
455  // about it Then we return original error
456  f.get(); // throws
457  __builtin_unreachable();
458  });
459  } else {
460  return boost::make_ready_future<State>(f.get());
461  }
462  })
463  .unwrap();
464  } catch (...) {
465  return boost::make_exceptional_future<State>();
466  }
467 }
468 
469 boost::future<Status> ManagerImpl::StopDaqAsync(std::string_view id, ErrorPolicy policy) {
470  try {
471  return FindDaqOrThrow(id).StopAsync(policy);
472  } catch (...) {
473  return boost::make_exceptional_future<Status>();
474  }
475 }
476 
477 boost::future<Status> ManagerImpl::AbortDaqAsync(std::string_view id, ErrorPolicy policy) {
478  try {
479  return FindDaqOrThrow(id).AbortAsync(policy);
480  } catch (...) {
481  return boost::make_exceptional_future<Status>();
482  }
483 }
484 
485 boost::future<Result<Status>> ManagerImpl::AwaitDaqStateAsync(std::string_view id,
486  State state,
487  std::chrono::milliseconds timeout) {
488  try {
489  auto& daq = FindDaqOrThrow(id);
490  auto status = daq.GetStatus();
491  if (!IsSubsequentState(state, daq.GetState())) {
492  LOG4CPLUS_INFO(m_logger,
493  fmt::format("{}: Await condition already fulfilled.", *status));
494  // Condition already fulfilled.
495  return boost::make_ready_future<Result<Status>>({false, *status});
496  } else {
497  // Create child logger for await state.
498  auto logger = log4cplus::Logger::getInstance(m_logger.getName() + ".awaitstate");
499  auto [fut, abort] = op::InitiateAbortableOperation<op::AwaitStateAsync>(
500  m_executor.get_io_context(), status, state, timeout, logger);
501  // Store abort function so when Manager is deleted it will
502  auto& ref = m_abort_funcs.emplace_back(std::move(abort));
503  LOG4CPLUS_DEBUG("daq.manager",
504  fmt::format("op::AwaitStateAsync initiated. id={}", ref.GetId()));
505  return fut.then(
506  m_executor,
507  [this, id = ref.GetId(), alive = std::weak_ptr<bool>(m_alive_token)](auto res) {
508  LOG4CPLUS_DEBUG("daq.manager",
509  fmt::format("op::AwaitStateAsync completed. id={}", id));
510  // Remove abort function since operation completed, but only if
511  // object is alive.
512  auto is_alive = !alive.expired();
513  if (is_alive) {
514  // Manager is still alive, so we remove abort function
515  RemoveAbortFunc(id);
516  }
517  return res.get();
518  });
519  }
520  } catch (...) {
521  return boost::make_exceptional_future<Result<Status>>();
522  }
523 }
524 
525 void ManagerImpl::UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) {
526  return FindDaqOrThrow(id).UpdateKeywords(keywords);
527 }
528 
529 StatusSignal& ManagerImpl::GetStatusSignal() {
530  return m_status_signal;
531 }
532 
533 std::vector<std::shared_ptr<DaqController const>> ManagerImpl::GetDaqControllers() {
534  std::vector<std::shared_ptr<DaqController const>> controllers;
535  controllers.reserve(m_daq_controllers.size());
536  std::transform(m_daq_controllers.begin(),
537  m_daq_controllers.end(),
538  std::back_inserter(controllers),
539  [](auto const& daq) { return daq.controller; });
540  return controllers;
541 }
542 
543 void ManagerImpl::RemoveAbortFunc(uint64_t id) noexcept {
544  try {
545  m_abort_funcs.erase(std::remove_if(m_abort_funcs.begin(),
546  m_abort_funcs.end(),
547  [id](auto const& obj) { return id == obj.GetId(); }),
548  m_abort_funcs.end());
549  } catch (...) {
550  }
551 }
552 
553 ManagerImpl::OpAbortFunc::OpAbortFunc(OpAbortFunc::Func&& func)
554  : m_id(NextId()), m_func(std::move(func)) {
555  assert(m_func);
556 }
557 
558 uint64_t ManagerImpl::OpAbortFunc::GetId() const noexcept {
559  return m_id;
560 }
561 
562 bool ManagerImpl::OpAbortFunc::Abort() noexcept {
563  return m_func();
564 }
565 
566 uint64_t ManagerImpl::OpAbortFunc::NextId() {
567  static uint64_t next_id = 0;
568  return next_id++;
569 }
570 
571 void ManagerImpl::ScheduleDaqsAsync() {
572  LOG4CPLUS_TRACE(m_logger, "ScheduleDaqAsync()");
573  // Regardless if caller was invoked from timer or manually we reset the deadline timer
574  // so that timer is restarted when ScheduleMergeAsync completes as necessary.
575  m_schedule_retry.reset();
576 
577  for (auto& daq : m_daq_controllers) {
578  if (daq.controller->GetState() != State::NotScheduled) {
579  continue;
580  }
581  daq.controller->ScheduleMergeAsync().then(
582  m_executor, [id = daq.id, this](boost::future<State> reply) {
583  if (!reply.has_exception()) {
584  LOG4CPLUS_INFO(
585  m_logger,
586  fmt::format("ScheduleDaqAsyn: Successfully scheduled DAQ '{}'", id));
587  // Success
588  return;
589  }
590  LOG4CPLUS_WARN(
591  m_logger,
592  fmt::format("ScheduleDaqAsyn: Failed to schedule DAQ '{}', will retry in 60s",
593  id));
594  // Some kind of error happened, at this point we don't care what it is
595  // but simply schedule a new attempt using deadline timer, unless already scheduled.
596  if (m_schedule_retry) {
597  // Already scheduled..
598  return;
599  }
600  m_schedule_retry.emplace(m_executor.get_io_context(),
601  boost::posix_time::seconds(60));
602  m_schedule_retry->async_wait([this](boost::system::error_code const& error) {
603  if (error) {
604  return;
605  }
606  ScheduleDaqsAsync();
607  });
608  });
609  }
610 }
611 
612 } // namespace daq
Contains declaration for the AwaitStateAsync operation.
Abstract factory for DaqControllers.
virtual auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the DPM phase of the DAQ process.
virtual auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the OCM phase of the DAQ process.
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
Definition: manager.cpp:125
bool HaveDaq(std::string_view id, std::string_view file_id={}) const noexcept override
Query existing data acquisition by id and optional file_id.
Definition: manager.cpp:206
ManagerImpl(rad::IoExecutor &executor, ManagerParams params, Workspace &workspace, std::shared_ptr< ObservableEventLog > event_log, DaqControllerFactory &daq_factory, log4cplus::Logger const &logger)
Definition: manager.cpp:95
~ManagerImpl() noexcept
Definition: manager.cpp:110
std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const override
Creates a new unique identifier based on the instrument id and current time.
Definition: manager.cpp:197
Observes any status.
Definition: manager.hpp:77
Interface to interact with DPM workspace.
Definition: workspace.hpp:31
virtual auto LoadList() const -> std::vector< std::string >=0
virtual void ArchiveDaq(std::string const &id)=0
Archives specified DAQ without deleting any files, typically by moving it to a specific location.
virtual auto LoadContext(std::string const &id) const -> DaqContext=0
Loads the DAQ context stored for the specified DAQ id.
virtual void StoreList(std::vector< std::string > const &queue) const =0
virtual void StoreStatus(Status const &status) const =0
virtual auto LoadStatus(std::string const &id) const -> Status=0
virtual void StoreContext(DaqContext const &context) const =0
Stores the provided DAQ context.
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Definition: ioExecutor.hpp:41
daq::Workspace interface and implementation declaration
Contains support functions for daqif.
Contains declarations for the helper functions to initiate operations.
Declaration of daq::Manager
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
void UpdateKeywords(KeywordVector &to, KeywordVector const &from, ConflictPolicy policy=ConflictPolicy::Replace)
Updates to with keywords from from.
Definition: keyword.cpp:554
@ Skip
Skip keyword that conflicts.
std::string origin
Definition: manager.hpp:40
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
Definition: manager.cpp:46
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:63
bool IsStale(ManagerParams const &params, State state, std::chrono::system_clock::time_point creation_time)
Definition: manager.cpp:29
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
Definition: daqContext.cpp:29
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
Definition: manager.hpp:52
bool IsSubsequentState(State state1, State state2) noexcept
Compares states and returns whether state1 occurs after state2.
Definition: state.cpp:26
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
std::string instrument_id
Instrument identifier.
Definition: manager.hpp:39
State
Observable states of the data acquisition process.
Definition: state.hpp:39
@ NotScheduled
Before daq is acknowledged by dpm it remains in NotScheduled.
@ Stopped
All data sources have reported they have stopped acquiring data.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
Definition: state.cpp:15
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Definition: manager.hpp:46
Configuration parameters directly related to manager.
Definition: manager.hpp:35
Definition: main.cpp:23
Contains declaration for DaqController.
Contains declaration for Status and ObservableStatus.
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
Definition: daqContext.hpp:87
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:65
std::string id
DAQ identifier, possibly provided by user.
Definition: daqContext.hpp:60
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:124
auto const & transform