14 #include <boost/asio/io_context.hpp>
15 #include <boost/throw_exception.hpp>
17 #include <fmt/format.h>
18 #include <log4cplus/loggingmacros.h>
30 : m_executor(executor)
32 , m_workspace(workspace)
33 , m_scheduler(scheduler)
34 , m_work_guard(m_executor.get_io_context().get_executor())
35 , m_sigset(executor.get_io_context(), SIGINT, SIGTERM)
36 , m_logger(log4cplus::Logger::getInstance(
LOGGER_NAME)) {
37 InitiateSignalHandler();
45 std::shared_ptr<daqif::StorageStatus> rep = m_mal.createDataEntity<daqif::StorageStatus>();
46 rep->setCapacity(status.capacity);
47 rep->setFree(status.free);
48 rep->setAvailable(status.available);
50 return boost::make_ready_future<std::shared_ptr<daqif::StorageStatus>>(rep);
53 return boost::make_exceptional_future<std::shared_ptr<daqif::StorageStatus>>(
54 daqif::Exception(r.
Str()));
63 LOG4CPLUS_INFO(m_logger,
"Application termination requested with Exit()");
64 return boost::async(m_executor, [
this]()
mutable -> std::string {
71 return boost::make_ready_future(std::string(VERSION));
74 boost::future<std::shared_ptr<::daqif::DaqReply>>
76 LOG4CPLUS_DEBUG(m_logger, fmt::format(
"QueueDaq() Queuing new DAQ"));
77 using V = ::daqif::DaqReply;
78 using R = std::shared_ptr<V>;
79 return boost::async(m_executor, [specification,
this]()
mutable -> R {
81 auto id = m_scheduler.
QueueDaq(specification);
83 auto rep = m_mal.createDataEntity<V>();
89 BOOST_THROW_EXCEPTION(daqif::DaqException(
id, r.
Str()));
91 }
catch (daqif::DaqException
const& e) {
93 LOG4CPLUS_ERROR(m_logger,
"QueueDaq() failed with ICD error:\n" << r.
Str());
97 LOG4CPLUS_ERROR(m_logger,
"QueueDaq() failed with following error(s):\n" << r);
98 BOOST_THROW_EXCEPTION(daqif::DaqException(
"", r.
Str()));
104 LOG4CPLUS_INFO(m_logger, fmt::format(
"AbortDaq({}): Handling request",
id));
105 using V = ::daqif::DaqReply;
106 using R = std::shared_ptr<V>;
107 return boost::async(m_executor, [
id,
this]()
mutable -> R {
111 auto rep = m_mal.createDataEntity<V>();
113 rep->setError(
false);
117 throw daqif::DaqException(
id, r.
Str());
119 }
catch (daqif::DaqException
const& e) {
121 LOG4CPLUS_ERROR(m_logger,
"AbortDaq() failed with following error(s):\n" << r);
125 LOG4CPLUS_ERROR(m_logger,
"AbortDaq() failed with following error(s):\n" << r);
126 BOOST_THROW_EXCEPTION(daqif::DaqException(
"", r.
Str()));
132 LOG4CPLUS_INFO(m_logger, fmt::format(
"GetDaqStatus({}): Handling request",
id));
133 using V = ::daqif::DaqStatus;
134 using R = std::shared_ptr<V>;
135 return boost::async(m_executor, [
id,
this]()
mutable -> R {
137 return GetDaqStatusSync(
id);
138 }
catch (daqif::DaqException
const& e) {
140 LOG4CPLUS_ERROR(m_logger,
"GetDaqStatus() failed with following error(s):\n" << r);
144 LOG4CPLUS_ERROR(m_logger,
"GetDaqStatus() failed with following error(s):\n" << r);
145 BOOST_THROW_EXCEPTION(daqif::DaqException(
id, r.
Str()));
151 LOG4CPLUS_INFO(m_logger,
"GetActiveDaqs(): Handling request");
152 using V = std::shared_ptr<::daqif::DaqStatus>;
153 using R = std::vector<V>;
154 return boost::async(m_executor, [
this]()
mutable -> R {
158 for (
auto const&
id : ids) {
159 statuses.emplace_back(GetDaqStatusSync(
id));
162 }
catch (daqif::DaqException
const& e) {
164 LOG4CPLUS_ERROR(m_logger,
"GetActiveDaqs() failed with following error(s):\n" << r);
168 LOG4CPLUS_ERROR(m_logger,
"GetActiveDaqs() failed with following error(s):\n" << r);
169 BOOST_THROW_EXCEPTION(daqif::DaqException(
"", r.
Str()));
// Registers an asynchronous wait on m_sigset. The completion handler receives
// the wait's error code and the number of the signal that was delivered.
// NOTE(review): the lines between the handler header and the log statement are
// not visible in this listing; presumably the error code is inspected before
// the signal is acted upon -- confirm against the full source.
174 void DpmService::InitiateSignalHandler() {
175 m_sigset.async_wait([&](boost::system::error_code
const& ec,
int sig_num) {
// Record which signal triggered the termination request.
179 LOG4CPLUS_INFO(m_logger,
180 fmt::format(
"Application termination requested with signal {}", sig_num));
// Handles an application termination request.
// While m_work_guard still owns work it is reset; the later log message covers
// the case where termination was already requested before.
// NOTE(review): the branch structure between the two paths is not fully visible
// in this listing -- confirm what "Stopping immediately" does in the full source.
185 void DpmService::HandleExit() {
186 LOG4CPLUS_TRACE(m_logger,
"HandleExit(): Application termination has been requested");
// First request: the work guard is still held, so release it.
188 if (m_work_guard.owns_work()) {
190 m_work_guard.reset();
// Message emitted for a repeated termination request.
195 LOG4CPLUS_INFO(m_logger,
196 "Application termination already requested before. Stopping immediately!");
// Synchronous helper that builds a daqif::DaqStatus reply for the DAQ
// identified by `id`. It is called from the GetDaqStatus() and GetActiveDaqs()
// request lambdas.
//
// @param id Identifier of the DAQ whose status is requested.
// @return Shared pointer to the DaqStatus entity created through m_mal.
// @throws daqif::DaqException carrying `id` and the message produced by
//         error::NestedExceptionReporter from the in-flight exception.
// NOTE(review): the try/catch structure and the code that populates the reply
// are not visible in this listing -- confirm against the full source.
201 std::shared_ptr<daqif::DaqStatus> DpmService::GetDaqStatusSync(
const std::string&
id) {
202 LOG4CPLUS_INFO(m_logger, fmt::format(
"GetDaqStatusSync({}): Handling request",
id));
203 using V = ::daqif::DaqStatus;
// Reply entity is allocated through the MAL instance.
206 auto rep = m_mal.createDataEntity<V>();
// Failure path: format the current exception and rethrow it as DaqException.
210 error::NestedExceptionReporter r(std::current_exception());
211 BOOST_THROW_EXCEPTION(daqif::DaqException(
id, r.Str()));
boost::future< std::shared_ptr<::daqif::DaqReply > > QueueDaq(const std::string &specification) override
DpmService(rad::IoExecutor &executor, elt::mal::Mal &mal, Workspace &workspace, Scheduler &scheduler)
Construct replier.
boost::future< std::string > GetVersion() override
Returns the application version string.
boost::future< std::shared_ptr<::daqif::StorageStatus > > QueryStorageStatus() override
Queries the storage status (capacity, free and available space).
boost::future< std::string > Exit() override
Cancels any started merges and terminates application.
boost::future< std::vector< std::shared_ptr<::daqif::DaqStatus > > > GetActiveDaqs() override
boost::future< std::shared_ptr<::daqif::DaqReply > > AbortDaq(const std::string &id) override
boost::future< std::shared_ptr<::daqif::DaqStatus > > GetDaqStatus(const std::string &id) override
Schedules asynchronous activities that result in merged Data Product and delivery.
virtual std::vector< std::string > GetQueue() const noexcept=0
Queries current DAQ queue.
virtual std::string QueueDaq(std::string const &dp_spec)=0
Queues DAQ for processing.
virtual void AbortDaq(std::string const &id)=0
Abort merging DAQ identified by id.
virtual Status GetDaqStatus(std::string const &id) const =0
Queries current DAQ status, possibly from last recorded status in workspace.
Interface to interact with DPM workspace.
virtual auto QueryStorageStatus() const -> std::filesystem::space_info=0
Queries available storage for workspace.
Adapter object intended to be used in contexts without direct access to the output-stream object.
std::string Str() const
Convenience function for constructing a std::string from the exception.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
boost::asio::io_context & get_io_context() noexcept
Not part of the boost::thread::executor concept.
Contains support functions for daqif.
daq::dpm::DpmService implements MAL services daqif::DpmControl and daqif::DpmDaqControl.
const std::string LOGGER_NAME