ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
scheduler.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_dpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::dpm::Scheduler and related class declarations.
9  */
10 #ifndef DAQ_DPM_SCHEDULER_HPP
11 #define DAQ_DPM_SCHEDULER_HPP
12 #include <daq/config.hpp>
13 
14 #include <functional>
15 #include <vector>
16 
17 #include <boost/signals2.hpp>
18 #include <log4cplus/logger.h>
19 #include <rad/ioExecutor.hpp>
20 
21 #include <daq/dpSpec.hpp>
25 #include <daq/status.hpp>
26 
27 namespace daq::dpm {
28 
29 class Workspace;
30 class DaqWorkspace;
31 struct Resources;
32 
33 /**
34  * Controller for specific DAQ.
35  *
36  * Responsibilities:
37  *
38  * State::Scheduled for each state:
39  * 1. Establish list of missing sources and create and store source mapping.
40  * 2. Transition to Tranferring when source mapping has been stored successfully.
41  *
42  * State::Transferring
43  * 1. Use source mapping from previous state and query workspace if source has been downloaded.
44  * 2. Transfer missing files.
45  * 3. Transition to Merging when all files have been successfully transferred.
46  *
47  * State::Merging
48  * 1. ...
49  * N. Transition to Releasing
50  *
51  * State::Releasing
52  * 1. ...
53  *
54  * State::Complete
55  * - Nothing
56  *
57  * DaqController cannot assume that all activities for each state is complete.
58  */
60 public:
61  virtual ~DaqController() {
62  }
63  /**
64  * @returns data acquisition identifier.
65  */
66  virtual auto GetId() const noexcept -> std::string const& = 0;
67 
68  /**
69  * @return Error flag.
70  */
71  virtual auto GetErrorFlag() const noexcept -> bool = 0;
72 
73  /**
74  * @returns state of data acquisition.
75  */
76  virtual auto GetState() const noexcept -> State = 0;
77 
78  /**
79  * @returns Data product filename.
80  */
81  virtual auto GetResult() const noexcept -> std::filesystem::path const& = 0;
82 
83  /**
84  * @returns status object associated with this daq.
85  */
86  virtual auto GetStatus() noexcept -> ObservableStatus& = 0;
87  virtual auto GetStatus() const noexcept -> ObservableStatus const& = 0;
88 
89  /**
90  * Start/stop operations.
91  *
92  * This assumes the implementation has an internal scheduler that it uses.
93  */
94  virtual void Start() = 0;
95  virtual void Stop() = 0;
96  virtual void Poll() = 0;
97  virtual bool IsStopped() const noexcept = 0;
98 };
99 
100 std::ostream& operator<<(std::ostream& os, DaqController const& daq);
101 
102 /**
103  * Imposes limits on how many concurrent operations are allowed.
104  */
106  /**
107  * Limits how many DAQs overall can be scheduled concurrently.
108  */
109  unsigned short daq = 1;
110 
111  /**
112  * Maximum number of concurrent output transfers.
113  */
114  unsigned short net_send = 0;
115 
116  /**
117  * Maximum number of concurrent input transfers = 0.
118  */
119  unsigned short net_receive = 0;
120 
121  /**
122  * Maximum number of concurrent merge processes.
123  */
124  unsigned short merge = 1;
125 };
126 
127 /**
128  * Options controlling scheduler operations.
129  *
130  * Limits with value `0` is unlimited.
131  */
133  /**
134  * Imposes limits on how many concurrent operations are allowed.
135  */
137  /**
138  * Limits how many DAQs overall can be scheduled concurrently.
139  */
140  unsigned short daq = 1;
141 
142  /**
143  * Maximum number of concurrent output transfers.
144  */
145  unsigned short net_send = 0;
146 
147  /**
148  * Maximum number of concurrent input transfers = 0.
149  */
150  unsigned short net_receive = 0;
151 
152  /**
153  * Maximum number of concurrent transfers
154  */
155  unsigned short merge = 0;
156  } concurrency_limits;
157 };
158 
159 /**
160  * Options for DaqController
161  */
163  std::string merge_bin = "daqDpmMerge";
164  std::string rsync_bin = "rsync";
165 };
166 
167 class Resource;
168 /**
169  * RAII token
170  */
172 public:
173  ResourceToken(Resource*) noexcept;
174  ResourceToken(ResourceToken&&) noexcept = default;
175  ResourceToken& operator=(ResourceToken&&) noexcept = default;
176  ~ResourceToken() noexcept;
177 
178 private:
179  Resource* m_resource;
180 };
181 
182 class Resource {
183 public:
184  /**
185  * Signal that emits on changes to resources.
186  */
187  using Signal = boost::signals2::signal<void()>;
188  Resource() = default;
189  explicit Resource(unsigned limit) noexcept : m_limit(limit) {
190  }
191  Resource& operator=(Resource const&) noexcept = delete;
192 
193  std::optional<ResourceToken> Acquire() noexcept {
194  if (m_limit == 0 || m_used < m_limit) {
195  m_used++;
196  return ResourceToken(this);
197  }
198  return {};
199  }
200 
201  void SetLimit(unsigned new_limit) noexcept {
202  m_limit = new_limit;
203  }
204  unsigned GetLimit() const noexcept {
205  return m_limit;
206  }
207  unsigned GetUsed() const noexcept {
208  return m_used;
209  }
210 
211  /**
212  * Connect to signal that is emitted when a resource become available.
213  */
214  boost::signals2::connection Connect(Signal::slot_type const& slot) {
215  return m_signal.connect(slot);
216  }
217 
218 protected:
219  void Release() noexcept;
220 
221 private:
222  friend class ResourceToken;
223  unsigned m_limit = 0;
224  unsigned m_used = 0;
225  Signal m_signal;
226 };
227 
228 /**
229  * Limited resources
230  */
231 struct Resources {
236 };
237 
238 /**
239  * Schedules asynchronous activities that results in merged Data Product and delivery.
240  *
241  * Internally it maintains a prioritized queue of DAQs that should be merged.
242  *
243  * Main responsibilities:
244  *
245  * - Maintain DAQ queue in priority order.
246  * - Process each queued DAQ in order (possibly with configurable parallelization):
247  * - Ensure* input source files are transferred.
248  * - Ensure* DAQ is eventually merged to create DP.
249  * - Update DAQ state to reflect the state it is in.
250  * - State persistence
251  * - Store state to workspace and
252  * - Load state to recover at startup.
253  *
254  * Outside the scope of Scheduler:
255  *
256  * - Prerequisite for DAQ is that the Data Product Specification is final and can be considered
257  * "read-only" within the Scheduler. This means that e.g. the input file names have been
258  * determined.
259  *
260  * TBD/notes:
261  * - Workspace management (should be same component responsible for):
262  * - Who finalizes DP Specification (local filenames)?
263  * Local file names are missing and needs to be determined (ensuring that files do not collide).
264  * - Who initializes workspace with new DAQs? It should be the entity that decides about file
265  * names (how about daq::dpm::Workspace?).
266  * - Who archives DAQs?
267  * - Only DAQs that are actively being worked on needs to be represented in Scheduler. The remaining
268  * backlog could just be entries in a file `<workspace>/queue.json`.
269  * - This would imply that Scheduler is at least responsible for:
270  * - Maintaining `queue.json`
271  * - Interface use strings rather than fat objects.
272  * - It creates fat objects
273  */
274 class Scheduler {
275 public:
276  virtual ~Scheduler() {
277  }
278 
279  /**
280  * Start/stop operations.
281  *
282  * This assumes the implementation has an internal scheduler that it uses.
283  */
284  virtual void Start() = 0;
285  virtual void Stop() = 0;
286 
287  /**
288  * Queues DAQ for processing.
289  *
290  * @note Scheduler is responsible for creating unique local names for input files.
291  *
292  * @param dp_spec JSON encoded Data Product Specification. If parsing fails a
293  * std::invalid_argument will be thrown.
294  *
295  * @returns DAQ id on success.
296  * @throw std::invalid_argument if dp_spec is invalid or is already queued.
297  */
298  virtual std::string QueueDaq(std::string const& dp_spec) = 0;
299 
300  /**
301  * Abort merging DAQ identified by @c id.
302  *
303  * @note Workspace related to this will be purged.
304  *
305  * @param id DAQ id.
306  *
307  * @throw std::invalid_argument if DAQ is unknown.
308  */
309  virtual void AbortDaq(std::string const& id) = 0;
310 
311  /**
312  * Queries if DAQ with ID has been queued before in the current workspace.
313  *
314  * @param id DAQ id.
315  * @return true if DAQ is in the merge queue, false otherwise.
316  * @throw std::invalid_argument if DAQ is not known.
317  */
318  virtual bool IsQueued(std::string const& id) const noexcept = 0;
319 
320  /**
321  * Queries current DAQ status, possibly from last recorded status in workspace.
322  *
323  * @param id DAQ id.
324  * @return Merge status of @a id.
325  * @throw std::invalid_argument if DAQ is not known.
326  */
327  virtual Status GetDaqStatus(std::string const& id) const = 0;
328 
329  /**
330  * Queries current DAQ queue.
331  *
332  * @return list of DAQs pending or already started to be merged.
333  */
334  virtual std::vector<std::string> GetQueue() const noexcept = 0;
335 
336  /**
337  * Signals
338  */
339  /// @{
340  using StatusSignal = boost::signals2::signal<void(Status const&)>;
341  virtual boost::signals2::connection ConnectStatus(StatusSignal::slot_type const& slot) = 0;
342 
343  /// @}
344 };
345 
346 class SchedulerImpl : public Scheduler {
347 public:
349  std::function<std::unique_ptr<DaqController>(std::unique_ptr<DaqWorkspace>, Resources&)>;
350 
351  /**
352  * Constructs a scheduler loading information from workspace @a ws.
353  *
354  * The scheduler will load the stored list of queued DAQs and schedule operations to be
355  * performed in priority order.
356  */
357  SchedulerImpl(rad::IoExecutor& executor,
358  Workspace& workspace,
359  DaqControllerFactory daq_controller_factory,
360  SchedulerOptions const& options);
361 
362  std::string QueueDaq(std::string const& dp_spec) override;
363  void AbortDaq(std::string const&) override;
364  bool IsQueued(std::string const& id) const noexcept override;
365  Status GetDaqStatus(std::string const& id) const override;
366  std::vector<std::string> GetQueue() const noexcept override;
367 
368  /**
369  * @name Signals.
370  *
371  * - DAQ Status
372  */
373  /// @{
374  boost::signals2::connection ConnectStatus(StatusSignal::slot_type const& slot) override;
375  /// @}
376  void Start() override;
377  void Stop() override;
378 
379 private:
380  /**
381  * Polls for possible activitites to start.
382  *
383  * This is the core of the scheduler which will initiate asynchronus activities.
384  *
385  * This should be invoked as external events happen:
386  * - Previously started activities complete (which may allow new activities to start).
387  * - Modifiers invoked (QueueDaq/AbortDaq)
388  * - Limits changed.
389  *
390  * Activities are only started if limits allow them, in DAQ priority order.
391  *
392  * List of activities:
393  *
394  * - Initiate active DAQ.
395  * - State::Transferring
396  * - Transfer all files (identify missing sources and start transfer).
397  * - State::Merge:
398  * - Merge to create Data Product
399  * - State::Releasing:
400  * - Release finished DP to receivers.
401  *
402  * Updates m_active with new DAQs if limits and queue allows.
403  * If active daq is Scheduled it will first be transitioned to Tranferring.
404  */
405  void Poll();
406 
407  /**
408  * Checks queue for DAQs to start merging, limited on available resources.
409  */
410  void ActivateFromQueue();
411 
412  /**
413  * Checks m_active for completed daqs and if completed:
414  * - Erase from m_active
415  * - Erase from queue
416  * - Archive workspace
417  */
418  void ArchiveCompleted();
419 
420  /**
421  * Schedules Poll() to be executed at a later time.
422  *
423  * @thread_safe
424  */
425  void DeferredPoll();
426 
427  rad::IoExecutor& m_executor;
428  Workspace& m_workspace;
429  DaqControllerFactory m_daq_controller_factory;
430  SchedulerOptions m_options;
431 
432  /**
433  * Get candidates for merging (set of DAQs from backlog/queue not in m_active.
434  */
435  std::vector<std::string> GetCandidates() const;
436 
437  struct Active {
438  Active(std::unique_ptr<DaqController> daq_arg, ResourceToken token_arg)
439  : daq(std::move(daq_arg)), token(std::move(token_arg)) {
440  assert(daq);
441  }
442  std::unique_ptr<DaqController> daq;
443  ResourceToken token;
444  };
445  /**
446  * Active (transferring files, merging or delivering result) DAQs in priority order.
447  *
448  * Operations for active DAQs are concurrent. There's no synchronization between DAQs.
449  *
450  * Number of active DAQs is limited by m_resources.daq.
451  */
452  std::vector<Active> m_active;
453 
454  /**
455  * Container of all DAQs that are currently being processed or is simply in the backlog/queue.
456  * This must be kept synchronized with file system so that it can be relied on for answering
457  * questions like "have this DAQ been queued before".
458  */
459  std::vector<std::string> m_queue;
460 
461  Resources m_resources;
462 
463  StatusSignal m_status_signal;
464  log4cplus::Logger m_logger;
465  /**
466  * A shared pointer value which if valid indicates that Scheduler is alive.
467  * The value itself indicates whether a deferred Poll() has been scheduled but not yet executed.
468  */
469  std::shared_ptr<bool> m_liveness;
470 
471  /**
472  * Indicates if Scheduler is stopped -> shouldn't poll.
473  */
474  bool m_stopped = true;
475 };
476 
477 /**
478  * Internal data structure to SchedulerImpl
479  */
481 public:
482  using RsyncFactory =
483  std::function<std::unique_ptr<RsyncAsyncProcessIf>(boost::asio::io_context&,
484  std::string, // source
485  std::string, // dest
486  RsyncOptions const&,
488 
489  using ProcFactory = std::function<std::unique_ptr<AsyncProcessIf>(boost::asio::io_context&,
490  std::vector<std::string>)>;
491  /**
492  * Construct controller for existing workspace.
493  * @post State is loaded from @a workspace.
494  *
495  * @throws std-exception (possibly nested) containing error.
496  */
498  std::unique_ptr<DaqWorkspace> workspace,
499  Resources& resources,
500  RsyncFactory rsync_factory,
501  ProcFactory proc_factory,
502  DaqControllerOptions opts);
503 
504  void Start() override;
505  void Stop() override;
506  auto IsStopped() const noexcept -> bool override;
507 
508  /**
509  * @returns data acquisition identifier.
510  */
511  auto GetId() const noexcept -> std::string const& override;
512 
513  /**
514  * @return Error flag.
515  */
516  auto GetErrorFlag() const noexcept -> bool override;
517 
518  /**
519  * @returns state of data acquisition.
520  */
521  auto GetState() const noexcept -> State override;
522 
523  auto GetResult() const noexcept -> std::filesystem::path const& override {
524  return m_result;
525  }
526 
527  /**
528  * @returns status object associated with this daq.
529  */
530  auto GetStatus() noexcept -> ObservableStatus& override;
531  auto GetStatus() const noexcept -> ObservableStatus const& override;
532  void Poll() override;
533 
534 private:
535  using SourceFile = SourceResolver::SourceFile;
536  void DeferredPoll();
537  struct Scheduled {};
538  struct Transferring {
539  struct Transfer {
541  std::filesystem::path local_path;
542  std::shared_ptr<RsyncAsyncProcessIf> proc;
544  };
545  Transferring(SourceResolver resolver);
546  Transferring(Transferring&&) = default;
547  Transferring& operator=(Transferring&&) = default;
548  ~Transferring() noexcept;
549  bool HasTransfer(SourceFile const&) const noexcept;
550  Transfer* GetTransfer(SourceFile const&) noexcept;
551  void EraseTransfer(SourceFile const&);
552 
553  SourceResolver resolver;
554  std::vector<Transfer> transfers;
555  };
556  struct Merging {
557  Merging() = default;
558  Merging(Merging&&) = default;
559  Merging& operator=(Merging&&) = default;
560  ~Merging() noexcept;
561  void Reset();
562  std::shared_ptr<AsyncProcessIf> merger = {};
563  std::optional<ResourceToken> token = std::nullopt;
564  };
565  struct Completed {};
566 
567  // Poll methods are allowed to perform state transitions.
568  void Poll(Scheduled&);
569 
570  void Poll(Transferring&);
571  void TransferComplete(SourceFile const& source,
572  std::filesystem::path const& local_path,
573  boost::future<int> result) noexcept;
574 
575  void Poll(Merging&);
576  void MergeComplete(boost::future<int> result) noexcept;
577 
578  void Poll(Completed&);
579 
580  /**
581  * Handle JSON message from merger
582  */
583  void HandleMergeMessage(std::string const& line) noexcept;
584 
585  using StateVariant = std::variant<Scheduled, Transferring, Merging, /*Releasing,*/ Completed>;
586  static State MakeState(StateVariant const&);
587  /**
588  * Updates m_state and m_status and is the normal way m_state/m_status is updated (except during
589  * construction).
590  */
591  void SetState(StateVariant s, bool error = false);
592  void SetError(bool error);
593 
594  /**
595  * State persistence
596  */
597  rad::IoExecutor& m_executor;
598  std::unique_ptr<DaqWorkspace> m_workspace;
599  Resources& m_resources;
600  RsyncFactory m_rsync_factory;
601  ProcFactory m_proc_factory;
602  DaqControllerOptions m_options;
603 
604  DpSpec const m_dpspec;
605  /**
606  * Path to resulting data product (originates from m_dpspec.target.file_id)
607  */
608  std::filesystem::path m_result;
609 
610  StateVariant m_state_ctx;
611 
612  /**
613  * Current in-memory status.
614  */
615  ObservableStatus m_status;
616  boost::signals2::scoped_connection m_status_connection;
617 
618  /**
619  * A shared pointer value which if valid indicates that Scheduler is alive.
620  * The value itself indicates whether a deferred Poll() has been scheduled but not yet executed.
621  */
622  std::shared_ptr<bool> m_liveness;
623  bool m_stopped;
624  /**
625  * If true no new async activities should be started. This is mainly used to avoid stopping
626  * abruptly when an error occurs and let already started activities complete.
627  */
628  bool m_soft_stop = false;
629  log4cplus::Logger m_logger;
630 
631  /**
632  */
633  std::vector<std::unique_ptr<RsyncAsyncProcessIf>> m_transfers;
634 };
635 
636 } // namespace daq::dpm
637 
638 #endif // #ifndef DAQ_DPM_SCHEDULER_HPP
daq::dpm::Resource::Acquire
std::optional< ResourceToken > Acquire() noexcept
Definition: scheduler.hpp:193
daq::dpm::Resource::GetLimit
unsigned GetLimit() const noexcept
Definition: scheduler.hpp:204
daq::dpm::DaqController::IsStopped
virtual bool IsStopped() const noexcept=0
daq::dpm::ConcurrencyLimits
Imposes limits on how many concurrent operations are allowed.
Definition: scheduler.hpp:105
daq::State
State
Observable states of the data acquisition process.
Definition: state.hpp:39
ioExecutor.hpp
daq::dpm::DaqController::Poll
virtual void Poll()=0
daq::dpm::DaqController::GetResult
virtual auto GetResult() const noexcept -> std::filesystem::path const &=0
daq::dpm::Scheduler::GetQueue
virtual std::vector< std::string > GetQueue() const noexcept=0
Queries current DAQ queue.
daq::dpm::Workspace
Interface to interact with DPM workspace.
Definition: workspace.hpp:98
daq::DpSpec
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:28
sourceResolver.hpp
Declares daq::dpm::SourceResolver.
daq::RsyncAsyncProcessIf::DryRun
DryRun
Definition: rsyncAsyncProcess.hpp:79
daq::dpm::DaqController::Start
virtual void Start()=0
Start/stop operations.
daq::RsyncOptions
Options controlling rsync invocation.
Definition: rsyncAsyncProcess.hpp:28
daq::dpm::Resource::GetUsed
unsigned GetUsed() const noexcept
Definition: scheduler.hpp:207
dpSpec.hpp
rad::IoExecutor
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
daq::dpm::Scheduler::AbortDaq
virtual void AbortDaq(std::string const &id)=0
Abort merging DAQ identified by id.
daq::dpm::SchedulerImpl
Definition: scheduler.hpp:346
daq::dpm::Resources::daqs
Resource daqs
Definition: scheduler.hpp:232
daq::dpm::Resources::net_receive
Resource net_receive
Definition: scheduler.hpp:234
daq::dpm::DaqController
Controller for specific DAQ.
Definition: scheduler.hpp:59
daq
Definition: asyncProcess.cpp:15
config.hpp
daq::dpm::DaqControllerImpl::Transferring::Transfer::proc
std::shared_ptr< RsyncAsyncProcessIf > proc
Definition: scheduler.hpp:542
daq::dpm::Resources::net_send
Resource net_send
Definition: scheduler.hpp:233
daq::dpm::ResourceToken
RAII token.
Definition: scheduler.hpp:171
daq::ObservableStatus
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:161
daq::StatusSignal
Observes any status.
Definition: manager.hpp:77
daq::dpm::Scheduler::StatusSignal
boost::signals2::signal< void(Status const &)> StatusSignal
Signals.
Definition: scheduler.hpp:340
daq::dpm::Resource::SetLimit
void SetLimit(unsigned new_limit) noexcept
Definition: scheduler.hpp:201
daq::dpm::Scheduler::~Scheduler
virtual ~Scheduler()
Definition: scheduler.hpp:276
daq::dpm::Resource::Connect
boost::signals2::connection Connect(Signal::slot_type const &slot)
Connect to signal that is emitted when a resource become available.
Definition: scheduler.hpp:214
daq::dpm::Scheduler::QueueDaq
virtual std::string QueueDaq(std::string const &dp_spec)=0
Queues DAQ for processing.
daq::dpm::SchedulerOptions
Options controlling scheduler operations.
Definition: scheduler.hpp:132
rad
Definition: ioExecutor.hpp:6
daq::dpm::Resource::Resource
Resource(unsigned limit) noexcept
Definition: scheduler.hpp:189
daq::dpm::Resource::Resource
Resource()=default
daq::dpm::DaqControllerImpl
Internal data structure to SchedulerImpl.
Definition: scheduler.hpp:480
daq::dpm::Scheduler
Schedules asynchronous activities that results in merged Data Product and delivery.
Definition: scheduler.hpp:274
daq::dpm::Resources::merge
Resource merge
Definition: scheduler.hpp:235
daq::dpm::Resource::operator=
Resource & operator=(Resource const &) noexcept=delete
daq::dpm::Scheduler::Stop
virtual void Stop()=0
daq::dpm
Definition: testDpSpec.cpp:16
daq::dpm::DaqControllerImpl::RsyncFactory
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string, std::string, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Definition: scheduler.hpp:487
rsyncAsyncProcess.hpp
daq::RsyncAsyncProcess and related class declarations.
status.hpp
Contains declaration for Status and ObservableStatus.
asyncProcess.hpp
daq::AsyncProcess class definition
daq::dpm::DaqController::~DaqController
virtual ~DaqController()
Definition: scheduler.hpp:61
daq::dpm::DaqController::GetStatus
virtual auto GetStatus() noexcept -> ObservableStatus &=0
daq::dpm::SourceResolver::SourceFile
Definition: sourceResolver.hpp:29
daq::dpm::DaqControllerImpl::ProcFactory
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string >)> ProcFactory
Definition: scheduler.hpp:490
daq::dpm::SourceResolver
Provides location of fits source file.
Definition: sourceResolver.hpp:27
daq::dpm::Resource::Signal
boost::signals2::signal< void()> Signal
Signal that emits on changes to resources.
Definition: scheduler.hpp:187
daq::dpm::SchedulerOptions::ConcurrencyLimits
Imposes limits on how many concurrent operations are allowed.
Definition: scheduler.hpp:136
daq::Status
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:120
daq::dpm::DaqControllerOptions
Options for DaqController.
Definition: scheduler.hpp:162
daq::dpm::DaqController::GetErrorFlag
virtual auto GetErrorFlag() const noexcept -> bool=0
daq::MakeState
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:63
daq::dpm::DaqControllerImpl::Transferring::Transfer::token
ResourceToken token
Definition: scheduler.hpp:543
daq::dpm::Resource
Definition: scheduler.hpp:182
daq::dpm::ResourceToken::ResourceToken
ResourceToken(ResourceToken &&) noexcept=default
daq::dpm::Resources
Limited resources.
Definition: scheduler.hpp:231
daq::dpm::DaqController::Stop
virtual void Stop()=0
daq::dpm::Scheduler::IsQueued
virtual bool IsQueued(std::string const &id) const noexcept=0
Queries if DAQ with ID has been queued before in the current workspace.
daq::dpm::SchedulerImpl::DaqControllerFactory
std::function< std::unique_ptr< DaqController >(std::unique_ptr< DaqWorkspace >, Resources &)> DaqControllerFactory
Definition: scheduler.hpp:349
daq::AsyncProcessIf
Interface to asynchronous process.
Definition: asyncProcess.hpp:27
daq::dpm::DaqController::GetId
virtual auto GetId() const noexcept -> std::string const &=0
error
Definition: main.cpp:23
daq::dpm::DaqController::GetState
virtual auto GetState() const noexcept -> State=0
daq::dpm::DaqControllerImpl::Transferring::Transfer::local_path
std::filesystem::path local_path
Definition: scheduler.hpp:541
daq::State::DAQ states handled by OCM.
DAQ states handled by OCM.
Initial state of data acquisition.
daq::dpm::Scheduler::GetDaqStatus
virtual Status GetDaqStatus(std::string const &id) const =0
Queries current DAQ status, possibly from last recorded status in workspace.
daq::dpm::Scheduler::Start
virtual void Start()=0
Start/stop operations.
daq::dpm::DaqControllerImpl::Transferring::Transfer::source
SourceFile source
Definition: scheduler.hpp:540
daq::dpm::DaqControllerImpl::Transferring::Transfer
Definition: scheduler.hpp:539