ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
dpmClient.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_libdaq_test
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief `daq::DpmClient`
7  */
8 #include <daq/dpmClient.hpp>
9 
10 #include <Daqif.hpp>
11 #include <mal/Mal.hpp>
12 #include <mal/rr/qos/ReplyTime.hpp>
13 
14 #include <daqif/subscription.hpp>
15 #include <daq/conversion.hpp>
16 
17 namespace daq {
18 
19 using DaqReplyPtr = std::shared_ptr<daqif::DaqReply>;
20 
22  std::shared_ptr<bool> alive = std::make_shared<bool>(true);
25 };
26 
27 DpmClientImpl::DpmClientImpl(boost::asio::io_context& io_ctx, mal::Mal& mal, DpmClientParams params)
28  : m_io_ctx(io_ctx), m_executor(m_io_ctx), m_mal(mal), m_params(std::move(params)) {
29  m_dpmif = m_mal.getClient<daqif::DpmDaqControlAsync>(
30  daqif::MakeServiceUri(m_params.rr_uri, "daq"),
31  {std::make_shared<elt::mal::rr::qos::ReplyTime>(params.timeout)},
32  {});
33  assert(m_dpmif);
34 
35  m_subs = std::make_unique<Subscriptions>();
36  // Add subscription to status notifications, which will invoked from unknown threads,
37  // so we dispatch to asio instead and emit signal safely there.
38  m_subs->status = daqif::MakeSubscription<daqif::DaqStatus>(
39  m_mal,
40  daqif::MakeServiceUri(m_params.ps_uri, "daq/status"),
41  [weak = std::weak_ptr<bool>(m_subs->alive), this](auto& sub, auto const& event) {
42  auto shared = weak.lock();
43  if (!shared) {
44  // Abandoned
45  return;
46  }
47  auto const& sample = event.getData();
48  Status status(sample->getId(), sample->getFileId());
49  status << *sample;
50  // Dispatch to main thread
51  m_io_ctx.post([status = std::move(status), weak, this] {
52  auto shared = weak.lock();
53  if (!shared) {
54  // Abandoned
55  return;
56  }
57  m_status_signal(status);
58  });
59  });
60  // Add subscription to storage notifications, which will invoked from unknown threads,
61  // so we dispatch to asio instead and emit signal safely there.
62  m_subs->storage = daqif::MakeSubscription<daqif::StorageStatus>(
63  m_mal,
64  daqif::MakeServiceUri(m_params.ps_uri, "dpm/storage"),
65  [weak = std::weak_ptr<bool>(m_subs->alive), this](auto& sub, auto const& event) {
66  auto shared = weak.lock();
67  if (!shared) {
68  // Abandoned
69  return;
70  }
71  auto const& sample = event.getData();
72  std::filesystem::space_info space = {};
73  space << *sample;
74  // Dispatch to main thread
75  m_io_ctx.post([space, weak, this] {
76  auto shared = weak.lock();
77  if (!shared) {
78  // Abandoned
79  return;
80  }
81  m_storage_signal(space);
82  });
83  });
84 }
85 
// Destructor is defaulted in this translation unit, where `Subscriptions` is defined.
// Destroying `m_subs` releases the subscriptions together with the `alive` flag that the
// subscription callbacks observe through a `std::weak_ptr` before touching `this`.
86 DpmClientImpl::~DpmClientImpl() = default;
87 
88 auto DpmClientImpl::ScheduleAsync(std::string const& spec) -> boost::future<State> {
89  return m_dpmif->QueueDaq(spec).then(m_executor, [&](boost::future<DaqReplyPtr> f) {
90  auto reply = f.get();
91  // If it doesn't throw it was successful, until status change is published we can assume
92  // state is Scheduled.
93  return State::Scheduled;
94  });
95 }
96 
97 auto DpmClientImpl::AbortAsync(std::string const& id) -> boost::future<State> {
98  return m_dpmif->AbortDaq(id).then(m_executor, [&](boost::future<DaqReplyPtr> f) {
99  auto reply = f.get();
100  // If it doesn't throw it was successful, until status change is published we can assume
101  // state is Aborted.
102  return State::Aborted;
103  });
104 }
105 
106 auto DpmClientImpl::ConnectStatusSignal(StatusSignal::slot_type const& slot)
107  -> boost::signals2::connection {
108  return m_status_signal.connect(slot);
109 }
110 
111 auto DpmClientImpl::ConnectStorageSignal(StorageSignal::slot_type const& slot)
112  -> boost::signals2::connection {
113  return m_storage_signal.connect(slot);
114 }
115 
116 } // namespace daq
dpmClient.hpp
daq::DpmClient
daq::DpmClientImpl::Subscriptions::status
daqif::Subscription< daqif::DaqStatus > status
Definition: dpmClient.cpp:23
daqif::Subscription< daqif::DaqStatus >
conversion.hpp
Contains support functions for daqif.
daq
Definition: asyncProcess.cpp:15
daq::DpmClientParams::timeout
std::chrono::seconds timeout
Definition: dpmClient.hpp:77
daqif::MakeServiceUri
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Definition: uri.cpp:19
daq::DpmClientImpl::Subscriptions::storage
daqif::Subscription< daqif::StorageStatus > storage
Definition: dpmClient.cpp:24
daq::DpmClientImpl::Subscriptions
Definition: dpmClient.cpp:21
daq::DaqReplyPtr
std::shared_ptr< daqif::DaqReply > DaqReplyPtr
Definition: dpmClient.cpp:19
daq::DpmClientParams::ps_uri
std::string ps_uri
Definition: dpmClient.hpp:76
daq::Status
Non-observable status object that stores the status of a data acquisition.
Definition: status.hpp:120
daq::DpmClientParams
Connection parameters for DPM.
Definition: dpmClient.hpp:74
daq::DpmClientImpl::Subscriptions::alive
std::shared_ptr< bool > alive
Definition: dpmClient.cpp:22
elt::mal
Definition: dpmClient.hpp:25
daq::DpmClientParams::rr_uri
std::string rr_uri
Definition: dpmClient.hpp:75
subscription.hpp
Contains URI support functions for daqif.
daq::DpmClientImpl::DpmClientImpl
DpmClientImpl(boost::asio::io_context &io_ctx, mal::Mal &mal, DpmClientParams params)
Definition: dpmClient.cpp:27