ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
manager.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_common_libdaq
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Definition of `daq::ManagerImpl` and related utilities.
7  */
8 #include <daq/manager.hpp>
9 
10 #include <algorithm>
11 #include <stdexcept>
12 #include <time.h>
13 
14 #include <fmt/format.h>
15 #include <fmt/ostream.h>
16 #include <log4cplus/loggingmacros.h>
17 #include <mal/Mal.hpp>
18 
19 #include <daq/conversion.hpp>
20 #include <daq/daqController.hpp>
21 #include <daq/error/report.hpp>
22 #include <daq/status.hpp>
23 #include <daq/workspace.hpp>
24 
25 #include <daq/op/awaitState.hpp>
26 #include <daq/op/initiate.hpp>
27 
28 namespace daq {
29 bool IsStale(ManagerParams const& params,
30  State state,
31  std::chrono::system_clock::time_point creation_time) {
32  auto now = std::chrono::system_clock::now();
33  if (IsFinalState(state)) {
34  return true;
35  }
36  auto full = MakeState(state);
37  if (full.state == daqif::StateAcquiring) {
38  return now > creation_time + params.acquiring_stale_age;
39  }
40  if (full.state == daqif::StateMerging) {
41  return now > creation_time + params.merging_stale_age;
42  }
43  return false;
44 }
45 
46 std::string MakeIdCandidate(char const* instrument_id,
47  unsigned jitter,
48  std::chrono::system_clock::time_point* tp) {
49  using time_point = std::chrono::system_clock::time_point;
50  using duration = std::chrono::system_clock::duration;
51  using seconds = std::chrono::seconds;
52  using microseconds = std::chrono::microseconds;
53  // 'KMOS.2017-06-02T16:45:55.701'
54  // olas-id must be <= 5 characters
55  // Format: <olas>-<strfmtime>.<frac>
56  // Size: 1-5 1 20 1 3 = 30
57  // chrono formatting is not provided until C++20
58  struct timeval tv;
59  struct timeval jitter_tv {
60  0, jitter * 1000
61  }; // 1 ms
62  if (gettimeofday(&tv, nullptr) != 0) {
63  // GCOVR_EXCL_START
64  throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
65  // GCOVR_EXCL_STOP
66  }
67  // Add jitter
68  struct timeval res;
69  timeradd(&tv, &jitter_tv, &res);
70  tv = res;
71 
72  struct tm tm_time;
73  time_t time = tv.tv_sec;
74  if (gmtime_r(&time, &tm_time) == nullptr) {
75  // GCOVR_EXCL_START
76  throw boost::enable_current_exception(std::system_error(errno, std::generic_category()));
77  // GCOVR_EXCL_STOP
78  }
79  char time_str[31] = {0};
80  int n = snprintf(&time_str[0], 7, "%.5s.", instrument_id);
81  // This part is always 20 characters long
82  strftime(&time_str[n], 20, "%Y-%m-%dT%H:%M:%S", &tm_time);
83  char frac_str[5];
84  snprintf(&frac_str[0], 5, ".%.3d", static_cast<int>(tv.tv_usec / 1000.0));
85  // Append the fractional part
86  strncpy(&time_str[n + 19], &frac_str[0], 4);
87  if (tp != nullptr) {
88  // Store resulting time in out optional out param
89  *tp = time_point(
90  std::chrono::duration_cast<duration>(seconds(tv.tv_sec) + microseconds(tv.tv_usec)));
91  }
92  return std::string(time_str, n + 23);
93 }
94 
96  ManagerParams params,
97  Workspace& workspace,
98  std::shared_ptr<ObservableEventLog> event_log,
99  DaqControllerFactory& daq_factory)
100  : m_alive_token(std::make_shared<bool>())
101  , m_executor(executor)
102  , m_params(std::move(params))
103  , m_workspace(workspace)
104  , m_event_log(std::move(event_log))
105  , m_daq_factory(daq_factory)
106  , m_logger(log4cplus::Logger::getInstance("daq.manager")) {
107 }
108 
110  // Abort any ongoing operations
111  for (auto& op : m_abort_funcs) {
112  try {
113  op.Abort();
114  } catch (std::exception const& e) {
115  LOG4CPLUS_WARN(m_logger,
116  fmt::format("ManagerImpl::~ManagerImpl: Error when aborting "
117  "operation {}: {}",
118  op.GetId(),
119  e.what()));
120  }
121  }
122 }
123 
125  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Starting");
126  auto ids = m_workspace.LoadList();
127  // New list of pruned DAQs
128  decltype(ids) pruned;
129 
130  for (auto const& id : ids) {
131  try {
132  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Loading state for DAQ " << id);
133  auto context = m_workspace.LoadContext(id);
134  auto status = m_workspace.LoadStatus(id);
135 
136  // Ignore stale DAQs
137  if (IsStale(m_params, status.state, context.creation_time)) {
138  LOG4CPLUS_INFO(m_logger,
139  "RestoreFromWorkspace: DAQ " << status << " is stale -> archiving");
140  m_workspace.ArchiveDaq(id);
141  continue;
142  }
143 
144  auto full = MakeState(status.state);
145  // DAQ should be loaded.
146  if (full.state == daqif::StateAcquiring) {
147  LOG4CPLUS_INFO(m_logger,
148  "RestoreFromWorkspace: Restoring OCM phase DAQ: " << status);
149  auto daq = m_daq_factory.MakeOcmPhase(
150  std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
151  assert(daq);
152  AddDaq(daq, Store::No);
153  // We keep this
154  pruned.push_back(id);
155  } else if (full.state == daqif::StateMerging) {
156  LOG4CPLUS_INFO(m_logger,
157  "RestoreFromWorkspace: Restoring DPM phase DAQ: " << status);
158  auto daq = m_daq_factory.MakeDpmPhase(
159  std::move(context), std::make_shared<ObservableStatus>(status), m_event_log);
160  assert(daq);
161  AddDaq(daq, Store::No);
162  // We keep this
163  pruned.push_back(id);
164  } else {
165  LOG4CPLUS_INFO(
166  m_logger,
167  "RestoreFromWorkspace: Skipping loading DAQ in unsupported state: " << status);
168  }
169  } catch (...) {
170  error::NestedExceptionReporter r(std::current_exception());
171  LOG4CPLUS_ERROR(m_logger,
172  "RestoreFromWorkspace: Loading state for DAQ "
173  << id << " failed (ignoring): " << r);
174  try {
175  m_workspace.ArchiveDaq(id);
176  } catch (...) {
177  error::NestedExceptionReporter r(std::current_exception());
178  LOG4CPLUS_ERROR(m_logger,
179  "RestoreFromWorkspace: Failed to archive DAQ " << id
180  << "(ignoring): \n"
181  << r);
182  }
183  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Skipping " << id);
184  }
185  }
186 
187  // Write back pruned list of DAQs
188  m_workspace.StoreList(pruned);
189 
190  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Successfully completed");
191 } catch (...) {
192  LOG4CPLUS_INFO(m_logger, "RestoreFromWorkspace: Failed");
193  std::throw_with_nested(std::runtime_error("Failed to restore from workspace"));
194 }
195 
196 std::string ManagerImpl::MakeDaqId(std::chrono::system_clock::time_point* time) const {
197  for (unsigned jitter = 0;; ++jitter) {
198  auto id_candidate = daq::MakeIdCandidate(m_params.instrument_id.c_str(), jitter, time);
199  if (!HaveDaq(id_candidate, id_candidate)) {
200  return id_candidate;
201  }
202  }
203 }
204 
205 bool ManagerImpl::HaveDaq(std::string_view id, std::string_view file_id) const noexcept {
206  assert(!id.empty());
207  auto it =
208  std::find_if(m_daq_controllers.begin(), m_daq_controllers.end(), [=](auto const& daq) {
209  // Return true if daq is equal
210  // Return true if file_id is non-empty and equal
211  return daq.id == id ||
212  (!file_id.empty() && file_id == daq.controller->GetContext().file_id);
213  });
214  if (it != m_daq_controllers.end()) {
215  LOG4CPLUS_DEBUG(m_logger,
216  "Manager: Found conflicting DAQ: id="
217  << id << ", file_id=" << file_id << " with existing: id=" << it->id
218  << ", file_id=" << it->controller->GetContext().file_id);
219  return true;
220  }
221  return false;
222 }
223 
224 void ManagerImpl::AddInitialKeywords(DaqContext& ctx) {
225  // note: ARCFILE and ORIGFILE is added by daqDpmMerge
227  kws.emplace_back(std::in_place_type<fits::ValueKeyword>, "ORIGIN", m_params.origin);
228  kws.emplace_back(std::in_place_type<fits::ValueKeyword>, "INSTRUME", m_params.instrument_id);
229  // @todo:
230  // - TELESCOP
231  // - DATE?
232 
233  // Update so that OCM keywords are added first (DaqContext may already contain keywords from
234  // request).
236  ctx.keywords.swap(kws);
237 }
238 
239 void ManagerImpl::AddDaq(std::shared_ptr<DaqController> daq, Store store) {
240  assert(daq);
241  LOG4CPLUS_INFO(m_logger, "Manager: AddDaq: Attempting to add DAQ " << daq->GetId());
242  if (daq->GetId().empty()) {
243  throw boost::enable_current_exception(std::invalid_argument("DaqController has empty id!"));
244  }
245  if (daq->GetContext().file_id.empty()) {
246  throw boost::enable_current_exception(
247  std::invalid_argument("DaqController has empty file_id"));
248  }
249  if (HaveDaq(daq->GetId(), daq->GetContext().file_id)) {
250  throw boost::enable_current_exception(
251  std::invalid_argument("DaqController with same id already exists"));
252  }
253 
254  if (store == Store::Yes) {
255  // Requested to store DAQ (i.e. it was not loaded from workspace)
256  m_workspace.StoreContext(daq->GetContext());
257  m_workspace.StoreStatus(*daq->GetStatus());
258  }
259 
260  m_daq_controllers.emplace_back(
261  daq->GetId(),
262  daq,
263  daq->GetStatus()->ConnectObserver([alive = std::weak_ptr<bool>(m_alive_token),
264  prev_state = daq->GetState(),
265  this](ObservableStatus const& status) mutable {
266  if (alive.expired()) {
267  LOG4CPLUS_INFO("daq", "Manager has expired");
268  return;
269  }
270  if (IsFinalState(status.GetState())) {
271  if (!IsFinalState(prev_state)) {
272  // Transition to final state -> Archive DAQ
273  LOG4CPLUS_INFO(
274  m_logger,
275  fmt::format("DAQ transitioned to a final state -> archiving: {} (prev {})",
276  status,
277  prev_state));
278  m_workspace.StoreStatus(status);
279  m_executor.submit([alive = alive, id = status.GetId(), this] {
280  if (alive.expired()) {
281  LOG4CPLUS_INFO("daq", "Manager has expired");
282  return;
283  }
284  ArchiveDaq(id);
285  });
286  }
287  } else {
288  m_workspace.StoreStatus(status);
289 
290  // Handle handover
291  if (prev_state != State::Stopped && status.GetState() == State::Stopped) {
292  // If DAQ is stopped we need to move it to the merging phase
293  // To be safe we defer the execution.
294  // Manager lives as long as executor so we don't have to
295  // check liveness.
296  m_executor.submit([alive = alive, id = status.GetId(), this] {
297  if (alive.expired()) {
298  LOG4CPLUS_INFO("daq", "Manager has expired");
299  return;
300  }
301  MoveToMergePhase(id);
302  });
303  }
304  }
305 
306  // Signal other observers.
307  this->m_status_signal.Signal(status);
308 
309  prev_state = status.GetState();
310  }),
311  daq->ConnectContext(
312  [alive = std::weak_ptr<bool>(m_alive_token), this](DaqContext const& ctx) {
313  if (alive.expired()) {
314  LOG4CPLUS_INFO("daq", "Manager has expired");
315  return;
316  }
317  m_workspace.StoreContext(ctx);
318  }));
319 
320  if (store == Store::Yes) {
321  // Store daq list
322  StoreActiveDaqs();
323  }
324 
325  // Notify observers that DAQ was added.
326  m_status_signal.Signal(*daq->GetStatus());
327 
328  if (daq->GetState() == State::NotScheduled) {
329  ScheduleDaqsAsync();
330  }
331 }
332 
333 void ManagerImpl::RemoveDaq(std::string_view id) {
334  auto it = std::find_if(m_daq_controllers.begin(),
335  m_daq_controllers.end(),
336  [id](auto const& daq) { return daq.id == id; });
337  if (it == m_daq_controllers.end()) {
338  throw boost::enable_current_exception(
339  std::invalid_argument(fmt::format("Remove DAQ failed - no id found: {}", id)));
340  }
341  m_daq_controllers.erase(it);
342 }
343 
344 void ManagerImpl::ArchiveDaq(std::string const& id) {
345  LOG4CPLUS_INFO(m_logger, fmt::format("Moving DAQ to archive: id={}", id));
346  // Archive persistent storage
347  m_workspace.ArchiveDaq(id);
348 
349  StoreActiveDaqs();
350  LOG4CPLUS_INFO(m_logger, fmt::format("Moving DAQ to archive done: id={}", id));
351 }
352 
353 void ManagerImpl::StoreActiveDaqs() const {
354  LOG4CPLUS_INFO(m_logger, "StoreActiveDaqs()");
355  // And remove from
356  std::vector<std::string> daqs;
357  for (Daq const& daq : m_daq_controllers) {
358  assert(daq.controller);
359  if (!IsFinalState(daq.controller->GetState())) {
360  daqs.push_back(daq.id);
361  }
362  }
363  m_workspace.StoreList(daqs);
364 }
365 
366 Status ManagerImpl::GetStatus(std::string_view id) const {
367  auto& daq = FindDaqOrThrow(id);
368  return daq.GetStatus()->GetStatus();
369 }
370 
371 ManagerImpl::Daq::Daq(std::string id_arg,
372  std::shared_ptr<DaqController> controller_arg,
373  boost::signals2::connection conn_status_arg,
374  boost::signals2::connection conn_context_arg) noexcept
375  : id(std::move(id_arg))
376  , controller(std::move(controller_arg))
377  , conn_status(std::move(conn_status_arg))
378  , conn_context(std::move(conn_context_arg)) {
379 }
380 
381 DaqController const* ManagerImpl::FindDaq(std::string_view id) const noexcept {
382  return const_cast<ManagerImpl*>(this)->FindDaq(id);
383 }
384 
385 DaqController* ManagerImpl::FindDaq(std::string_view id) noexcept {
386  auto it = std::find_if(m_daq_controllers.begin(),
387  m_daq_controllers.end(),
388  [id](auto const& daq) { return daq.id == id; });
389  if (it != m_daq_controllers.end()) {
390  return it->controller.get();
391  }
392  return nullptr;
393 }
394 
395 DaqController& ManagerImpl::FindDaqOrThrow(std::string_view id) {
396  auto daq_ptr = FindDaq(id);
397 
398  if (!daq_ptr) {
399  throw boost::enable_current_exception(std::invalid_argument(
400  fmt::format("DaqController with id '{}' does not exist", std::string(id))));
401  }
402  return *daq_ptr;
403 }
404 
405 DaqController const& ManagerImpl::FindDaqOrThrow(std::string_view id) const {
406  return const_cast<ManagerImpl*>(this)->FindDaqOrThrow(id);
407 }
408 
409 void ManagerImpl::MoveToMergePhase(std::string_view id) {
410  auto* daq = FindDaq(id);
411  if (!daq) {
412  LOG4CPLUS_WARN(m_logger, fmt::format("Daq requested to move does not exist: id={}", id));
413  return;
414  }
415  LOG4CPLUS_INFO(m_logger, fmt::format("Moving DAQ to merge-phase: id={}", id));
416  // Copy state we want to keep.
417  auto ctx = daq->GetContext();
418  auto status = daq->GetStatus();
419  auto event_log = daq->GetEventLog();
420  // Delete old DAQ before creating new
421  // note: this invalidates "daq"
422  RemoveDaq(id);
423 
424  // Manually transition to first state in Merging
425  status->SetState(State::NotScheduled);
426 
427  auto new_daq =
428  m_daq_factory.MakeDpmPhase(std::move(ctx), std::move(status), std::move(event_log));
429  AddDaq(new_daq);
430 }
431 
432 boost::future<State> ManagerImpl::StartDaqAsync(DaqContext ctx) {
433  try {
434  AddInitialKeywords(ctx);
435 
436  auto daq = m_daq_factory.MakeOcmPhase(
437  std::move(ctx), std::make_shared<ObservableStatus>(ctx.id, ctx.file_id), m_event_log);
438  assert(daq);
439  AddDaq(daq);
440  return daq->StartAsync()
441  .then(m_executor,
442  [&, daq](boost::future<State> f) -> boost::future<State> {
443  if (f.has_exception()) {
444  // Any error during start may lead to partially started acquisition that
445  // we need to abort
446  return daq->AbortAsync(ErrorPolicy::Tolerant)
447  .then(m_executor,
448  [f = std::move(f)](boost::future<Status>) mutable -> State {
449  // We ignore errors from AbortAsync as we can't do anything
450  // about it Then we return original error
451  f.get(); // throws
452  __builtin_unreachable();
453  });
454  } else {
455  return boost::make_ready_future<State>(f.get());
456  }
457  })
458  .unwrap();
459  } catch (...) {
460  return boost::make_exceptional_future<State>();
461  }
462 }
463 
464 boost::future<Status> ManagerImpl::StopDaqAsync(std::string_view id, ErrorPolicy policy) {
465  try {
466  return FindDaqOrThrow(id).StopAsync(policy);
467  } catch (...) {
468  return boost::make_exceptional_future<Status>();
469  }
470 }
471 
472 boost::future<Status> ManagerImpl::AbortDaqAsync(std::string_view id, ErrorPolicy policy) {
473  try {
474  return FindDaqOrThrow(id).AbortAsync(policy);
475  } catch (...) {
476  return boost::make_exceptional_future<Status>();
477  }
478 }
479 
480 boost::future<Result<Status>> ManagerImpl::AwaitDaqStateAsync(std::string_view id,
481  State state,
482  std::chrono::milliseconds timeout) {
483  try {
484  auto& daq = FindDaqOrThrow(id);
485  auto status = daq.GetStatus();
486  if (!IsSubsequentState(state, daq.GetState())) {
487  LOG4CPLUS_INFO(m_logger,
488  fmt::format("{}: Await condition already fulfilled.", *status));
489  // Condition already fulfilled.
490  return boost::make_ready_future<Result<Status>>({false, *status});
491  } else {
492  auto [fut, abort] = op::InitiateAbortableOperation<op::AwaitStateAsync>(
493  m_executor.get_io_context(), status, state, timeout);
494  // Store abort function so when Manager is deleted it will
495  auto& ref = m_abort_funcs.emplace_back(std::move(abort));
496  LOG4CPLUS_DEBUG("daq.manager",
497  fmt::format("op::AwaitStateAsync initiated. id={}", ref.GetId()));
498  return fut.then(
499  m_executor,
500  [this, id = ref.GetId(), alive = std::weak_ptr<bool>(m_alive_token)](auto res) {
501  LOG4CPLUS_DEBUG("daq.manager",
502  fmt::format("op::AwaitStateAsync completed. id={}", id));
503  // Remove abort function since operation completed, but only if
504  // object is alive.
505  auto is_alive = !alive.expired();
506  if (is_alive) {
507  // Manager is still alive, so we remove abort function
508  RemoveAbortFunc(id);
509  }
510  return res.get();
511  });
512  }
513  } catch (...) {
514  return boost::make_exceptional_future<Result<Status>>();
515  }
516 }
517 
518 void ManagerImpl::UpdateKeywords(std::string_view id, fits::KeywordVector const& keywords) {
519  return FindDaqOrThrow(id).UpdateKeywords(keywords);
520 }
521 
522 StatusSignal& ManagerImpl::GetStatusSignal() {
523  return m_status_signal;
524 }
525 
526 std::vector<std::shared_ptr<DaqController const>> ManagerImpl::GetDaqControllers() {
527  std::vector<std::shared_ptr<DaqController const>> controllers;
528  controllers.reserve(m_daq_controllers.size());
529  std::transform(m_daq_controllers.begin(),
530  m_daq_controllers.end(),
531  std::back_inserter(controllers),
532  [](auto const& daq) { return daq.controller; });
533  return controllers;
534 }
535 
536 void ManagerImpl::RemoveAbortFunc(uint64_t id) noexcept {
537  try {
538  m_abort_funcs.erase(std::remove_if(m_abort_funcs.begin(),
539  m_abort_funcs.end(),
540  [id](auto const& obj) { return id == obj.GetId(); }),
541  m_abort_funcs.end());
542  } catch (...) {
543  }
544 }
545 
546 ManagerImpl::OpAbortFunc::OpAbortFunc(OpAbortFunc::Func&& func)
547  : m_id(NextId()), m_func(std::move(func)) {
548  assert(m_func);
549 }
550 
551 uint64_t ManagerImpl::OpAbortFunc::GetId() const noexcept {
552  return m_id;
553 }
554 
555 bool ManagerImpl::OpAbortFunc::Abort() noexcept {
556  return m_func();
557 }
558 
559 uint64_t ManagerImpl::OpAbortFunc::NextId() {
560  static uint64_t next_id = 0;
561  return next_id++;
562 }
563 
564 void ManagerImpl::ScheduleDaqsAsync() {
565  LOG4CPLUS_TRACE(m_logger, "ScheduleDaqAsync()");
566  // Regardless if caller was invoked from timer or manually we reset the deadline timer
567  // so that timer is restarted when ScheduleMergeAsync completes as necessary.
568  m_schedule_retry.reset();
569 
570  for (auto& daq : m_daq_controllers) {
571  if (daq.controller->GetState() != State::NotScheduled) {
572  continue;
573  }
574  daq.controller->ScheduleMergeAsync().then(
575  m_executor, [id = daq.id, this](boost::future<State> reply) {
576  if (!reply.has_exception()) {
577  LOG4CPLUS_INFO(
578  m_logger,
579  fmt::format("ScheduleDaqAsyn: Successfully scheduled DAQ '{}'", id));
580  // Success
581  return;
582  }
583  LOG4CPLUS_WARN(
584  m_logger,
585  fmt::format("ScheduleDaqAsyn: Failed to schedule DAQ '{}', will retry in 60s",
586  id));
587  // Some kind of error happened, at this point we don't care what it is
588  // but simply schedule a new attempt using deadline timer, unless already scheduled.
589  if (m_schedule_retry) {
590  // Already scheduled..
591  return;
592  }
593  m_schedule_retry.emplace(m_executor.get_io_context(),
594  boost::posix_time::seconds(60));
595  m_schedule_retry->async_wait([this](boost::system::error_code const& error) {
596  if (error) {
597  return;
598  }
599  ScheduleDaqsAsync();
600  });
601  });
602  }
603 }
604 
605 } // namespace daq
rad::IoExecutor::get_io_context
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Definition: ioExecutor.hpp:41
daq::Workspace::StoreStatus
virtual void StoreStatus(Status const &status) const =0
initiate.hpp
Contains declarations for the helper functions to initiate operations.
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:39
daq::ManagerImpl::HaveDaq
bool HaveDaq(std::string_view id, std::string_view file_id={}) const noexcept override
Query existing data acquisition by id and optional file_id.
Definition: manager.cpp:205
daq::ManagerParams::instrument_id
std::string instrument_id
Instrument identifier.
Definition: manager.hpp:39
daq::Workspace
Interface to interact with DPM workspace.
Definition: workspace.hpp:31
workspace.hpp
daq::Workspace interface and implementation declaration
awaitState.hpp
Contains declaration for the AwaitStateAsync operation.
rad::IoExecutor
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
manager.hpp
Declaration of daq::Manager
daq::DaqContext::id
std::string id
DAQ identifier, possibly provided by user.
Definition: daqContext.hpp:64
daq::ManagerParams::origin
std::string origin
Definition: manager.hpp:40
report.hpp
conversion.hpp
Contains support functions for daqif.
daq::MakeIdCandidate
std::string MakeIdCandidate(char const *instrument_id, unsigned jitter=0, std::chrono::system_clock::time_point *out=nullptr)
Creates a DAQ id candidate that may or may not be unique.
Definition: manager.cpp:46
daq
Definition: asyncProcess.cpp:15
daq::Workspace::StoreContext
virtual void StoreContext(DaqContext const &context) const =0
Get file name of the data product specification stored in StoreSpecification()
daq::Workspace::ArchiveDaq
virtual void ArchiveDaq(std::string const &id)=0
Archives specified DAQ without deleting any files, typically by moving files it to a specific locatio...
daq::StatusSignal
Observes any status.
Definition: manager.hpp:77
daq::error::NestedExceptionReporter
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
daq::DaqContext
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:48
daq::Workspace::LoadStatus
virtual auto LoadStatus(std::string const &id) const -> Status=0
daq::ManagerImpl::~ManagerImpl
~ManagerImpl() noexcept
Definition: manager.cpp:109
daq::IsSubsequentState
bool IsSubsequentState(State state1, State state2) noexcept
Compares states and returns whether state1 occurs after state2.
Definition: state.cpp:26
daq::ManagerParams::merging_stale_age
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
Definition: manager.hpp:52
daqController.hpp
Contains declaration for DaqController.
status.hpp
Contains declaration for Status and ObservableStatus.
daq::DaqControllerFactory::MakeOcmPhase
virtual auto MakeOcmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the OCM phase of the DAQ process.
daq::IsFinalState
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
Definition: state.cpp:15
daq::Workspace::LoadContext
virtual auto LoadContext(std::string const &id) const -> DaqContext=0
Get file name of the data product specification stored in StoreSpecification()
daq::DaqContext::keywords
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
Definition: daqContext.hpp:91
daq::ManagerImpl::MakeDaqId
std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const override
Creates a new unique identifier based on the instrument id and current time.
Definition: manager.cpp:196
daq::ManagerParams::acquiring_stale_age
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Definition: manager.hpp:46
daq::fits::UpdateKeywords
void UpdateKeywords(KeywordVector &to, KeywordVector const &from, ConflictPolicy policy=ConflictPolicy::Replace)
Updates to with keywords from from.
Definition: keyword.cpp:553
daq::Workspace::LoadList
virtual auto LoadList() const -> std::vector< std::string >=0
daq::DaqContext::file_id
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:69
daq::Status
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:120
daq::ManagerImpl::RestoreFromWorkspace
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
Definition: manager.cpp:124
daq::fits::KeywordVector
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
daq::MakeState
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:63
daq::fits::ConflictPolicy::Replace
@ Replace
Replace keyword that conflicts.
daq::ManagerParams
Configurations parameters directly related to manager.
Definition: manager.hpp:35
daq::ErrorPolicy
ErrorPolicy
Error policy supported by certain operations.
Definition: error.hpp:25
daq::Workspace::StoreList
virtual void StoreList(std::vector< std::string > const &queue) const =0
daq::IsStale
bool IsStale(ManagerParams const &params, State state, std::chrono::system_clock::time_point creation_time)
Definition: manager.cpp:29
daq::UpdateKeywords
void UpdateKeywords(DaqContext &ctx, fits::KeywordVector const &keywords)
Updates (adds or replaces) primary HDU keywords.
Definition: daqContext.cpp:28
error
Definition: main.cpp:23
daq::DaqControllerFactory::MakeDpmPhase
virtual auto MakeDpmPhase(DaqContext daq_ctx, std::shared_ptr< ObservableStatus > status, std::shared_ptr< ObservableEventLog > event_log) -> std::shared_ptr< DaqController >=0
Create instance for the DPM phase of the DAQ process.
daq::ManagerImpl::ManagerImpl
ManagerImpl(rad::IoExecutor &executor, ManagerParams params, Workspace &workspace, std::shared_ptr< ObservableEventLog > event_log, DaqControllerFactory &daq_factory)
Definition: manager.cpp:95
daq::DaqControllerFactory
Abstract factory for DaqControllers.
Definition: daqController.hpp:76