ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
dpmService.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_dpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::dpm::DpmService definition.
9  */
10 #include <daq/dpm/dpmService.hpp>
11 
12 #include <exception>
13 
14 #include <boost/asio/io_context.hpp>
15 #include <boost/throw_exception.hpp>
16 
17 #include <fmt/format.h>
18 #include <log4cplus/loggingmacros.h>
19 
20 #include <daq/conversion.hpp>
21 #include <daq/error/report.hpp>
22 
23 namespace daq::dpm {
24 
26  elt::mal::Mal& mal,
27  Workspace& workspace,
28  Scheduler& scheduler)
29  : m_executor(executor)
30  , m_mal(mal)
31  , m_workspace(workspace)
32  , m_scheduler(scheduler)
33  , m_work_guard(m_executor.get_io_context().get_executor())
34  , m_sigset(executor.get_io_context(), SIGINT, SIGTERM)
35  , m_logger(log4cplus::Logger::getInstance("dpm.service")) {
36  InitiateSignalHandler();
37 }
38 
39 boost::future<std::shared_ptr<::daqif::StorageStatus>> DpmService::QueryStorageStatus() {
40  try {
41  // Note: QueryStorageStatus() is thread-safe
42  auto status = m_workspace.QueryStorageStatus();
43 
44  std::shared_ptr<daqif::StorageStatus> rep = m_mal.createDataEntity<daqif::StorageStatus>();
45  rep->setCapacity(status.capacity);
46  rep->setFree(status.free);
47  rep->setAvailable(status.available);
48 
49  return boost::make_ready_future<std::shared_ptr<daqif::StorageStatus>>(rep);
50  } catch (...) {
51  error::NestedExceptionReporter r(std::current_exception());
52  return boost::make_exceptional_future<std::shared_ptr<daqif::StorageStatus>>(
53  daqif::Exception(r.Str()));
54  }
55 }
56 
57 boost::future<std::string> DpmService::Exit() {
58  // Removing guard which will cause asio::io_context::run to finish when it runs out of pending
59  // work.
60  // Note: If e.g. timers are added continously then there will always be an outstanding operation
61  // so it won't complete.
62  LOG4CPLUS_INFO(m_logger, "Application termination requested with Exit()");
63  return boost::async(m_executor, [this]() mutable -> std::string {
64  HandleExit();
65  return "OK";
66  });
67 }
68 
69 boost::future<std::string> DpmService::GetVersion() {
70  return boost::make_ready_future(std::string(VERSION));
71 }
72 
73 boost::future<std::shared_ptr<::daqif::DaqReply>>
74 DpmService::QueueDaq(const std::string& specification) {
75  LOG4CPLUS_DEBUG(m_logger, fmt::format("QueueDaq() Queuing new DAQ"));
76  using V = ::daqif::DaqReply;
77  using R = std::shared_ptr<V>;
78  return boost::async(m_executor, [specification, this]() mutable -> R {
79  try {
80  auto id = m_scheduler.QueueDaq(specification);
81  try {
82  auto rep = m_mal.createDataEntity<V>();
83  rep->setId(id);
84  rep->setError(false);
85  return rep;
86  } catch (...) {
87  error::NestedExceptionReporter r(std::current_exception());
88  BOOST_THROW_EXCEPTION(daqif::DaqException(id, r.Str()));
89  }
90  } catch (daqif::DaqException const& e) {
92  LOG4CPLUS_ERROR(m_logger, "QueueDaq() failed with ICD error:\n" << r.Str());
93  throw;
94  } catch (...) {
95  error::NestedExceptionReporter r(std::current_exception());
96  LOG4CPLUS_ERROR(m_logger, "QueueDaq() failed with following error(s):\n" << r);
97  BOOST_THROW_EXCEPTION(daqif::DaqException("", r.Str()));
98  }
99  });
100 }
101 
102 boost::future<std::shared_ptr<::daqif::DaqReply>> DpmService::AbortDaq(const std::string& id) {
103  LOG4CPLUS_INFO(m_logger, fmt::format("AbortDaq({}): Handling request", id));
104  using V = ::daqif::DaqReply;
105  using R = std::shared_ptr<V>;
106  return boost::async(m_executor, [id, this]() mutable -> R {
107  try {
108  m_scheduler.AbortDaq(id);
109  try {
110  auto rep = m_mal.createDataEntity<V>();
111  rep->setId(id);
112  rep->setError(false);
113  return rep;
114  } catch (...) {
115  error::NestedExceptionReporter r(std::current_exception());
116  throw daqif::DaqException(id, r.Str());
117  }
118  } catch (daqif::DaqException const& e) {
120  LOG4CPLUS_ERROR(m_logger, "AbortDaq() failed with following error(s):\n" << r);
121  throw;
122  } catch (...) {
123  error::NestedExceptionReporter r(std::current_exception());
124  LOG4CPLUS_ERROR(m_logger, "AbortDaq() failed with following error(s):\n" << r);
125  BOOST_THROW_EXCEPTION(daqif::DaqException("", r.Str()));
126  }
127  });
128 }
129 
130 boost::future<std::shared_ptr<::daqif::DaqStatus>> DpmService::GetDaqStatus(const std::string& id) {
131  LOG4CPLUS_INFO(m_logger, fmt::format("GetDaqStatus({}): Handling request", id));
132  using V = ::daqif::DaqStatus;
133  using R = std::shared_ptr<V>;
134  return boost::async(m_executor, [id, this]() mutable -> R {
135  try {
136  return GetDaqStatusSync(id);
137  } catch (daqif::DaqException const& e) {
139  LOG4CPLUS_ERROR(m_logger, "GetDaqStatus() failed with following error(s):\n" << r);
140  throw;
141  } catch (...) {
142  error::NestedExceptionReporter r(std::current_exception());
143  LOG4CPLUS_ERROR(m_logger, "GetDaqStatus() failed with following error(s):\n" << r);
144  BOOST_THROW_EXCEPTION(daqif::DaqException(id, r.Str()));
145  }
146  });
147 }
148 
149 boost::future<std::vector<std::shared_ptr<::daqif::DaqStatus>>> DpmService::GetActiveDaqs() {
150  LOG4CPLUS_INFO(m_logger, "GetActiveDaqs(): Handling request");
151  using V = std::shared_ptr<::daqif::DaqStatus>;
152  using R = std::vector<V>;
153  return boost::async(m_executor, [this]() mutable -> R {
154  try {
155  auto ids = m_scheduler.GetQueue();
156  R statuses;
157  for (auto const& id : ids) {
158  statuses.emplace_back(GetDaqStatusSync(id));
159  }
160  return statuses;
161  } catch (daqif::DaqException const& e) {
163  LOG4CPLUS_ERROR(m_logger, "GetActiveDaqs() failed with following error(s):\n" << r);
164  throw;
165  } catch (...) {
166  error::NestedExceptionReporter r(std::current_exception());
167  LOG4CPLUS_ERROR(m_logger, "GetActiveDaqs() failed with following error(s):\n" << r);
168  BOOST_THROW_EXCEPTION(daqif::DaqException("", r.Str()));
169  }
170  });
171 }
172 
173 void DpmService::InitiateSignalHandler() {
174  m_sigset.async_wait([&](boost::system::error_code const& ec, int sig_num) {
175  if (ec) {
176  return;
177  }
178  LOG4CPLUS_INFO(m_logger,
179  fmt::format("Application termination requested with signal {}", sig_num));
180  HandleExit();
181  });
182 }
183 
184 void DpmService::HandleExit() {
185  LOG4CPLUS_TRACE(m_logger, "HandleExit(): Application termination has been requested");
186  // If we still have a work-guard we try a nice shutdown, if there's no work guard
187  if (m_work_guard.owns_work()) {
188  // Reset work guard and cancel pending operations
189  m_work_guard.reset();
190  m_scheduler.Stop();
191  m_sigset.cancel();
192  m_sigset.clear();
193  } else {
194  LOG4CPLUS_INFO(m_logger,
195  "Application termination already requested before. Stopping immediately!");
196  m_executor.get_io_context().stop();
197  }
198 }
199 
200 std::shared_ptr<daqif::DaqStatus> DpmService::GetDaqStatusSync(const std::string& id) {
201  LOG4CPLUS_INFO(m_logger, fmt::format("GetDaqStatusSync({}): Handling request", id));
202  using V = ::daqif::DaqStatus;
203  try {
204  auto status = m_scheduler.GetDaqStatus(id);
205  auto rep = m_mal.createDataEntity<V>();
206  *rep << status;
207  return rep;
208  } catch (...) {
209  error::NestedExceptionReporter r(std::current_exception());
210  BOOST_THROW_EXCEPTION(daqif::DaqException(id, r.Str()));
211  }
212 }
213 
214 } // namespace daq::dpm
rad::IoExecutor::get_io_context
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Definition: ioExecutor.hpp:41
daq::dpm::Workspace::QueryStorageStatus
virtual auto QueryStorageStatus() const -> std::filesystem::space_info=0
Queries available storage for workspace.
daq::dpm::Scheduler::GetQueue
virtual std::vector< std::string > GetQueue() const noexcept=0
Queries current DAQ queue.
daq::dpm::Workspace
Interface to interact with DPM workspace.
Definition: workspace.hpp:98
rad::IoExecutor
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
daq::error::NestedExceptionReporter::Str
std::string Str() const
Convenience function for constructing a std::string from the exception.
Definition: report.cpp:97
daq::dpm::Scheduler::AbortDaq
virtual void AbortDaq(std::string const &id)=0
Abort merging DAQ identified by id.
report.hpp
conversion.hpp
Contains support functions for daqif.
daq::dpm::DpmService::QueueDaq
boost::future< std::shared_ptr<::daqif::DaqReply > > QueueDaq(const std::string &specification) override
Definition: dpmService.cpp:74
daq::dpm::DpmService::GetDaqStatus
boost::future< std::shared_ptr<::daqif::DaqStatus > > GetDaqStatus(const std::string &id) override
Definition: dpmService.cpp:130
daq::error::NestedExceptionReporter
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
daq::dpm::DpmService::Exit
boost::future< std::string > Exit() override
Cancels any started merges and terminates application.
Definition: dpmService.cpp:57
daq::dpm::Scheduler::QueueDaq
virtual std::string QueueDaq(std::string const &dp_spec)=0
Queues DAQ for processing.
daq::dpm::Scheduler
Schedules asynchronous activities that results in merged Data Product and delivery.
Definition: scheduler.hpp:274
daq::dpm::Scheduler::Stop
virtual void Stop()=0
daq::dpm
Definition: testDpSpec.cpp:16
daq::dpm::DpmService::AbortDaq
boost::future< std::shared_ptr<::daqif::DaqReply > > AbortDaq(const std::string &id) override
Definition: dpmService.cpp:102
daq::dpm::DpmService::GetActiveDaqs
boost::future< std::vector< std::shared_ptr<::daqif::DaqStatus > > > GetActiveDaqs() override
Definition: dpmService.cpp:149
dpmService.hpp
daq::dpm::DpmService implements MAL services daqif::DpmControl and daqif::DpmDaqControl.
elt::mal
Definition: dpmClient.hpp:25
daq::dpm::DpmService::GetVersion
boost::future< std::string > GetVersion() override
Cancels any started merges and terminates application.
Definition: dpmService.cpp:69
daq::dpm::DpmService::DpmService
DpmService(rad::IoExecutor &executor, elt::mal::Mal &mal, Workspace &workspace, Scheduler &scheduler)
Construct replier.
Definition: dpmService.cpp:25
daq::dpm::Scheduler::GetDaqStatus
virtual Status GetDaqStatus(std::string const &id) const =0
Queries current DAQ status, possibly from last recorded status in workspace.
daq::dpm::DpmService::QueryStorageStatus
boost::future< std::shared_ptr<::daqif::StorageStatus > > QueryStorageStatus() override
Cancels any started merges and terminates application.
Definition: dpmService.cpp:39