ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
ocmDaqService.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Declaration of OcmDaqService
7  */
8 #ifndef DAQ_OCM_OCM_DAQ_SERVICE_HPP_
9 #define DAQ_OCM_OCM_DAQ_SERVICE_HPP_
10 #include <daq/config.hpp>
11 #include <rad/ioExecutor.hpp>
12 
13 #include <Daqif.hpp>
14 
15 #include <boost/asio/io_context.hpp>
16 #include <daq/eventLog.hpp>
17 #include <daq/eventLogObserver.hpp>
18 #include <daq/manager.hpp>
19 #include <daq/daqContext.hpp>
20 #include <log4cplus/logger.h>
21 #include <mal/Mal.hpp>
22 
23 namespace mal = ::elt::mal;
24 
25 struct ParsedSource {
26  ParsedSource(std::string name, std::string rr_uri);
27  ParsedSource() = default;
28  ParsedSource(ParsedSource const&) = default;
29  ParsedSource(ParsedSource&&) = default;
30  ParsedSource& operator=(ParsedSource const& rhs) = default;
31  ParsedSource& operator=(ParsedSource&& rhs) = default;
32 
33  bool operator==(ParsedSource const& rhs) const;
34  std::string name;
35  std::string rr_uri;
36 };
37 
38 std::ostream& operator<<(std::ostream& os, ParsedSource const& s);
39 
40 /**
41  * Parse user provided string in the format
42  * "<name>@<rr-uri>"
43  *
44  * @throw std::invalid_argument on errors.
45  */
46 ParsedSource ParseSourceUri(std::string_view s);
47 
48 /**
49  * Parse user provided string in the format
50  * "<name>@<rr-uri>[ <name>@...]"
51  *
52  * @throw std::invalid_argument on errors.
53  */
54 std::vector<ParsedSource> ParseSourceUris(std::string_view s);
55 
56 
57 /**
58  * Parse the JSON properties user provides with StartDaq
59  *
60  * {"keywords": KEYWORDS,
61  * "awaitInterval": DURATION}
62  *
63  * @throws nlohmann::json::exception on parsing errors.
64  */
65 daq::DaqContext ParseStartDaqContext(std::string const& properties);
66 
67 /**
68  * Implements the MAL interface daqif::OcmDaq (async version).
69  *
70  * The server is only safe to use as a shared pointer, as weak pointers to this object is
71  * stored in asynchronous continuations, apart from the initiating operation.
72  * Provided boost::asio::io_context must outlive object and any pending operations.
73  *
74  * This object can be deleted while there are operations that have not complete yet.
75  * In that case, when the operation finally completes, an exception is sent as reply, even if
76  * the requested operation would have succeeded otherwise. The io_context must stay alive however,
77  * as it will be used as an executor when asynchonous operation is continued.
78  *
79  * A note on thread safety:
80  *
81  * The public (asynchronous) interface will immediately defer the execution to the provided
82  * executor, so depending on the executor this can provide thread safety.
83  *
84  *
85  * @ingroup daq_ocm_server
86  */
87 class OcmDaqService : public daqif::AsyncOcmDaqControl,
88  public std::enable_shared_from_this<OcmDaqService> {
89 public:
90  static constexpr char const* LOGGER_NAME = "ocm.service.daq";
91 
92  /**
93  *
94  * @param io_ctx ASIO io context to use as an executor. This object must outlive OcmDaqService.
95  * The io context will be progated to any created `daq::DaqController` instances as well.
96  * @param mal The mal instance to use to create instances. This object must outlive
97  * OcmDaqService.
98  * @param mgr The data acquisition manager to use. This object must outlive OcmDaqService.
99  */
100  OcmDaqService(boost::asio::io_context& io_ctx,
101  mal::Mal& mal,
102  daq::Manager& mgr,
103  std::string proc_name,
104  std::string output_path,
105  std::shared_ptr<daq::ObservableEventLog> event_log);
106  ~OcmDaqService();
107 
108  boost::future<std::shared_ptr<::daqif::DaqReply>>
109  StartDaq(const std::string& id,
110  const std::string& file_prefix,
111  const std::string& primary_sources,
112  const std::string& metadata_sources,
113  const std::string& properties) override;
114 
115  boost::future<std::shared_ptr<::daqif::DaqReply>> StopDaq(const std::string& id) override;
116 
117  boost::future<std::shared_ptr<::daqif::DaqReply>>
118  ForceStopDaq(const std::string& id) override;
119 
120  boost::future<std::shared_ptr<::daqif::DaqReply>>
121  AbortDaq(const std::string& id) override;
122 
123  boost::future<std::shared_ptr<::daqif::DaqReply>>
124  ForceAbortDaq(const std::string& id) override;
125 
126  boost::future<std::shared_ptr<::daqif::DaqReply>>
127  UpdateKeywords(const std::string& id, const std::string& keywords) override;
128 
129  boost::future<std::shared_ptr<::daqif::AwaitDaqReply>>
130  AwaitDaqState(const std::string& id,
131  daqif::DaqState state,
132  daqif::DaqSubState substate,
133  double timeout) override;
134 
135  boost::future<std::shared_ptr<::daqif::DaqStatus>>
136  GetStatus(const std::string& id) override;
137 
138  boost::future<std::vector<std::shared_ptr<::daqif::DaqStatus>>>
139  GetActiveList() override;
140 
141 private:
142  boost::future<std::shared_ptr<::daqif::DaqReply>>
143  StopDaq(const std::string& id, bool forced);
144  boost::future<std::shared_ptr<::daqif::DaqReply>>
145  AbortDaq(const std::string& id, bool forced);
146 
147  boost::asio::io_context& m_io_ctx;
148  rad::IoExecutor m_executor;
149  mal::Mal& m_mal;
150  daq::Manager& m_mgr;
151  /**
152  * Process name
153  */
154  std::string m_proc_name;
155 
156  /**
157  * Directory in which FITS files are written
158  */
159  std::string m_output_path;
160 
161  std::shared_ptr<daq::ObservableEventLog> m_event_log;
162  boost::signals2::connection m_log_observer_connection;
163  /**
164  * Observer for m_event_log.
165  */
166  daq::EventLogObserverLogger m_log_observer;
167 
168  log4cplus::Logger m_logger;
169 };
170 
171 #endif // #ifndef DAQ_OCM_OCM_DAQ_SERVICE_HPP_
ParsedSource::operator==
bool operator==(ParsedSource const &rhs) const
Definition: ocmDaqService.cpp:81
OcmDaqService::ForceStopDaq
boost::future< std::shared_ptr<::daqif::DaqReply > > ForceStopDaq(const std::string &id) override
Definition: ocmDaqService.cpp:331
OcmDaqService::GetStatus
boost::future< std::shared_ptr<::daqif::DaqStatus > > GetStatus(const std::string &id) override
Definition: ocmDaqService.cpp:547
ParsedSource::operator=
ParsedSource & operator=(ParsedSource &&rhs)=default
ParseSourceUri
ParsedSource ParseSourceUri(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>".
Definition: ocmDaqService.cpp:93
ioExecutor.hpp
ParseStartDaqContext
daq::DaqContext ParseStartDaqContext(std::string const &properties)
Parse the JSON properties user provides with StartDaq.
Definition: ocmDaqService.cpp:46
OcmDaqService::GetActiveList
boost::future< std::vector< std::shared_ptr<::daqif::DaqStatus > > > GetActiveList() override
Definition: ocmDaqService.cpp:579
OcmDaqService::UpdateKeywords
boost::future< std::shared_ptr<::daqif::DaqReply > > UpdateKeywords(const std::string &id, const std::string &keywords) override
Definition: ocmDaqService.cpp:483
ParsedSource::operator=
ParsedSource & operator=(ParsedSource const &rhs)=default
OcmDaqService::LOGGER_NAME
static constexpr char const * LOGGER_NAME
Definition: ocmDaqService.hpp:90
rad::IoExecutor
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
manager.hpp
Declaration of daq::Manager
operator<<
std::ostream & operator<<(std::ostream &os, ParsedSource const &s)
Definition: ocmDaqService.cpp:88
eventLogObserver.hpp
Contains declaration for EventLogObserverLogger.
config.hpp
OcmDaqService::StartDaq
boost::future< std::shared_ptr<::daqif::DaqReply > > StartDaq(const std::string &id, const std::string &file_prefix, const std::string &primary_sources, const std::string &metadata_sources, const std::string &properties) override
Definition: ocmDaqService.cpp:180
eventLog.hpp
Contains declaration for EventLog, ObservableEventLog and related events.
OcmDaqService::ForceAbortDaq
boost::future< std::shared_ptr<::daqif::DaqReply > > ForceAbortDaq(const std::string &id) override
Definition: ocmDaqService.cpp:406
daq::DaqContext
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:48
OcmDaqService
Implements the MAL interface daqif::OcmDaq (async version).
Definition: ocmDaqService.hpp:88
ParsedSource::name
std::string name
Definition: ocmDaqService.hpp:34
OcmDaqService::AbortDaq
boost::future< std::shared_ptr<::daqif::DaqReply > > AbortDaq(const std::string &id) override
Definition: ocmDaqService.cpp:391
daqContext.hpp
Contains declaration of daq::Context.
ParseSourceUris
std::vector< ParsedSource > ParseSourceUris(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>[ <name>@...]".
Definition: ocmDaqService.cpp:130
ParsedSource::ParsedSource
ParsedSource(ParsedSource &&)=default
ParsedSource::ParsedSource
ParsedSource()=default
OcmDaqService::OcmDaqService
OcmDaqService(boost::asio::io_context &io_ctx, mal::Mal &mal, daq::Manager &mgr, std::string proc_name, std::string output_path, std::shared_ptr< daq::ObservableEventLog > event_log)
Definition: ocmDaqService.cpp:151
OcmDaqService::AwaitDaqState
boost::future< std::shared_ptr<::daqif::AwaitDaqReply > > AwaitDaqState(const std::string &id, daqif::DaqState state, daqif::DaqSubState substate, double timeout) override
Definition: ocmDaqService.cpp:605
OcmDaqService::~OcmDaqService
~OcmDaqService()
Definition: ocmDaqService.cpp:175
OcmDaqService::StopDaq
boost::future< std::shared_ptr<::daqif::DaqReply > > StopDaq(const std::string &id) override
Definition: ocmDaqService.cpp:316
ParsedSource::ParsedSource
ParsedSource(ParsedSource const &)=default
elt::mal
Definition: dpmClient.hpp:25
daq::EventLogObserverLogger
A simple daq::ObservableEventLog observer that logs observed events to provided logger.
Definition: eventLogObserver.hpp:22
ParsedSource::rr_uri
std::string rr_uri
Definition: ocmDaqService.hpp:35
ParsedSource
Definition: ocmDaqService.hpp:25
daq::Manager
Manager owns DaqController and FitsController (active data acquisitions) instances and multiplexes re...
Definition: manager.hpp:124