ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
ocmDaqService.hpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Declaration of OcmDaqService
7  */
8 #ifndef DAQ_OCM_OCM_DAQ_SERVICE_HPP_
9 #define DAQ_OCM_OCM_DAQ_SERVICE_HPP_
10 #include <daq/config.hpp>
11 #include <rad/ioExecutor.hpp>
12 
13 #include <Daqif.hpp>
14 
15 #include <boost/asio/io_context.hpp>
16 #include <daq/daqContext.hpp>
17 #include <daq/eventLog.hpp>
18 #include <daq/eventLogObserver.hpp>
19 #include <daq/manager.hpp>
20 #include <log4cplus/logger.h>
21 #include <mal/Mal.hpp>
22 
23 namespace mal = ::elt::mal;
24 
25 struct ParsedSource {
26  ParsedSource(std::string name, std::string rr_uri);
27  ParsedSource() = default;
28  ParsedSource(ParsedSource const&) = default;
29  ParsedSource(ParsedSource&&) = default;
30  ParsedSource& operator=(ParsedSource const& rhs) = default;
31  ParsedSource& operator=(ParsedSource&& rhs) = default;
32 
33  bool operator==(ParsedSource const& rhs) const;
34  std::string name;
35  std::string rr_uri;
36 };
37 
38 std::ostream& operator<<(std::ostream& os, ParsedSource const& s);
39 
40 /**
41  * Parse user provided string in the format
42  * "<name>@<rr-uri>"
43  *
44  * @throw std::invalid_argument on errors.
45  */
46 ParsedSource ParseSourceUri(std::string_view s);
47 
48 /**
49  * Parse user provided string in the format
50  * "<name>@<rr-uri>[ <name>@...]"
51  *
52  * @throw std::invalid_argument on errors.
53  */
54 std::vector<ParsedSource> ParseSourceUris(std::string_view s);
55 
56 /**
57  * Parse the JSON properties user provides with StartDaq
58  *
59  * {"keywords": KEYWORDS,
60  * "awaitInterval": DURATION}
61  *
62  * @throws nlohmann::json::exception on parsing errors.
63  * @throws std::invalid_argument on argument errors.
64  */
65 daq::DaqContext ParseStartDaqContext(std::string const& properties);
66 
67 /**
68  * Parse JSON specification and returns corresponding DaqContext.
69  *
70  * @throws daq::json::StartDaqV2SpecError On schema error
71  * @throws daq::json::SchemaError On schema error
72  * @throws nlohmann::json::exception on parsing error
73  * @throws std::invalid_argument on argument errors.
74  * @throws std::exception-derived exception on other errors.
75  */
76 daq::DaqContext ParseStartDaqV2(std::string const& specification);
77 
78 /**
79  * Implements the MAL interface daqif::OcmDaq (async version).
80  *
81  * The server is only safe to use as a shared pointer, as weak pointers to this object is
82  * stored in asynchronous continuations, apart from the initiating operation.
83  * Provided boost::asio::io_context must outlive object and any pending operations.
84  *
85  * This object can be deleted while there are operations that have not complete yet.
86  * In that case, when the operation finally completes, an exception is sent as reply, even if
87  * the requested operation would have succeeded otherwise. The io_context must stay alive however,
88  * as it will be used as an executor when asynchonous operation is continued.
89  *
90  * A note on thread safety:
91  *
92  * The public (asynchronous) interface will immediately defer the execution to the provided
93  * executor, so depending on the executor this can provide thread safety.
94  *
95  *
96  * @ingroup daq_ocm_server
97  */
98 class OcmDaqService : public daqif::AsyncOcmDaqControl,
99  public std::enable_shared_from_this<OcmDaqService> {
100 public:
101  static constexpr char const* LOGGER_NAME = "ocm.service.daq";
102 
103  /**
104  *
105  * @param io_ctx ASIO io context to use as an executor. This object must outlive OcmDaqService.
106  * The io context will be progated to any created `daq::DaqController` instances as well.
107  * @param mal The mal instance to use to create instances. This object must outlive
108  * OcmDaqService.
109  * @param mgr The data acquisition manager to use. This object must outlive OcmDaqService.
110  */
111  OcmDaqService(boost::asio::io_context& io_ctx,
112  mal::Mal& mal,
113  daq::Manager& mgr,
114  std::string proc_name,
115  std::string output_path,
116  std::shared_ptr<daq::ObservableEventLog> event_log);
117  ~OcmDaqService();
118 
119  boost::future<std::shared_ptr<::daqif::DaqReply>>
120  StartDaq(const std::string& id,
121  const std::string& file_prefix,
122  const std::string& primary_sources,
123  const std::string& metadata_sources,
124  const std::string& properties) override;
125  boost::future<std::shared_ptr<::daqif::DaqReply>>
126  StartDaqV2(const std::string& specification) override;
127 
128  boost::future<std::shared_ptr<::daqif::DaqReply>> StopDaq(const std::string& id) override;
129 
130  boost::future<std::shared_ptr<::daqif::DaqReply>> ForceStopDaq(const std::string& id) override;
131 
132  boost::future<std::shared_ptr<::daqif::DaqReply>> AbortDaq(const std::string& id) override;
133 
134  boost::future<std::shared_ptr<::daqif::DaqReply>> ForceAbortDaq(const std::string& id) override;
135 
136  boost::future<std::shared_ptr<::daqif::DaqReply>>
137  UpdateKeywords(const std::string& id, const std::string& keywords) override;
138 
139  boost::future<std::shared_ptr<::daqif::AwaitDaqReply>>
140  AwaitDaqState(const std::string& id,
141  daqif::DaqState state,
142  daqif::DaqSubState substate,
143  double timeout) override;
144 
145  boost::future<std::shared_ptr<::daqif::DaqStatus>> GetStatus(const std::string& id) override;
146 
147  boost::future<std::vector<std::shared_ptr<::daqif::DaqStatus>>> GetActiveList() override;
148 
149 private:
150  std::string MakeExceptionMessageWithStatus(std::string const& id,
151  std::exception_ptr const& exception) const;
152 
153  /**
154  * Common Start function for StartDaq and StartDaqV2.
155  *
156  * @param context DaqContext describing DAQ to start.
157  * @param function Pointer to static string.
158  */
159  boost::future<std::shared_ptr<::daqif::DaqReply>>
160  StartDaq(daq::DaqContext const& context, char const* function);
161  boost::future<std::shared_ptr<::daqif::DaqReply>> StopDaq(const std::string& id, bool forced);
162  boost::future<std::shared_ptr<::daqif::DaqReply>> AbortDaq(const std::string& id, bool forced);
163 
164  /**
165  * @throws std::invalid_argument on validation errors.
166  */
167  void UpdateFrom(daq::DaqContext& context, daq::json::StartDaqV2Spec const& spec);
168 
169  boost::asio::io_context& m_io_ctx;
170  rad::IoExecutor m_executor;
171  mal::Mal& m_mal;
172  daq::Manager& m_mgr;
173  /**
174  * Process name
175  */
176  std::string m_proc_name;
177 
178  /**
179  * Directory in which FITS files are written
180  */
181  std::string m_output_path;
182 
183  std::shared_ptr<daq::ObservableEventLog> m_event_log;
184  boost::signals2::connection m_log_observer_connection;
185  /**
186  * Observer for m_event_log.
187  */
188  daq::EventLogObserverLogger m_log_observer;
189 
190  log4cplus::Logger m_logger;
191 };
192 
193 #endif // #ifndef DAQ_OCM_OCM_DAQ_SERVICE_HPP_
A simple daq::ObservableEventLog observer that logs observed events to provided logger.
Manager owns DaqController and FitsController (active data acquisitions) instances and multiplexes re...
Definition: manager.hpp:124
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
Contains declaration of daq::Context.
Contains declaration for EventLogObserverLogger.
Contains declaration for EventLog, ObservableEventLog and related events.
boost::future< std::vector< std::shared_ptr<::daqif::DaqStatus > > > GetActiveList() override
OcmDaqService(boost::asio::io_context &io_ctx, mal::Mal &mal, daq::Manager &mgr, std::string proc_name, std::string output_path, std::shared_ptr< daq::ObservableEventLog > event_log)
boost::future< std::shared_ptr<::daqif::DaqReply > > StopDaq(const std::string &id) override
boost::future< std::shared_ptr<::daqif::AwaitDaqReply > > AwaitDaqState(const std::string &id, daqif::DaqState state, daqif::DaqSubState substate, double timeout) override
boost::future< std::shared_ptr<::daqif::DaqReply > > AbortDaq(const std::string &id) override
boost::future< std::shared_ptr<::daqif::DaqReply > > StartDaqV2(const std::string &specification) override
boost::future< std::shared_ptr<::daqif::DaqReply > > UpdateKeywords(const std::string &id, const std::string &keywords) override
boost::future< std::shared_ptr<::daqif::DaqReply > > StartDaq(const std::string &id, const std::string &file_prefix, const std::string &primary_sources, const std::string &metadata_sources, const std::string &properties) override
boost::future< std::shared_ptr<::daqif::DaqReply > > ForceStopDaq(const std::string &id) override
boost::future< std::shared_ptr<::daqif::DaqStatus > > GetStatus(const std::string &id) override
static constexpr char const * LOGGER_NAME
boost::future< std::shared_ptr<::daqif::DaqReply > > ForceAbortDaq(const std::string &id) override
Implements the MAL interface daqif::OcmDaq (async version).
Declaration of daq::Manager
Structure with a close mapping from JSON representation in the StartDaqV2 MAL request.
Definition: startDaqV2.hpp:33
daq::DaqContext ParseStartDaqContext(std::string const &properties)
Parse the JSON properties user provides with StartDaq.
ParsedSource ParseSourceUri(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>".
daq::DaqContext ParseStartDaqV2(std::string const &specification)
Parse JSON specification and returns corresponding DaqContext.
std::vector< ParsedSource > ParseSourceUris(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>[ <name>@...]".
std::ostream & operator<<(std::ostream &os, ParsedSource const &s)
ParsedSource & operator=(ParsedSource const &rhs)=default
bool operator==(ParsedSource const &rhs) const
std::string name
ParsedSource()=default
ParsedSource(ParsedSource &&)=default
ParsedSource(ParsedSource const &)=default
std::string rr_uri
ParsedSource & operator=(ParsedSource &&rhs)=default
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44