11 #include <mal/Mal.hpp>
12 #include <mal/rr/qos/ReplyTime.hpp>
22 std::shared_ptr<bool>
alive = std::make_shared<bool>(
true);
28 : m_io_ctx(io_ctx), m_executor(m_io_ctx), m_mal(
mal), m_params(std::move(params)) {
29 m_dpmif = m_mal.getClient<daqif::DpmDaqControlAsync>(
31 {std::make_shared<elt::mal::rr::qos::ReplyTime>(params.
timeout)},
35 m_subs = std::make_unique<Subscriptions>();
38 m_subs->status = daqif::MakeSubscription<daqif::DaqStatus>(
41 [weak = std::weak_ptr<bool>(m_subs->alive),
this](
auto& sub,
auto const& event) {
42 auto shared = weak.lock();
47 auto const& sample = event.getData();
48 Status status(sample->getId(), sample->getFileId());
51 m_io_ctx.post([status = std::move(status), weak,
this] {
52 auto shared = weak.lock();
57 m_status_signal(status);
62 m_subs->storage = daqif::MakeSubscription<daqif::StorageStatus>(
65 [weak = std::weak_ptr<bool>(m_subs->alive),
this](
auto& sub,
auto const& event) {
66 auto shared = weak.lock();
71 auto const& sample = event.getData();
72 std::filesystem::space_info space = {};
75 m_io_ctx.post([space, weak,
this] {
76 auto shared = weak.lock();
81 m_storage_signal(space);
86 DpmClientImpl::~DpmClientImpl() =
default;
88 auto DpmClientImpl::ScheduleAsync(std::string
const& spec) -> boost::future<State> {
89 return m_dpmif->QueueDaq(spec).then(m_executor, [&](boost::future<DaqReplyPtr> f) {
93 return State::Scheduled;
97 auto DpmClientImpl::AbortAsync(std::string
const&
id) -> boost::future<State> {
98 return m_dpmif->AbortDaq(
id).then(m_executor, [&](boost::future<DaqReplyPtr> f) {
102 return State::Aborted;
106 auto DpmClientImpl::ConnectStatusSignal(StatusSignal::slot_type
const& slot)
107 -> boost::signals2::connection {
108 return m_status_signal.connect(slot);
111 auto DpmClientImpl::ConnectStorageSignal(StorageSignal::slot_type
const& slot)
112 -> boost::signals2::connection {
113 return m_storage_signal.connect(slot);