ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
dpmService.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_dpm
4  * @copyright (c) Copyright ESO 2022
5  * All Rights Reserved
6  * ESO (eso.org) is an Intergovernmental Organisation, and therefore special legal conditions apply.
7  *
8  * @brief daq::dpm::DpmService definition.
9  */
10 #include <daq/dpm/dpmService.hpp>
11 
12 #include <exception>
13 
14 #include <boost/asio/io_context.hpp>
15 #include <boost/throw_exception.hpp>
16 
17 #include <fmt/format.h>
18 #include <log4cplus/loggingmacros.h>
19 
20 #include <daq/conversion.hpp>
21 #include <daq/error/report.hpp>
22 #include <daq/dpm/config.hpp>
23 
24 namespace daq::dpm {
25 
27  elt::mal::Mal& mal,
28  Workspace& workspace,
29  Scheduler& scheduler)
30  : m_executor(executor)
31  , m_mal(mal)
32  , m_workspace(workspace)
33  , m_scheduler(scheduler)
34  , m_work_guard(m_executor.get_io_context().get_executor())
35  , m_sigset(executor.get_io_context(), SIGINT, SIGTERM)
36  , m_logger(log4cplus::Logger::getInstance(LOGGER_NAME)) {
37  InitiateSignalHandler();
38 }
39 
40 boost::future<std::shared_ptr<::daqif::StorageStatus>> DpmService::QueryStorageStatus() {
41  try {
42  // Note: QueryStorageStatus() is thread-safe
43  auto status = m_workspace.QueryStorageStatus();
44 
45  std::shared_ptr<daqif::StorageStatus> rep = m_mal.createDataEntity<daqif::StorageStatus>();
46  rep->setCapacity(status.capacity);
47  rep->setFree(status.free);
48  rep->setAvailable(status.available);
49 
50  return boost::make_ready_future<std::shared_ptr<daqif::StorageStatus>>(rep);
51  } catch (...) {
52  error::NestedExceptionReporter r(std::current_exception());
53  return boost::make_exceptional_future<std::shared_ptr<daqif::StorageStatus>>(
54  daqif::Exception(r.Str()));
55  }
56 }
57 
58 boost::future<std::string> DpmService::Exit() {
59  // Removing guard which will cause asio::io_context::run to finish when it runs out of pending
60  // work.
61  // Note: If e.g. timers are added continously then there will always be an outstanding operation
62  // so it won't complete.
63  LOG4CPLUS_INFO(m_logger, "Application termination requested with Exit()");
64  return boost::async(m_executor, [this]() mutable -> std::string {
65  HandleExit();
66  return "OK";
67  });
68 }
69 
70 boost::future<std::string> DpmService::GetVersion() {
71  return boost::make_ready_future(std::string(VERSION));
72 }
73 
74 boost::future<std::shared_ptr<::daqif::DaqReply>>
75 DpmService::QueueDaq(const std::string& specification) {
76  LOG4CPLUS_DEBUG(m_logger, fmt::format("QueueDaq() Queuing new DAQ"));
77  using V = ::daqif::DaqReply;
78  using R = std::shared_ptr<V>;
79  return boost::async(m_executor, [specification, this]() mutable -> R {
80  try {
81  auto id = m_scheduler.QueueDaq(specification);
82  try {
83  auto rep = m_mal.createDataEntity<V>();
84  rep->setId(id);
85  rep->setError(false);
86  return rep;
87  } catch (...) {
88  error::NestedExceptionReporter r(std::current_exception());
89  BOOST_THROW_EXCEPTION(daqif::DaqException(id, r.Str()));
90  }
91  } catch (daqif::DaqException const& e) {
93  LOG4CPLUS_ERROR(m_logger, "QueueDaq() failed with ICD error:\n" << r.Str());
94  throw;
95  } catch (...) {
96  error::NestedExceptionReporter r(std::current_exception());
97  LOG4CPLUS_ERROR(m_logger, "QueueDaq() failed with following error(s):\n" << r);
98  BOOST_THROW_EXCEPTION(daqif::DaqException("", r.Str()));
99  }
100  });
101 }
102 
103 boost::future<std::shared_ptr<::daqif::DaqReply>> DpmService::AbortDaq(const std::string& id) {
104  LOG4CPLUS_INFO(m_logger, fmt::format("AbortDaq({}): Handling request", id));
105  using V = ::daqif::DaqReply;
106  using R = std::shared_ptr<V>;
107  return boost::async(m_executor, [id, this]() mutable -> R {
108  try {
109  m_scheduler.AbortDaq(id);
110  try {
111  auto rep = m_mal.createDataEntity<V>();
112  rep->setId(id);
113  rep->setError(false);
114  return rep;
115  } catch (...) {
116  error::NestedExceptionReporter r(std::current_exception());
117  throw daqif::DaqException(id, r.Str());
118  }
119  } catch (daqif::DaqException const& e) {
121  LOG4CPLUS_ERROR(m_logger, "AbortDaq() failed with following error(s):\n" << r);
122  throw;
123  } catch (...) {
124  error::NestedExceptionReporter r(std::current_exception());
125  LOG4CPLUS_ERROR(m_logger, "AbortDaq() failed with following error(s):\n" << r);
126  BOOST_THROW_EXCEPTION(daqif::DaqException("", r.Str()));
127  }
128  });
129 }
130 
131 boost::future<std::shared_ptr<::daqif::DaqStatus>> DpmService::GetDaqStatus(const std::string& id) {
132  LOG4CPLUS_INFO(m_logger, fmt::format("GetDaqStatus({}): Handling request", id));
133  using V = ::daqif::DaqStatus;
134  using R = std::shared_ptr<V>;
135  return boost::async(m_executor, [id, this]() mutable -> R {
136  try {
137  return GetDaqStatusSync(id);
138  } catch (daqif::DaqException const& e) {
140  LOG4CPLUS_ERROR(m_logger, "GetDaqStatus() failed with following error(s):\n" << r);
141  throw;
142  } catch (...) {
143  error::NestedExceptionReporter r(std::current_exception());
144  LOG4CPLUS_ERROR(m_logger, "GetDaqStatus() failed with following error(s):\n" << r);
145  BOOST_THROW_EXCEPTION(daqif::DaqException(id, r.Str()));
146  }
147  });
148 }
149 
150 boost::future<std::vector<std::shared_ptr<::daqif::DaqStatus>>> DpmService::GetActiveDaqs() {
151  LOG4CPLUS_INFO(m_logger, "GetActiveDaqs(): Handling request");
152  using V = std::shared_ptr<::daqif::DaqStatus>;
153  using R = std::vector<V>;
154  return boost::async(m_executor, [this]() mutable -> R {
155  try {
156  auto ids = m_scheduler.GetQueue();
157  R statuses;
158  for (auto const& id : ids) {
159  statuses.emplace_back(GetDaqStatusSync(id));
160  }
161  return statuses;
162  } catch (daqif::DaqException const& e) {
164  LOG4CPLUS_ERROR(m_logger, "GetActiveDaqs() failed with following error(s):\n" << r);
165  throw;
166  } catch (...) {
167  error::NestedExceptionReporter r(std::current_exception());
168  LOG4CPLUS_ERROR(m_logger, "GetActiveDaqs() failed with following error(s):\n" << r);
169  BOOST_THROW_EXCEPTION(daqif::DaqException("", r.Str()));
170  }
171  });
172 }
173 
174 void DpmService::InitiateSignalHandler() {
175  m_sigset.async_wait([&](boost::system::error_code const& ec, int sig_num) {
176  if (ec) {
177  return;
178  }
179  LOG4CPLUS_INFO(m_logger,
180  fmt::format("Application termination requested with signal {}", sig_num));
181  HandleExit();
182  });
183 }
184 
185 void DpmService::HandleExit() {
186  LOG4CPLUS_TRACE(m_logger, "HandleExit(): Application termination has been requested");
187  // If we still have a work-guard we try a nice shutdown, if there's no work guard
188  if (m_work_guard.owns_work()) {
189  // Reset work guard and cancel pending operations
190  m_work_guard.reset();
191  m_scheduler.Stop();
192  m_sigset.cancel();
193  m_sigset.clear();
194  } else {
195  LOG4CPLUS_INFO(m_logger,
196  "Application termination already requested before. Stopping immediately!");
197  m_executor.get_io_context().stop();
198  }
199 }
200 
201 std::shared_ptr<daqif::DaqStatus> DpmService::GetDaqStatusSync(const std::string& id) {
202  LOG4CPLUS_INFO(m_logger, fmt::format("GetDaqStatusSync({}): Handling request", id));
203  using V = ::daqif::DaqStatus;
204  try {
205  auto status = m_scheduler.GetDaqStatus(id);
206  auto rep = m_mal.createDataEntity<V>();
207  *rep << status;
208  return rep;
209  } catch (...) {
210  error::NestedExceptionReporter r(std::current_exception());
211  BOOST_THROW_EXCEPTION(daqif::DaqException(id, r.Str()));
212  }
213 }
214 
215 } // namespace daq::dpm
boost::future< std::shared_ptr<::daqif::DaqReply > > QueueDaq(const std::string &specification) override
Definition: dpmService.cpp:75
DpmService(rad::IoExecutor &executor, elt::mal::Mal &mal, Workspace &workspace, Scheduler &scheduler)
Construct replier.
Definition: dpmService.cpp:26
boost::future< std::string > GetVersion() override
Returns the version of the application.
Definition: dpmService.cpp:70
boost::future< std::shared_ptr<::daqif::StorageStatus > > QueryStorageStatus() override
Queries the storage status (capacity, free and available space) of the workspace.
Definition: dpmService.cpp:40
boost::future< std::string > Exit() override
Cancels any started merges and terminates application.
Definition: dpmService.cpp:58
boost::future< std::vector< std::shared_ptr<::daqif::DaqStatus > > > GetActiveDaqs() override
Definition: dpmService.cpp:150
boost::future< std::shared_ptr<::daqif::DaqReply > > AbortDaq(const std::string &id) override
Definition: dpmService.cpp:103
boost::future< std::shared_ptr<::daqif::DaqStatus > > GetDaqStatus(const std::string &id) override
Definition: dpmService.cpp:131
Schedules asynchronous activities that result in a merged Data Product and its delivery.
Definition: scheduler.hpp:274
virtual std::vector< std::string > GetQueue() const noexcept=0
Queries current DAQ queue.
virtual std::string QueueDaq(std::string const &dp_spec)=0
Queues DAQ for processing.
virtual void AbortDaq(std::string const &id)=0
Abort merging DAQ identified by id.
virtual Status GetDaqStatus(std::string const &id) const =0
Queries current DAQ status, possibly from last recorded status in workspace.
virtual void Stop()=0
Interface to interact with DPM workspace.
Definition: workspace.hpp:98
virtual auto QueryStorageStatus() const -> std::filesystem::space_info=0
Queries available storage for workspace.
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
std::string Str() const
Convenience function for constructing a std::string from the exception.
Definition: report.cpp:97
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Definition: ioExecutor.hpp:41
Contains support functions for daqif.
daq::dpm::DpmService implements MAL services daqif::DpmControl and daqif::DpmDaqControl.
DPM server config.
const std::string LOGGER_NAME
Definition: config.hpp:22