10 #ifndef DAQ_DPM_SCHEDULER_HPP
11 #define DAQ_DPM_SCHEDULER_HPP
17 #include <boost/signals2.hpp>
18 #include <log4cplus/logger.h>
66 virtual auto GetId() const noexcept -> std::
string const& = 0;
81 virtual auto
GetResult() const noexcept -> std::filesystem::path const& = 0;
114 unsigned short net_send = 0;
119 unsigned short net_receive = 0;
124 unsigned short merge = 1;
145 unsigned short net_send = 0;
150 unsigned short net_receive = 0;
155 unsigned short merge = 0;
156 } concurrency_limits;
163 std::string merge_bin =
"daqDpmMerge";
164 std::string rsync_bin =
"rsync";
187 using Signal = boost::signals2::signal<void()>;
189 explicit Resource(
unsigned limit) noexcept : m_limit(limit) {
193 std::optional<ResourceToken>
Acquire() noexcept {
194 if (m_limit == 0 || m_used < m_limit) {
214 boost::signals2::connection
Connect(Signal::slot_type
const& slot) {
215 return m_signal.connect(slot);
219 void Release() noexcept;
223 unsigned m_limit = 0;
298 virtual std::string
QueueDaq(std::string
const& dp_spec) = 0;
318 virtual bool IsQueued(std::string
const&
id)
const noexcept = 0;
334 virtual std::vector<std::string>
GetQueue() const noexcept = 0;
341 virtual boost::signals2::connection ConnectStatus(
StatusSignal::slot_type const& slot) = 0;
349 std::function<std::unique_ptr<DaqController>(std::unique_ptr<DaqWorkspace>,
Resources&)>;
362 std::string QueueDaq(std::string
const& dp_spec)
override;
363 void AbortDaq(std::string
const&)
override;
364 bool IsQueued(std::string
const&
id)
const noexcept
override;
365 Status GetDaqStatus(std::string
const&
id)
const override;
366 std::vector<std::string> GetQueue() const noexcept override;
374 boost::signals2::connection ConnectStatus(
StatusSignal::slot_type const& slot) override;
376 void Start() override;
377 void Stop() override;
410 void ActivateFromQueue();
418 void ArchiveCompleted();
427 rad::IoExecutor& m_executor;
435 std::vector<std::
string> GetCandidates() const;
438 Active(std::unique_ptr<DaqController> daq_arg,
ResourceToken token_arg)
439 :
daq(std::move(daq_arg)), token(std::move(token_arg)) {
442 std::unique_ptr<DaqController>
daq;
452 std::vector<Active> m_active;
459 std::vector<std::string> m_queue;
461 Resources m_resources;
464 log4cplus::Logger m_logger;
469 std::shared_ptr<bool> m_liveness;
474 bool m_stopped =
true;
483 std::function<std::unique_ptr<RsyncAsyncProcessIf>(boost::asio::io_context&,
489 using ProcFactory = std::function<std::unique_ptr<AsyncProcessIf>(
490 boost::asio::io_context&, std::vector<std::string>
const&)>;
498 std::unique_ptr<DaqWorkspace> workspace,
504 void Start()
override;
505 void Stop()
override;
506 auto IsStopped() const noexcept ->
bool override;
511 auto
GetId() const noexcept -> std::
string const& override;
523 auto
GetResult() const noexcept -> std::filesystem::path const&
override {
532 void Poll() override;
542 std::shared_ptr<RsyncAsyncProcessIf>
proc;
549 bool HasTransfer(
SourceFile const&) const noexcept;
559 Merging& operator=(Merging&&) =
default;
563 std::optional<ResourceToken> token = std::nullopt;
568 void Poll(Scheduled&);
570 void Poll(Transferring&);
571 void TransferComplete(SourceFile
const& source,
572 std::filesystem::path
const& local_path,
573 boost::future<int> result) noexcept;
576 void MergeComplete(boost::future<int> result) noexcept;
578 void Poll(Completed&);
583 void HandleMergeMessage(std::string
const& line) noexcept;
591 void SetState(StateVariant s,
bool error =
false);
592 void SetError(
bool error);
598 std::unique_ptr<DaqWorkspace> m_workspace;
599 Resources& m_resources;
600 RsyncFactory m_rsync_factory;
601 ProcFactory m_proc_factory;
602 DaqControllerOptions m_options;
608 std::filesystem::path m_result;
610 StateVariant m_state_ctx;
616 boost::signals2::scoped_connection m_status_connection;
622 std::shared_ptr<bool> m_liveness;
628 bool m_soft_stop =
false;
629 log4cplus::Logger m_logger;
633 std::vector<std::unique_ptr<RsyncAsyncProcessIf>> m_transfers;
daq::AsyncProcess class definition
Interface to asynchronous process.
Stores data acquisition status and allows subscription to status changes.
Internal data structure to SchedulerImpl.
std::function< std::unique_ptr< AsyncProcessIf >(boost::asio::io_context &, std::vector< std::string > const &)> ProcFactory
std::function< std::unique_ptr< RsyncAsyncProcessIf >(boost::asio::io_context &, std::string const &, std::string const &, RsyncOptions const &, RsyncAsyncProcess::DryRun)> RsyncFactory
Controller for specific DAQ.
virtual auto GetState() const noexcept -> State=0
virtual auto GetId() const noexcept -> std::string const &=0
virtual void Start()=0
Start/stop operations.
virtual auto GetErrorFlag() const noexcept -> bool=0
virtual auto GetStatus() noexcept -> ObservableStatus &=0
virtual auto GetResult() const noexcept -> std::filesystem::path const &=0
virtual bool IsStopped() const noexcept=0
ResourceToken(ResourceToken &&) noexcept=default
boost::signals2::connection Connect(Signal::slot_type const &slot)
Connect to signal that is emitted when a resource becomes available.
std::optional< ResourceToken > Acquire() noexcept
boost::signals2::signal< void()> Signal
Signal that emits on changes to resources.
Resource & operator=(Resource const &) noexcept=delete
unsigned GetUsed() const noexcept
unsigned GetLimit() const noexcept
void SetLimit(unsigned new_limit) noexcept
Resource(unsigned limit) noexcept
std::function< std::unique_ptr< DaqController >(std::unique_ptr< DaqWorkspace >, Resources &)> DaqControllerFactory
Schedules asynchronous activities that result in merged Data Product and delivery.
virtual std::vector< std::string > GetQueue() const noexcept=0
Queries current DAQ queue.
virtual std::string QueueDaq(std::string const &dp_spec)=0
Queues DAQ for processing.
virtual void Start()=0
Start/stop operations.
virtual bool IsQueued(std::string const &id) const noexcept=0
Queries if DAQ with ID has been queued before in the current workspace.
virtual void AbortDaq(std::string const &id)=0
Abort merging DAQ identified by id.
virtual Status GetDaqStatus(std::string const &id) const =0
Queries current DAQ status, possibly from last recorded status in workspace.
boost::signals2::signal< void(Status const &)> StatusSignal
Signals.
Provides location of fits source file.
Interface to interact with DPM workspace.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Imposes limits on how many concurrent operations are allowed.
Options for DaqController.
Options controlling scheduler operations.
Imposes limits on how many concurrent operations are allowed.
Close representation of the JSON structure but with stronger types.
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
State
Observable states of the data acquisition process.
@ Completed
Completed DAQ.
@ Scheduled
daq is acknowledged by dpm and is scheduled for merging (i.e.
@ Merging
DAQ is being merged.
@ Transferring
Input files are being transferred.
Options controlling rsync invocation.
daq::RsyncAsyncProcess and related class declarations.
Declares daq::dpm::SourceResolver.
Contains declaration for Status and ObservableStatus.
Non-observable status object that stores the status of data acquisition.
std::shared_ptr< RsyncAsyncProcessIf > proc
std::filesystem::path local_path