ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
scheduler.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_dpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::dpm::Scheduler and related class declarations.
9  */
10 #ifndef DAQ_DPM_SCHEDULER_HPP
11 #define DAQ_DPM_SCHEDULER_HPP
12 #include <daq/config.hpp>
13 
14 #include <functional>
15 #include <vector>
16 
17 #include <boost/signals2.hpp>
18 #include <log4cplus/logger.h>
19 #include <rad/ioExecutor.hpp>
20 
22 #include <daq/json/dpSpec.hpp>
25 #include <daq/status.hpp>
26 
27 namespace daq::dpm {
28 
29 class Workspace;
30 class DaqWorkspace;
31 struct Resources;
32 
33 /**
34  * Controller for specific DAQ.
35  *
36  * Responsibilities:
37  *
38  * State::Scheduled for each state:
39  * 1. Establish list of missing sources and create and store source mapping.
40  * 2. Transition to Transferring when source mapping has been stored successfully.
41  *
42  * State::Transferring
43  * 1. Use source mapping from previous state and query workspace if source has been downloaded.
44  * 2. Transfer missing files.
45  * 3. Transition to Merging when all files have been successfully transferred.
46  *
47  * State::Merging
48  * 1. ...
49  * N. Transition to Releasing
50  *
51  * State::Releasing
52  * 1. ...
53  *
54  * State::Complete
55  * - Nothing
56  *
57  * DaqController cannot assume that all activities for each state are complete.
58  */
60 public:
61  virtual ~DaqController() {
62  }
63  /**
64  * @returns data acquisition identifier.
65  */
66  virtual auto GetId() const noexcept -> std::string const& = 0;
67 
68  /**
69  * @return Error flag.
70  */
71  virtual auto GetErrorFlag() const noexcept -> bool = 0;
72 
73  /**
74  * @returns state of data acquisition.
75  */
76  virtual auto GetState() const noexcept -> State = 0;
77 
78  /**
79  * @returns Data product filename.
80  */
81  virtual auto GetResult() const noexcept -> std::filesystem::path const& = 0;
82 
83  /**
84  * @returns status object associated with this daq.
85  */
86  virtual auto GetStatus() noexcept -> ObservableStatus& = 0;
87  virtual auto GetStatus() const noexcept -> ObservableStatus const& = 0;
88 
89  /**
90  * Start/stop operations.
91  *
92  * This assumes the implementation has an internal scheduler that it uses.
93  */
94  virtual void Start() = 0;
95  virtual void Stop() = 0;
96  virtual void Poll() = 0;
97  virtual bool IsStopped() const noexcept = 0;
98 };
99 
100 std::ostream& operator<<(std::ostream& os, DaqController const& daq);
101 
102 /**
103  * Imposes limits on how many concurrent operations are allowed.
104  */
106  /**
107  * Limits how many DAQs overall can be scheduled concurrently.
108  */
109  unsigned short daq = 1;
110 
111  /**
112  * Maximum number of concurrent output transfers.
113  */
114  unsigned short net_send = 0;
115 
116  /**
117  * Maximum number of concurrent input transfers.
118  */
119  unsigned short net_receive = 0;
120 
121  /**
122  * Maximum number of concurrent merge processes.
123  */
124  unsigned short merge = 1;
125 };
126 
127 /**
128  * Options controlling scheduler operations.
129  *
130  * A limit with value `0` means unlimited.
131  */
133  /**
134  * Imposes limits on how many concurrent operations are allowed.
135  */
137  /**
138  * Limits how many DAQs overall can be scheduled concurrently.
139  */
140  unsigned short daq = 1;
141 
142  /**
143  * Maximum number of concurrent output transfers.
144  */
145  unsigned short net_send = 0;
146 
147  /**
148  * Maximum number of concurrent input transfers.
149  */
150  unsigned short net_receive = 0;
151 
152  /**
153  * Maximum number of concurrent merge processes.
154  */
155  unsigned short merge = 0;
156  } concurrency_limits;
157 };
158 
159 /**
160  * Options for DaqController
161  */
163  std::string merge_bin = "daqDpmMerge";
164  std::string rsync_bin = "rsync";
165 };
166 
167 class Resource;
168 /**
169  * RAII token
170  */
172 public:
173  ResourceToken(Resource*) noexcept;
174  ResourceToken(ResourceToken&&) noexcept = default;
175  ResourceToken& operator=(ResourceToken&&) noexcept = default;
176  ~ResourceToken() noexcept;
177 
178 private:
179  Resource* m_resource;
180 };
181 
/**
 * Counts acquisitions of a limited resource against an adjustable limit.
 *
 * Acquire() hands out a ResourceToken while the limit permits it. A limit of `0` places no upper
 * bound on the number of acquisitions (see the condition in Acquire()).
 */
182 class Resource {
183 public:
184  /**
185  * Signal type used to notify about changes to the resource. Slots take no arguments.
186  */
187  using Signal = boost::signals2::signal<void()>;
     /** Creates a resource with limit `0` (the member default), i.e. without upper bound. */
188  Resource() = default;
     /** Creates a resource that allows at most @a limit concurrent acquisitions (`0` = no bound). */
189  explicit Resource(unsigned limit) noexcept : m_limit(limit) {
190  }
     // Copy assignment is disabled; tokens returned by Acquire() are constructed from `this`.
191  Resource& operator=(Resource const&) noexcept = delete;
192 
     /**
      * Tries to acquire one unit of the resource.
      *
      * @returns a ResourceToken constructed from this resource if the limit is `0` or the number
      * of used units is below the limit (the used counter is incremented first); otherwise an
      * empty optional.
      */
193  std::optional<ResourceToken> Acquire() noexcept {
194  if (m_limit == 0 || m_used < m_limit) {
195  m_used++;
196  return ResourceToken(this);
197  }
198  return {};
199  }
200 
     /**
      * Replaces the limit with @a new_limit.
      *
      * Only the stored limit is changed here; the used counter is not modified and no signal is
      * emitted by this function.
      */
201  void SetLimit(unsigned new_limit) noexcept {
202  m_limit = new_limit;
203  }
     /** @returns the currently configured limit (`0` = no bound). */
204  unsigned GetLimit() const noexcept {
205  return m_limit;
206  }
     /** @returns the number of units currently counted as used. */
207  unsigned GetUsed() const noexcept {
208  return m_used;
209  }
210 
211  /**
212  * Connect @a slot to the signal that is emitted when a resource becomes available.
     *
     * @returns the connection created by the underlying boost::signals2 signal.
213  */
214  boost::signals2::connection Connect(Signal::slot_type const& slot) {
215  return m_signal.connect(slot);
216  }
217 
218 protected:
     /**
      * Declared here, defined outside this header.
      *
      * NOTE(review): presumably called by ResourceToken (a friend of this class) to give back one
      * unit and emit the signal -- confirm against the implementation file.
      */
219  void Release() noexcept;
220 
221 private:
222  friend class ResourceToken;
223  unsigned m_limit = 0;  // Maximum number of concurrent acquisitions; 0 = no bound.
224  unsigned m_used = 0;   // Number of units handed out by Acquire().
225  Signal m_signal;       // Signal that slots are attached to via Connect().
226 };
227 
228 /**
229  * Limited resources
230  */
231 struct Resources {
236 };
237 
238 /**
239  * Schedules asynchronous activities that result in a merged Data Product and its delivery.
240  *
241  * Internally it maintains a prioritized queue of DAQs that should be merged.
242  *
243  * Main responsibilities:
244  *
245  * - Maintain DAQ queue in priority order.
246  * - Process each queued DAQ in order (possibly with configurable parallelization):
247  * - Ensure* input source files are transferred.
248  * - Ensure* DAQ is eventually merged to create DP.
249  * - Update DAQ state to reflect the state it is in.
250  * - State persistence
251  * - Store state to workspace and
252  * - Load state to recover at startup.
253  *
254  * Outside the scope of Scheduler:
255  *
256  * - Prerequisite for DAQ is that the Data Product Specification is final and can be considered
257  * "read-only" within the Scheduler. This means that e.g. the input file names have been
258  * determined.
259  *
260  * TBD/notes:
261  * - Workspace management (should be same component responsible for):
262  * - Who finalizes DP Specification (local filenames)?
263  * Local file names are missing and need to be determined (ensuring that files do not collide).
264  * - Who initializes workspace with new DAQs? It should be the entity that decides about file
265  * names (how about daq::dpm::Workspace?).
266  * - Who archives DAQs?
267  * - Only DAQs that are actively being worked on need to be represented in Scheduler. The remaining
268  * backlog could just be entries in a file `<workspace>/queue.json`.
269  * - This would imply that Scheduler is at least responsible for:
270  * - Maintaining `queue.json`
271  * - Interface use strings rather than fat objects.
272  * - It creates fat objects
273  */
274 class Scheduler {
275 public:
276  virtual ~Scheduler() {
277  }
278 
279  /**
280  * Start/stop operations.
281  *
282  * This assumes the implementation has an internal scheduler that it uses.
283  */
284  virtual void Start() = 0;
285  virtual void Stop() = 0;
286 
287  /**
288  * Queues DAQ for processing.
289  *
290  * @note Scheduler is responsible for creating unique local names for input files.
291  *
292  * @param dp_spec JSON encoded Data Product Specification. If parsing fails a
293  * std::invalid_argument will be thrown.
294  *
295  * @returns DAQ id on success.
296  * @throw std::invalid_argument if dp_spec is invalid or the DAQ is already queued.
297  */
298  virtual std::string QueueDaq(std::string const& dp_spec) = 0;
299 
300  /**
301  * Abort merging DAQ identified by @c id.
302  *
303  * @note Workspace related to this will be purged.
304  *
305  * @param id DAQ id.
306  *
307  * @throw std::invalid_argument if DAQ is unknown.
308  */
309  virtual void AbortDaq(std::string const& id) = 0;
310 
311  /**
312  * Queries if DAQ with ID has been queued before in the current workspace.
313  *
314  * @param id DAQ id.
315  * @return true if DAQ is in the merge queue, false otherwise.
316  * @note Declared `noexcept`, so no exception can propagate to the caller.
     * NOTE(review): an unknown @a id is presumably reported as `false` -- confirm against the
     * implementation.
317  */
318  virtual bool IsQueued(std::string const& id) const noexcept = 0;
319 
320  /**
321  * Queries current DAQ status, possibly from last recorded status in workspace.
322  *
323  * @param id DAQ id.
324  * @return Merge status of @a id.
325  * @throw std::invalid_argument if DAQ is not known.
326  */
327  virtual Status GetDaqStatus(std::string const& id) const = 0;
328 
329  /**
330  * Queries current DAQ queue.
331  *
332  * @return list of DAQs that are pending or whose merge has already started.
333  */
334  virtual std::vector<std::string> GetQueue() const noexcept = 0;
335 
336  /**
337  * Signals
338  */
339  /// @{
     /// Signal type whose slots receive a `Status const&`.
340  using StatusSignal = boost::signals2::signal<void(Status const&)>;
     /// Connects @a slot to the status signal and returns the resulting connection.
341  virtual boost::signals2::connection ConnectStatus(StatusSignal::slot_type const& slot) = 0;
342 
343  /// @}
344 };
345 
346 class SchedulerImpl : public Scheduler {
347 public:
349  std::function<std::unique_ptr<DaqController>(std::unique_ptr<DaqWorkspace>, Resources&)>;
350 
351  /**
352  * Constructs a scheduler loading information from workspace @a ws.
353  *
354  * The scheduler will load the stored list of queued DAQs and schedule operations to be
355  * performed in priority order.
356  */
357  SchedulerImpl(rad::IoExecutor& executor,
358  Workspace& workspace,
359  DaqControllerFactory daq_controller_factory,
360  SchedulerOptions const& options);
361 
362  std::string QueueDaq(std::string const& dp_spec) override;
363  void AbortDaq(std::string const&) override;
364  bool IsQueued(std::string const& id) const noexcept override;
365  Status GetDaqStatus(std::string const& id) const override;
366  std::vector<std::string> GetQueue() const noexcept override;
367 
368  /**
369  * @name Signals.
370  *
371  * - DAQ Status
372  */
373  /// @{
374  boost::signals2::connection ConnectStatus(StatusSignal::slot_type const& slot) override;
375  /// @}
376  void Start() override;
377  void Stop() override;
378 
379 private:
380  /**
381  * Polls for possible activities to start.
382  *
383  * This is the core of the scheduler which will initiate asynchronous activities.
384  *
385  * This should be invoked as external events happen:
386  * - Previously started activities complete (which may allow new activities to start).
387  * - Modifiers invoked (QueueDaq/AbortDaq)
388  * - Limits changed.
389  *
390  * Activities are only started if limits allow them, in DAQ priority order.
391  *
392  * List of activities:
393  *
394  * - Initiate active DAQ.
395  * - State::Transferring
396  * - Transfer all files (identify missing sources and start transfer).
397  * - State::Merging:
398  * - Merge to create Data Product
399  * - State::Releasing:
400  * - Release finished DP to receivers.
401  *
402  * Updates m_active with new DAQs if limits and queue allows.
403  * If active daq is Scheduled it will first be transitioned to Transferring.
404  */
405  void Poll();
406 
407  /**
408  * Checks queue for DAQs to start merging, limited on available resources.
409  */
410  void ActivateFromQueue();
411 
412  /**
413  * Checks m_active for completed daqs and if completed:
414  * - Erase from m_active
415  * - Erase from queue
416  * - Archive workspace
417  */
418  void ArchiveCompleted();
419 
420  /**
421  * Schedules Poll() to be executed at a later time.
422  *
423  * @thread_safe
424  */
425  void DeferredPoll();
426 
427  rad::IoExecutor& m_executor;
428  Workspace& m_workspace;
429  DaqControllerFactory m_daq_controller_factory;
430  SchedulerOptions m_options;
431 
432  /**
433  * Get candidates for merging (set of DAQs from backlog/queue not in m_active).
434  */
435  std::vector<std::string> GetCandidates() const;
436 
437  struct Active {
438  Active(std::unique_ptr<DaqController> daq_arg, ResourceToken token_arg)
439  : daq(std::move(daq_arg)), token(std::move(token_arg)) {
440  assert(daq);
441  }
442  std::unique_ptr<DaqController> daq;
443  ResourceToken token;
444  };
445  /**
446  * Active (transferring files, merging or delivering result) DAQs in priority order.
447  *
448  * Operations for active DAQs are concurrent. There's no synchronization between DAQs.
449  *
450  * Number of active DAQs is limited by m_resources.daq.
451  */
452  std::vector<Active> m_active;
453 
454  /**
455  * Container of all DAQs that are currently being processed or are simply in the backlog/queue.
456  * This must be kept synchronized with the file system so that it can be relied on for answering
457  * questions like "has this DAQ been queued before".
458  */
459  std::vector<std::string> m_queue;
460 
461  Resources m_resources;
462 
463  StatusSignal m_status_signal;
464  log4cplus::Logger m_logger;
465  /**
466  * A shared pointer value which if valid indicates that Scheduler is alive.
467  * The value itself indicates whether a deferred Poll() has been scheduled but not yet executed.
468  */
469  std::shared_ptr<bool> m_liveness;
470 
471  /**
472  * Indicates if Scheduler is stopped -> shouldn't poll.
473  */
474  bool m_stopped = true;
475 };
476 
477 /**
478  * Internal data structure to SchedulerImpl
479  */
481 public:
482  using RsyncFactory =
483  std::function<std::unique_ptr<RsyncAsyncProcessIf>(boost::asio::io_context&,
484  std::string const&, // source
485  std::string const&, // dest
486  RsyncOptions const&,
488 
489  using ProcFactory = std::function<std::unique_ptr<AsyncProcessIf>(
490  boost::asio::io_context&, std::vector<std::string> const&)>;
491  /**
492  * Construct controller for existing workspace.
493  * @post State is loaded from @a workspace.
494  *
495  * @throws std-exception (possibly nested) containing error.
496  */
498  std::unique_ptr<DaqWorkspace> workspace,
499  Resources& resources,
500  RsyncFactory rsync_factory,
501  ProcFactory proc_factory,
502  DaqControllerOptions opts);
503 
504  void Start() override;
505  void Stop() override;
506  auto IsStopped() const noexcept -> bool override;
507 
508  /**
509  * @returns data acquisition identifier.
510  */
511  auto GetId() const noexcept -> std::string const& override;
512 
513  /**
514  * @return Error flag.
515  */
516  auto GetErrorFlag() const noexcept -> bool override;
517 
518  /**
519  * @returns state of data acquisition.
520  */
521  auto GetState() const noexcept -> State override;
522 
523  auto GetResult() const noexcept -> std::filesystem::path const& override {
524  return m_result;
525  }
526 
527  /**
528  * @returns status object associated with this daq.
529  */
530  auto GetStatus() noexcept -> ObservableStatus& override;
531  auto GetStatus() const noexcept -> ObservableStatus const& override;
532  void Poll() override;
533 
534 private:
535  using SourceFile = SourceResolver::SourceFile;
536  void DeferredPoll();
537  struct Scheduled {};
538  struct Transferring {
539  struct Transfer {
541  std::filesystem::path local_path;
542  std::shared_ptr<RsyncAsyncProcessIf> proc;
544  };
545  Transferring(SourceResolver resolver);
546  Transferring(Transferring&&) = default;
547  Transferring& operator=(Transferring&&) = default;
548  ~Transferring() noexcept;
549  bool HasTransfer(SourceFile const&) const noexcept;
550  Transfer* GetTransfer(SourceFile const&) noexcept;
551  void EraseTransfer(SourceFile const&);
552 
553  SourceResolver resolver;
554  std::vector<Transfer> transfers;
555  };
556  struct Merging {
557  Merging() = default;
558  Merging(Merging&&) = default;
559  Merging& operator=(Merging&&) = default;
560  ~Merging() noexcept;
561  void Reset();
562  std::shared_ptr<AsyncProcessIf> merger = {};
563  std::optional<ResourceToken> token = std::nullopt;
564  };
565  struct Completed {};
566 
567  // Poll methods are allowed to perform state transitions.
568  void Poll(Scheduled&);
569 
570  void Poll(Transferring&);
571  void TransferComplete(SourceFile const& source,
572  std::filesystem::path const& local_path,
573  boost::future<int> result) noexcept;
574 
575  void Poll(Merging&);
576  void MergeComplete(boost::future<int> result) noexcept;
577 
578  void Poll(Completed&);
579 
580  /**
581  * Handle JSON message from merger
582  */
583  void HandleMergeMessage(std::string const& line) noexcept;
584 
585  using StateVariant = std::variant<Scheduled, Transferring, Merging, /*Releasing,*/ Completed>;
586  static State MakeState(StateVariant const&);
587  /**
588  * Updates m_state and m_status and is the normal way m_state/m_status is updated (except during
589  * construction).
590  */
591  void SetState(StateVariant s, bool error = false);
592  void SetError(bool error);
593 
594  /**
595  * State persistence
596  */
597  rad::IoExecutor& m_executor;
598  std::unique_ptr<DaqWorkspace> m_workspace;
599  Resources& m_resources;
600  RsyncFactory m_rsync_factory;
601  ProcFactory m_proc_factory;
602  DaqControllerOptions m_options;
603 
604  json::DpSpec const m_dpspec;
605  /**
606  * Path to resulting data product (originates from m_dpspec.target.file_id)
607  */
608  std::filesystem::path m_result;
609 
610  StateVariant m_state_ctx;
611 
612  /**
613  * Current in-memory status.
614  */
615  ObservableStatus m_status;
616  boost::signals2::scoped_connection m_status_connection;
617 
618  /**
619  * A shared pointer value which if valid indicates that Scheduler is alive.
620  * The value itself indicates whether a deferred Poll() has been scheduled but not yet executed.
621  */
622  std::shared_ptr<bool> m_liveness;
623  bool m_stopped;
624  /**
625  * If true no new async activities should be started. This is mainly used to avoid stopping
626  * abruptly when an error occurs and let already started activities complete.
627  */
628  bool m_soft_stop = false;
629  log4cplus::Logger m_logger;
630 
631  /**
632  */
633  std::vector<std::unique_ptr<RsyncAsyncProcessIf>> m_transfers;
634 };
635 
636 } // namespace daq::dpm
637 
638 #endif // #ifndef DAQ_DPM_SCHEDULER_HPP
daq::AsyncProcess class definition
Interface to asynchronous process.
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:165
Observes any status.
Definition: manager.hpp:77
Internal data structure to SchedulerImpl.
Definition: scheduler.hpp:480
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
Definition: scheduler.hpp:490
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Definition: scheduler.hpp:487
Controller for specific DAQ.
Definition: scheduler.hpp:59
virtual auto GetState() const noexcept -> State=0
virtual auto GetId() const noexcept -> std::string const &=0
virtual void Stop()=0
virtual void Poll()=0
virtual void Start()=0
Start/stop operations.
virtual auto GetErrorFlag() const noexcept -> bool=0
virtual auto GetStatus() noexcept -> ObservableStatus &=0
virtual auto GetResult() const noexcept -> std::filesystem::path const &=0
virtual bool IsStopped() const noexcept=0
ResourceToken(ResourceToken &&) noexcept=default
boost::signals2::connection Connect(Signal::slot_type const &slot)
Connect to signal that is emitted when a resource become available.
Definition: scheduler.hpp:214
std::optional< ResourceToken > Acquire() noexcept
Definition: scheduler.hpp:193
boost::signals2::signal< void()> Signal
Signal that emits on changes to resources.
Definition: scheduler.hpp:187
Resource & operator=(Resource const &) noexcept=delete
unsigned GetUsed() const noexcept
Definition: scheduler.hpp:207
unsigned GetLimit() const noexcept
Definition: scheduler.hpp:204
void SetLimit(unsigned new_limit) noexcept
Definition: scheduler.hpp:201
Resource(unsigned limit) noexcept
Definition: scheduler.hpp:189
std::function< std::unique_ptr< DaqController >(std::unique_ptr< DaqWorkspace >, Resources &)> DaqControllerFactory
Definition: scheduler.hpp:349
Schedules asynchronous activities that results in merged Data Product and delivery.
Definition: scheduler.hpp:274
virtual ~Scheduler()
Definition: scheduler.hpp:276
virtual std::vector< std::string > GetQueue() const noexcept=0
Queries current DAQ queue.
virtual std::string QueueDaq(std::string const &dp_spec)=0
Queues DAQ for processing.
virtual void Start()=0
Start/stop operations.
virtual bool IsQueued(std::string const &id) const noexcept=0
Queries if DAQ with ID has been queued before in the current workspace.
virtual void AbortDaq(std::string const &id)=0
Abort merging DAQ identified by id.
virtual Status GetDaqStatus(std::string const &id) const =0
Queries current DAQ status, possibly from last recorded status in workspace.
boost::signals2::signal< void(Status const &)> StatusSignal
Signals.
Definition: scheduler.hpp:340
virtual void Stop()=0
Provides location of fits source file.
Interface to interact with DPM workspace.
Definition: workspace.hpp:98
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
Imposes limits on how many concurrent operations are allowed.
Definition: scheduler.hpp:105
Options for DaqController.
Definition: scheduler.hpp:162
Limited resources.
Definition: scheduler.hpp:231
Options controlling scheduler operations.
Definition: scheduler.hpp:132
Imposes limits on how many concurrent operations are allowed.
Definition: scheduler.hpp:136
Close representation of the JSON structure but with stronger types.
Definition: dpSpec.hpp:30
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:63
State
Observable states of the data acquisition process.
Definition: state.hpp:39
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Merging
DAQ is being merged.
@ Transferring
Input files are being transferred.
Options controlling rsync invocation.
Definition: main.cpp:23
daq::RsyncAsyncProcess and related class declarations.
Declares daq::dpm::SourceResolver.
Contains declaration for Status and ObservableStatus.
Non observable status object that keeps stores status of data acquisition.
Definition: status.hpp:124
std::shared_ptr< RsyncAsyncProcessIf > proc
Definition: scheduler.hpp:542