10 #ifndef DAQ_DPM_SCHEDULER_HPP
11 #define DAQ_DPM_SCHEDULER_HPP
17 #include <boost/signals2.hpp>
18 #include <log4cplus/logger.h>
// Pure virtual accessor, const and noexcept, that yields a reference to a
// const std::string. Because a reference is returned, the string has to be
// owned outside the call (presumably by the implementing object -- confirm
// the lifetime guarantee with the implementations).
66 virtual auto GetId() const noexcept -> std::
string const& = 0;
// Pure virtual accessor, const and noexcept, that yields a reference to a
// const std::filesystem::path. What the path contains before a result exists
// is not visible in this excerpt -- NOTE(review): confirm with implementations.
81 virtual auto
GetResult() const noexcept -> std::filesystem::path const& = 0;
// Three unsigned short members with in-class default initializers.
// NOTE(review): the enclosing type is not visible in this excerpt; the names
// suggest per-activity limits (network send, network receive, merge) --
// confirm units and meaning against the enclosing declaration.
// Defaults to 0.
114 unsigned short net_send = 0;
// Defaults to 0.
119 unsigned short net_receive = 0;
// Defaults to 1 (the only non-zero default in this group).
124 unsigned short merge = 1;
// Members of the object that the following closing line declares as
// `concurrency_limits`. All three default to 0.
// NOTE(review): whether 0 means "unlimited" or "disabled" is not established
// by these lines -- confirm against the code that consumes the values.
145 unsigned short net_send = 0;
150 unsigned short net_receive = 0;
155 unsigned short merge = 0;
156 } concurrency_limits;
// String option defaulting to "daqDpmMerge".
// NOTE(review): presumably the name/path of the merge executable -- how it is
// resolved (e.g. via PATH) is not visible here; confirm.
163 std::string merge_bin =
"daqDpmMerge";
// String option defaulting to "rsync".
// NOTE(review): presumably the name/path of the rsync executable -- confirm.
164 std::string rsync_bin =
"rsync";
// Alias for a boost::signals2 signal whose slots take no arguments and
// return void.
187 using Signal = boost::signals2::signal<void()>;
189 explicit Resource(
unsigned limit) noexcept : m_limit(limit) {
193 std::optional<ResourceToken>
Acquire() noexcept {
194 if (m_limit == 0 || m_used < m_limit) {
214 boost::signals2::connection
Connect(Signal::slot_type
const& slot) {
215 return m_signal.connect(slot);
// Declared noexcept; the definition is not part of this excerpt.
// NOTE(review): presumably the counterpart of Acquire() that gives back one
// unit of the resource -- confirm against the definition.
219 void Release() noexcept;
// Limit value set from the constructor argument; defaults to 0.
// Acquire() tests `m_limit == 0 || m_used < m_limit`, so a value of 0 makes
// that condition true regardless of m_used.
223 unsigned m_limit = 0;
// Pure virtual. Takes `dp_spec` by const reference and returns a std::string
// by value. Not declared noexcept, so implementations may throw.
// NOTE(review): the meaning of the returned string (possibly an identifier
// usable with IsQueued()) is not established in this excerpt -- confirm.
298 virtual std::string
QueueDaq(std::string
const& dp_spec) = 0;
// Pure virtual, const and noexcept query taking `id` by const reference and
// returning bool.
// NOTE(review): exact definition of "queued" (e.g. whether active items
// count) is not visible here -- confirm with implementations.
318 virtual bool IsQueued(std::string
const&
id)
const noexcept = 0;
// Pure virtual, const and noexcept; returns a vector of strings by value.
// NOTE(review): the function is noexcept but returns a container by value;
// if building that vector throws (e.g. std::bad_alloc) the program
// terminates -- confirm this is intended.
334 virtual std::vector<std::string>
GetQueue() const noexcept = 0;
// Pure virtual. Accepts a StatusSignal slot by const reference and returns a
// boost::signals2::connection, which the caller can use to manage the
// connection (e.g. disconnect).
341 virtual boost::signals2::connection ConnectStatus(
StatusSignal::slot_type const& slot) = 0;
349 std::function<std::unique_ptr<DaqController>(std::unique_ptr<DaqWorkspace>,
Resources&)>;
// Override of the virtual QueueDaq(); same signature (const reference in,
// std::string by value out). Definition is not part of this excerpt.
362 std::string QueueDaq(std::string
const& dp_spec)
override;
// Override taking an unnamed std::string by const reference and returning
// nothing. Not noexcept.
// NOTE(review): the argument is presumably a DAQ identifier like the `id`
// parameter of the sibling queries -- confirm against the base declaration.
363 void AbortDaq(std::string
const&)
override;
// Override of the const noexcept IsQueued() query; takes `id` by const
// reference and returns bool. Definition is not part of this excerpt.
364 bool IsQueued(std::string
const&
id)
const noexcept
override;
// Const override returning a Status by value for the given `id`.
// Unlike IsQueued() it is not declared noexcept.
// NOTE(review): behaviour for an unknown id is not visible here -- confirm.
365 Status GetDaqStatus(std::string
const&
id)
const override;
// Const noexcept override returning a vector of strings by value.
// NOTE(review): presumably a copy of m_queue -- confirm against definition.
366 std::vector<std::string> GetQueue() const noexcept override;
// Override of ConnectStatus(): accepts a StatusSignal slot by const reference
// and returns the resulting boost::signals2::connection.
374 boost::signals2::connection ConnectStatus(
StatusSignal::slot_type const& slot) override;
// Lifecycle overrides; definitions are not part of this excerpt.
// NOTE(review): presumably these toggle the m_stopped flag (which defaults
// to true) -- confirm against the definitions.
376 void Start() override;
377 void Stop() override;
// Non-const helper without parameters; definition not in this excerpt.
// NOTE(review): name suggests it promotes entries from m_queue into
// m_active -- confirm.
410 void ActivateFromQueue();
// Non-const helper without parameters; definition not in this excerpt.
// NOTE(review): name suggests it handles completed entries of m_active --
// confirm.
418 void ArchiveCompleted();
// Reference member: the referred rad::IoExecutor is not owned here and must
// outlive this object.
427 rad::IoExecutor& m_executor;
// Const member returning a vector of strings by value; definition is not
// part of this excerpt.
// NOTE(review): presumably the identifiers eligible for activation --
// confirm selection criteria against the definition.
435 std::vector<std::
string> GetCandidates() const;
438 Active(std::unique_ptr<DaqController> daq_arg,
ResourceToken token_arg)
439 :
daq(std::move(daq_arg)), token(std::move(token_arg)) {
// Owning pointer to a DaqController; the constructor initializes it by
// moving from its `daq_arg` parameter.
442 std::unique_ptr<DaqController>
daq;
// Collection of Active entries (each holding a DaqController and a
// ResourceToken, per Active's constructor).
452 std::vector<Active> m_active;
// Strings held in the queue. NOTE(review): presumably DAQ identifiers in
// queue order -- confirm.
459 std::vector<std::string> m_queue;
// Resources object owned by value.
461 Resources m_resources;
// Logger instance used by this object.
464 log4cplus::Logger m_logger;
// Shared bool. NOTE(review): presumably used so asynchronous callbacks can
// detect that this object has been destroyed -- confirm usage.
469 std::shared_ptr<bool> m_liveness;
// Defaults to true, i.e. the object starts in the stopped state.
474 bool m_stopped =
true;
483 std::function<std::unique_ptr<RsyncAsyncProcessIf>(boost::asio::io_context&,
// Factory callable type: given a boost::asio::io_context reference and a
// vector of strings (by value), produces an owning pointer to an
// AsyncProcessIf.
// NOTE(review): the string vector is presumably the process argument list --
// confirm with the call sites.
489 using ProcFactory = std::function<std::unique_ptr<AsyncProcessIf>(boost::asio::io_context&,
490 std::vector<std::string>)>;
498 std::unique_ptr<DaqWorkspace> workspace,
// Lifecycle overrides; definitions are not part of this excerpt.
504 void Start()
override;
505 void Stop()
override;
// Const noexcept query returning bool.
// NOTE(review): relation to the m_soft_stop member is not visible here --
// confirm against the definition.
506 auto IsStopped() const noexcept ->
bool override;
// Const noexcept override returning a reference to a const std::string; the
// referenced string must therefore outlive the call (storage not visible in
// this excerpt).
511 auto
GetId() const noexcept -> std::
string const& override;
523 auto
GetResult() const noexcept -> std::filesystem::path const&
override {
// Parameterless override; definition not in this excerpt.
// NOTE(review): presumably dispatches to the state-specific Poll(...)
// overloads based on m_state_ctx -- confirm.
532 void Poll() override;
538 struct Transferring {
// Shared-ownership handle to an rsync process interface.
// NOTE(review): why shared rather than unique ownership is used is not
// visible here (possibly to keep the process alive in callbacks) -- confirm.
542 std::shared_ptr<RsyncAsyncProcessIf>
proc;
// Defaulted move construction and move assignment. Declaring these means the
// copy operations are not implicitly available unless they are declared on
// lines not visible in this excerpt.
546 Transferring(Transferring&&) =
default;
547 Transferring& operator=(Transferring&&) =
default;
// User-declared noexcept destructor; its definition lives elsewhere.
// NOTE(review): presumably cleans up outstanding transfers -- confirm.
548 ~Transferring() noexcept;
// Const noexcept predicate taking a SourceFile by const reference.
// NOTE(review): presumably reports whether a transfer for that source is
// tracked by this state -- confirm matching criteria against the definition.
549 bool HasTransfer(
SourceFile const&) const noexcept;
// Defaulted move construction and move assignment for the Merging state.
// As with any type that declares move operations, copy operations are not
// implicitly available unless declared on lines not visible here.
558 Merging(Merging&&) =
default;
559 Merging& operator=(Merging&&) =
default;
// Optional resource token, empty (std::nullopt) by default.
// NOTE(review): presumably filled once the corresponding resource has been
// acquired -- confirm.
563 std::optional<ResourceToken> token = std::nullopt;
// State-specific Poll overloads, selected by the type of the state object
// passed by non-const reference (so the state may be modified).
// Definitions are not part of this excerpt.
568 void Poll(Scheduled&);
570 void Poll(Transferring&);
// noexcept handler receiving the source file, a local path and a
// boost::future<int> taken by value.
// NOTE(review): the int is presumably the transfer process exit code, and
// since the function is noexcept any exception stored in the future must be
// handled inside -- confirm against the definition.
571 void TransferComplete(SourceFile
const& source,
572 std::filesystem::path
const& local_path,
573 boost::future<int> result) noexcept;
// noexcept handler receiving a boost::future<int> by value.
// NOTE(review): the int is presumably the merge process exit code --
// confirm against the definition.
576 void MergeComplete(boost::future<int> result) noexcept;
// Poll overload for the Completed state, taken by non-const reference.
// Definition is not part of this excerpt.
578 void Poll(Completed&);
// noexcept handler taking one `line` of text by const reference.
// NOTE(review): presumably a line of output produced by the merge process;
// the expected format is not visible here -- confirm.
583 void HandleMergeMessage(std::string
const& line) noexcept;
// Closed set of controller states held in a std::variant, listed in the
// order Scheduled, Transferring, Merging, Completed.
585 using StateVariant = std::variant<Scheduled, Transferring, Merging,
Completed>;
// Takes the new state by value together with an `error` flag that defaults
// to false when omitted. Definition is not part of this excerpt.
591 void SetState(StateVariant s,
bool error =
false);
// Takes an `error` flag (no default). Definition not in this excerpt.
// NOTE(review): presumably updates the error indication without changing
// the state -- confirm.
592 void SetError(
bool error);
// Owning pointer to the DaqWorkspace.
598 std::unique_ptr<DaqWorkspace> m_workspace;
// Reference member: the Resources object is not owned and must outlive this
// controller.
599 Resources& m_resources;
// Factory callables stored by value.
600 RsyncFactory m_rsync_factory;
601 ProcFactory m_proc_factory;
// Options stored by value.
602 DaqControllerOptions m_options;
// Path value. NOTE(review): presumably what GetResult() returns -- confirm.
608 std::filesystem::path m_result;
// Current state, one alternative of StateVariant.
610 StateVariant m_state_ctx;
// scoped_connection: the connection is disconnected when this member is
// destroyed.
616 boost::signals2::scoped_connection m_status_connection;
// Shared bool. NOTE(review): presumably lets asynchronous callbacks detect
// that this object no longer exists -- confirm usage.
622 std::shared_ptr<bool> m_liveness;
// Defaults to false.
628 bool m_soft_stop =
false;
// Logger instance used by this object.
629 log4cplus::Logger m_logger;
// Owning pointers to rsync process interfaces.
// NOTE(review): relation to Transferring::proc (a shared_ptr) is not visible
// here -- confirm which container is authoritative.
633 std::vector<std::unique_ptr<RsyncAsyncProcessIf>> m_transfers;
638 #endif // #ifndef DAQ_DPM_SCHEDULER_HPP