ifw-daq  2.1.0-pre1
IFW Data Acquisition modules
ocmDaqService.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_server
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Declaration of OcmDaqService
7  */
8 #include "ocmDaqService.hpp"
9 
10 #include <algorithm>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <log4cplus/loggingmacros.h>
14 #include <mal/rr/qos/ConnectionTime.hpp>
15 #include <nlohmann/json.hpp>
16 
17 #include <daq/conversion.hpp>
18 #include <daq/daqController.hpp>
19 #include <daq/error/report.hpp>
20 #include <daq/fits/json.hpp>
21 #include <daqif/state.hpp>
22 
23 using boost::enable_current_exception;
24 using boost::make_exceptional_future;
26 using fmt::format;
27 using std::chrono::duration_cast;
28 using std::chrono::milliseconds;
29 
30 namespace {
31 
32 std::vector<daq::DaqContext::Source> ParseSource(std::string const& str) {
33  std::vector<daq::DaqContext::Source> sources;
34 
35  std::vector<ParsedSource> raw_sources = ParseSourceUris(str);
36  sources.reserve(raw_sources.size());
37  for (auto const& raw : raw_sources) {
38  sources.push_back({raw.name, raw.rr_uri});
39  }
40 
41  return sources;
42 }
43 
44 } // namespace
45 
46 daq::DaqContext ParseStartDaqContext(std::string const& json_properties) {
47  using std::chrono::duration_cast;
48  using std::chrono::milliseconds;
49  using Seconds = std::chrono::duration<double>;
50  daq::DaqContext properties;
51 
52  if (json_properties.empty()) {
53  // No arguments
54  return properties;
55  }
56 
57  auto json = nlohmann::json::parse(json_properties);
58  if (!json.is_object()) {
59  throw boost::enable_current_exception(std::invalid_argument(
60  fmt::format("expected type object but got type {}", json.type_name())));
61  }
62  if (json.contains("keywords")) {
63  properties.keywords = daq::fits::ParseJsonKeywords(json["keywords"]);
64  }
65  if (json.contains("awaitInterval")) {
66  auto& value = json["awaitInterval"];
67  if (!value.is_number()) {
68  throw boost::enable_current_exception(std::invalid_argument(
69  fmt::format("'awaitInterval': unsupported type: {}", value.type_name())));
70  }
71  auto await_interval = value.get<double>();
72  if (await_interval < 0.0) {
73  throw boost::enable_current_exception(std::invalid_argument(
74  fmt::format("'awaitInterval' must be positive number, got {}", await_interval)));
75  }
76  properties.await_interval = duration_cast<milliseconds>(Seconds(value.get<double>()));
77  }
78  return properties;
79 }
80 
81 bool ParsedSource::operator==(ParsedSource const& rhs) const {
82  return name == rhs.name && rr_uri == rhs.rr_uri;
83 }
84 
85 ParsedSource::ParsedSource(std::string name, std::string rr_uri)
86  : name(std::move(name)), rr_uri(std::move(rr_uri)) {
87 }
88 std::ostream& operator<<(std::ostream& os, ParsedSource const& s) {
89  os << "name: '" << s.name << "', rr_uri='" << s.rr_uri << "'";
90  return os;
91 }
92 
93 ParsedSource ParseSourceUri(std::string_view s) {
94  auto start = s.find_first_not_of(' ');
95  auto name_end_pos = s.find_first_of('@');
96 
97  if (name_end_pos == std::string_view::npos) {
98  throw boost::enable_current_exception(
99  std::invalid_argument("separator '@' not found in expected format 'name@rr-uri'"));
100  }
101  auto name = s.substr(start, name_end_pos - start);
102  if (name.empty()) {
103  throw boost::enable_current_exception(
104  std::invalid_argument("name part in 'name@rr-uri' is empty"));
105  }
106 
107  start = name_end_pos + 1;
108  if (start >= s.size()) {
109  throw boost::enable_current_exception(
110  std::invalid_argument("invalid format string, expected format 'name@rr-uri'"));
111  }
112  auto rr_uri_end_pos = s.find_first_of(" ,", start);
113  if (name_end_pos == std::string_view::npos) {
114  throw boost::enable_current_exception(std::invalid_argument("separator ',' not found"));
115  }
116 
117  auto rr_uri = s.substr(start, rr_uri_end_pos - start);
118  if (rr_uri.empty()) {
119  throw boost::enable_current_exception(
120  std::invalid_argument("rr_uri part in 'name@rr-uri' is empty"));
121  }
122  return ParsedSource(std::string(name), std::string(rr_uri));
123 }
124 /**
125  * Parse user provided string in the format
126  * "<name>@<rr-uri>[ <name>@...]"
127  *
128  * @throw std::invalid_argument on errors.
129  */
130 std::vector<ParsedSource> ParseSourceUris(std::string_view s) {
131  std::vector<ParsedSource> result;
132  size_t begin = 0;
133 
134  while (begin < s.size()) {
135  const auto end = s.find_first_of(' ', begin);
136 
137  if (begin != end) {
138  result.emplace_back(ParseSourceUri(s.substr(begin, end - begin)));
139  }
140 
141  if (end == std::string_view::npos) {
142  break;
143  }
144 
145  begin = end + 1;
146  }
147 
148  return result;
149 }
150 
151 OcmDaqService::OcmDaqService(boost::asio::io_context& io_ctx,
152  mal::Mal& mal,
153  daq::Manager& mgr,
154  std::string proc_name,
155  std::string output_path,
156  std::shared_ptr<daq::ObservableEventLog> event_log)
157  : m_io_ctx(io_ctx)
158  , m_executor(m_io_ctx)
159  , m_mal(mal)
160  , m_mgr(mgr)
161  , m_proc_name(std::move(proc_name))
162  , m_output_path(std::move(output_path))
163  , m_event_log(std::move(event_log))
164  , m_log_observer_connection()
165  , m_log_observer(log4cplus::Logger::getInstance("daq.eventlog"))
166  , m_logger(log4cplus::Logger::getInstance(LOGGER_NAME)) {
167  m_log_observer_connection =
168  m_event_log->ConnectObserver(std::reference_wrapper(m_log_observer));
169  if (m_proc_name.empty()) {
170  throw boost::enable_current_exception(
171  std::invalid_argument("OcmDaqService: Process name cannot be empty"));
172  }
173 }
174 
176  m_log_observer_connection.disconnect();
177 }
178 
179 boost::future<std::shared_ptr<::daqif::DaqReply>>
180 OcmDaqService::StartDaq(const std::string& id,
181  const std::string& file_prefix,
182  const std::string& primary_sources,
183  const std::string& metadata_sources,
184  const std::string& json_properties) {
185  return boost::async(
186  m_executor,
187  [=, self = shared_from_this()]() mutable {
188  self->m_event_log->AddEvent(daq::UserActionEvent(
189  id,
190  fmt::format("Request received: "
191  "StartDaq(id='{0}', file_prefix='{1}', "
192  "primary_sources='{2}', metadata_sources='{3}', "
193  "json_properties='{4}'",
194  id,
195  file_prefix,
196  primary_sources,
197  metadata_sources,
198  json_properties),
199  std::nullopt));
200  std::filesystem::path prefix(file_prefix);
201  if (prefix.has_parent_path()) {
202  return boost::make_exceptional_future<std::shared_ptr<::daqif::DaqReply>>(
203  daqif::DaqException(
204  id,
205  fmt::format("file_prefix \"{}\" may not contain parent paths",
206  file_prefix)));
207  }
208  // Parse provided JSON
209  // Validation that require state is performed in m_executor for thread safety.
210  daq::DaqContext context;
211  try {
212  context = ParseStartDaqContext(json_properties);
213  } catch (std::exception const& e) {
214  auto msg =
215  fmt::format("Failed to parse StartDaq JSON properties: {}", e.what());
216  self->m_event_log->AddEvent(daq::ErrorEvent(id, msg, std::nullopt, "user"));
217  return boost::make_exceptional_future<std::shared_ptr<::daqif::DaqReply>>(
218  daqif::DaqException(id, msg));
219  }
220 
221  context.file_id = self->m_mgr.MakeDaqId(&context.creation_time);
222  auto validated_id = id;
223  if (validated_id.empty()) {
224  // User did not provide an ID -> use file_id as DAQ id
225  validated_id = context.file_id;
226  LOG4CPLUS_INFO(
227  self->m_logger,
228  fmt::format("StartDaq(id='{0}'): Created and assigned DAQ id='{0}'",
229  context.file_id));
230  } else {
231  // Check that an instance does not already exist with id
232  if (self->m_mgr.HaveDaq(validated_id)) {
233  LOG4CPLUS_INFO(
234  self->m_logger,
235  fmt::format("StartDaq(id='{0}'): DAQ with id='{0}' already exist",
236  validated_id));
237  return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
238  daqif::DaqException(id,
239  "Data acquisition with same id already exist"));
240  }
241  }
242 
243  try {
244  context.id = validated_id;
245  context.process_name = m_proc_name;
246  context.dp_name_prefix = file_prefix;
247  // Create primary sources
248  context.prim_sources = ParseSource(primary_sources);
249  // Create metadata sources
250  context.meta_sources = ParseSource(metadata_sources);
251 
252  // Start
253  return self->m_mgr.StartDaqAsync(context).then(
254  self->m_executor,
255  [weak_self = std::weak_ptr(self->shared_from_this()), id = validated_id](
256  boost::future<daq::State> f) -> std::shared_ptr<daqif::DaqReply> {
257  auto self = weak_self.lock();
258  if (!self) {
259  LOG4CPLUS_WARN(LOGGER_NAME,
260  fmt::format("StartDaq(id='{}'): StartDaqAsync is "
261  "complete but MAL service has "
262  "been abandoned. Throwing exception.",
263  id));
264  throw boost::enable_current_exception(
265  daqif::DaqException(id, "Service has been abandoned"));
266  }
267  try {
268  f.get();
269  auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
270  assert(rep);
271  rep->setId(id);
272  rep->setError(false);
273  return rep;
274  } catch (...) {
275  auto what =
276  NestedExceptionReporter(std::current_exception()).Str();
277  LOG4CPLUS_ERROR(self->m_logger,
278  fmt::format("StartDaq(id='{}'): StartDaqAsync "
279  "completed with failure: {}",
280  id,
281  what));
282 
283  throw boost::enable_current_exception(daqif::DaqException(
284  id, fmt::format("Start failed: {}", what)));
285  }
286  });
287  } catch (std::invalid_argument const& e) {
288  LOG4CPLUS_INFO(self->m_logger,
289  fmt::format("StartDaq(id='{}'): Invalid argument error while "
290  "processing request: {}",
291  validated_id,
292  e.what()));
293  return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
294  daqif::DaqException(validated_id, e.what()));
295  } catch (std::exception const& e) {
296  LOG4CPLUS_INFO(self->m_logger,
297  fmt::format("StartDaq(id='{}'): Error while"
298  "processing request: {}",
299  validated_id,
300  e.what()));
301  return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
302  daqif::DaqException(validated_id, e.what()));
303  } catch (...) {
304  LOG4CPLUS_INFO(
305  self->m_logger,
306  fmt::format("StartDaq(id='{}'): Unknown error while processing request",
307  validated_id));
308  return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
309  daqif::DaqException(validated_id, "Uknown error"));
310  }
311  })
312  // unwrap outer future from async() to get the future we want from m_mgr.StartDaqAsync()
313  .unwrap();
314 }
315 
316 boost::future<std::shared_ptr<::daqif::DaqReply>> OcmDaqService::StopDaq(const std::string& id) {
317  return boost::async(
318  m_executor,
319  [self = shared_from_this(), id]() {
320  self->m_event_log->AddEvent(daq::UserActionEvent(id,
321  fmt::format("Request received: "
322  "StopDaq(id='{0}')",
323  id),
324  std::nullopt));
325  return self->StopDaq(id, false);
326  })
327  .unwrap();
328 }
329 
330 boost::future<std::shared_ptr<::daqif::DaqReply>>
331 OcmDaqService::ForceStopDaq(const std::string& id) {
332  return boost::async(m_executor,
333  [self = shared_from_this(), id]() {
334  self->m_event_log->AddEvent(
336  fmt::format("Request received: "
337  "ForceStopDaq(id='{0}')",
338  id),
339  std::nullopt));
340  return self->StopDaq(id, true);
341  })
342  .unwrap();
343 }
344 
345 boost::future<std::shared_ptr<::daqif::DaqReply>>
346 OcmDaqService::StopDaq(const std::string& id, bool forced) {
347  return boost::async(
348  m_executor,
349  [self = shared_from_this(), id, forced]() {
350  return self->m_mgr
351  .StopDaqAsync(id,
352  forced ? daq::ErrorPolicy::Tolerant : daq::ErrorPolicy::Strict)
353  .then(
354  self->m_executor,
355  [weak_self = std::weak_ptr(self->shared_from_this()),
356  id](boost::future<daq::Status> f) -> std::shared_ptr<daqif::DaqReply> {
357  auto self = weak_self.lock();
358  if (!self) {
359  LOG4CPLUS_WARN(LOGGER_NAME,
360  fmt::format("StopDaq(id='{}'): StopDaqAsync is "
361  "complete but MAL service has "
362  "been abandoned. Throwing exception.",
363  id));
364  throw boost::enable_current_exception(
365  daqif::DaqException(id, "Service has been abandoned"));
366  }
367  try {
368  f.get();
369  auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
370  assert(rep);
371  rep->setId(id);
372  rep->setError(false);
373  return rep;
374  } catch (...) {
375  auto what =
376  NestedExceptionReporter(std::current_exception()).Str();
377  LOG4CPLUS_INFO(self->m_logger,
378  fmt::format("StopDaq(id='{}'): "
379  "completed with failure: {}",
380  id,
381  what));
382  throw boost::enable_current_exception(daqif::DaqException(
383  id, fmt::format("Stop failed: {}", what)));
384  }
385  });
386  })
387  // unwrap outer future from async() to get the future we want from m_mgr.StopDaqAsync()
388  .unwrap();
389 }
390 
391 boost::future<std::shared_ptr<::daqif::DaqReply>> OcmDaqService::AbortDaq(const std::string& id) {
392  return boost::async(m_executor,
393  [self = shared_from_this(), id]() {
394  self->m_event_log->AddEvent(
396  fmt::format("Request received: "
397  "AbortDaq(id='{0}')",
398  id),
399  std::nullopt));
400  return self->AbortDaq(id, false);
401  })
402  .unwrap();
403 }
404 
405 boost::future<std::shared_ptr<::daqif::DaqReply>>
406 OcmDaqService::ForceAbortDaq(const std::string& id) {
407  return boost::async(m_executor,
408  [self = shared_from_this(), id]() {
409  self->m_event_log->AddEvent(
411  fmt::format("Request received: "
412  "ForceAbortDaq(id='{0}')",
413  id),
414  std::nullopt));
415  return self->AbortDaq(id, true);
416  })
417  .unwrap();
418 }
419 
420 boost::future<std::shared_ptr<::daqif::DaqReply>>
421 OcmDaqService::AbortDaq(const std::string& id, bool forced) {
422  return boost::async(
423  m_executor,
424  [self = shared_from_this(), id, forced]() {
425  return self->m_mgr
426  .AbortDaqAsync(
427  id, forced ? daq::ErrorPolicy::Tolerant : daq::ErrorPolicy::Strict)
428  .then(
429  self->m_executor,
430  [weak_self = std::weak_ptr(self->shared_from_this()), id, forced](
431  boost::future<daq::Status> f) -> std::shared_ptr<daqif::DaqReply> {
432  auto self = weak_self.lock();
433  if (!self) {
434  LOG4CPLUS_WARN(
435  LOGGER_NAME,
436  fmt::format("AbortDaq(id='{}', forced={}): AbortDaqAsync is "
437  "complete but MAL service has "
438  "been abandoned. Throwing exception.",
439  id,
440  forced));
441  throw boost::enable_current_exception(
442  daqif::DaqException(id, "Service has been abandoned"));
443  }
444  try {
445  auto result = f.get();
446  LOG4CPLUS_INFO(
447  self->m_logger,
448  fmt::format("AbortDaq(id='{}', forced={}): "
449  "AbortDaqAsync Completed successfully",
450  id,
451  forced));
452  auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
453  assert(rep);
454  rep->setId(id);
455  rep->setError(result.error);
456  LOG4CPLUS_DEBUG(
457  self->m_logger,
458  fmt::format("AbortDaq(id='{}', forced={}): "
459  "AbortDaqAsync Completed, returning reply now.",
460  id,
461  forced));
462  return rep;
463  } catch (...) {
464  auto what =
465  NestedExceptionReporter(std::current_exception()).Str();
466  LOG4CPLUS_ERROR(self->m_logger,
467  fmt::format("AbortDaq(id='{}', forced={}): "
468  "AbortDaqAsync Completed "
469  "with fatal error:\n{}",
470  id,
471  forced,
472  what));
473  throw boost::enable_current_exception(
474  daqif::DaqException(id, what));
475  }
476  }); // m_msg.AbortDaqAsync cont
477  }) // boost::async()
478  // unwrap outer future from async() to get the future we want from m_mgr.AbortDaqAsync()
479  .unwrap();
480 }
481 
482 boost::future<std::shared_ptr<::daqif::DaqReply>>
483 OcmDaqService::UpdateKeywords(const std::string& id, const std::string& keywords) {
484  return boost::async(
485  m_executor,
486  [self = shared_from_this(), id, keywords]() -> std::shared_ptr<::daqif::DaqReply> {
487  self->m_event_log->AddEvent(
489  fmt::format("Request received: "
490  "UpdateKeywords(id='{0}', keywords='{1}')",
491  id,
492  keywords),
493  std::nullopt));
494  daq::fits::KeywordVector parsed_keywords;
495  try {
496  daq::fits::ParseJsonKeywords(keywords.c_str()).swap(parsed_keywords);
497  } catch (nlohmann::json::exception const& e) {
498  LOG4CPLUS_ERROR(
499  self->m_logger,
500  fmt::format("UpdateKeywords(id='{}', ...): Failed to parse JSON", id));
501  throw boost::enable_current_exception(
502  daqif::DaqException(id, fmt::format("Invalid JSON string: {}", e.what())));
503  } catch (std::invalid_argument const& e) {
504  LOG4CPLUS_ERROR(
505  self->m_logger,
506  fmt::format(
507  "UpdateKeywords(id='{}', ...): JSON could be parsed but was invalid "
508  "schema",
509  id));
510  throw boost::enable_current_exception(
511  daqif::DaqException(id, fmt::format("Invalid JSON schema: {}", e.what())));
512  } catch (std::exception const& e) {
513  LOG4CPLUS_ERROR(
514  self->m_logger,
515  fmt::format(
516  "UpdateKeywords(id='{}', ...): std::exception: '{}'", id, e.what()));
517  throw boost::enable_current_exception(
518  daqif::DaqException(id, fmt::format("std::exception: {}", e.what())));
519  } catch (...) {
520  throw boost::enable_current_exception(daqif::DaqException(id, "unknown error"));
521  }
522  try {
523  self->m_mgr.UpdateKeywords(id, parsed_keywords);
524  auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
525  rep->setId(id);
526  rep->setError(false);
527  return rep;
528  } catch (std::invalid_argument const& e) {
529  LOG4CPLUS_ERROR(
530  self->m_logger,
531  fmt::format("UpdateKeywords(id='{}'): Invalid data acquisition id", id));
532  throw boost::enable_current_exception(
533  daqif::DaqException(id, fmt::format("No data acquisition with id='{}'", id)));
534  } catch (std::exception const& e) {
535  LOG4CPLUS_ERROR(
536  self->m_logger,
537  fmt::format(
538  "UpdateKeywords(id='{}', ...): std::exception: '{}'", id, e.what()));
539  throw boost::enable_current_exception(
540  daqif::DaqException(id, fmt::format("std::exception: {}", e.what())));
541  } catch (...) {
542  throw boost::enable_current_exception(daqif::DaqException(id, "unknown error"));
543  }
544  });
545 }
546 
547 boost::future<std::shared_ptr<::daqif::DaqStatus>> OcmDaqService::GetStatus(const std::string& id) {
548  return boost::async(
549  m_executor,
550  [self = shared_from_this(), id]() {
551  self->m_event_log->AddEvent(
553  fmt::format("Request received: "
554  "GetStatus(id='{0}')",
555  id),
556  std::nullopt));
557  try {
558  LOG4CPLUS_INFO(self->m_logger, fmt::format("GetStatus(id='{}'): Enter", id));
559  auto status = self->m_mgr.GetStatus(id);
560  auto rep = self->m_mal.createDataEntity<daqif::DaqStatus>();
561  assert(rep);
562  *rep << status;
563  LOG4CPLUS_INFO(
564  self->m_logger,
565  fmt::format("GetStatus(id='{}'): Set result -> {}", id, status.state));
566  return boost::make_ready_future<std::shared_ptr<daqif::DaqStatus>>(rep);
567  } catch (std::invalid_argument const&) {
568  LOG4CPLUS_ERROR(
569  self->m_logger,
570  fmt::format("GetStatus(id='{}'): Invalid data acquisition id", id));
571  return boost::make_exceptional_future<std::shared_ptr<daqif::DaqStatus>>(
572  boost::enable_current_exception(daqif::DaqException(
573  id, fmt::format("No data acquisition with id='{}'", id))));
574  }
575  })
576  .unwrap();
577 }
578 
579 boost::future<std::vector<std::shared_ptr<::daqif::DaqStatus>>> OcmDaqService::GetActiveList() {
580  return boost::async(
581  m_executor,
582  [self = shared_from_this()]() -> std::vector<std::shared_ptr<::daqif::DaqStatus>> {
583  self->m_event_log->AddEvent(daq::UserActionEvent("",
584  "Request received: "
585  "GetActiveList()",
586  std::nullopt));
587  auto daqs = self->m_mgr.GetDaqControllers();
588  std::vector<std::shared_ptr<daq::DaqController const>> active;
589  std::vector<std::shared_ptr<daqif::DaqStatus>> reply;
590  std::copy_if(daqs.begin(), daqs.end(), std::back_inserter(active), [](auto daq_ctl) {
591  return !daq::IsFinalState(daq_ctl->GetState());
592  });
593  std::transform(active.begin(),
594  active.end(),
595  std::back_inserter(reply),
596  [&mal = self->m_mal](auto daq_ctl) {
597  auto mal_status = mal.createDataEntity<daqif::DaqStatus>();
598  *mal_status << daq_ctl->GetStatus()->GetStatus();
599  return mal_status;
600  });
601  return reply;
602  });
603 }
604 
605 boost::future<std::shared_ptr<::daqif::AwaitDaqReply>> OcmDaqService::AwaitDaqState(
606  const std::string& id, daqif::DaqState state, daqif::DaqSubState substate, double timeout) {
607  using Seconds = std::chrono::duration<double>;
608 
609  return boost::async(
610  m_executor,
611  [=, self = shared_from_this()]() {
612  self->m_event_log->AddEvent(daq::UserActionEvent(id,
613  format("Request received: "
614  "AwaitDaqState(id='{}', "
615  "state={}, substate={}, "
616  "timeout={})",
617  id,
618  daq::ToString(state),
619  daq::ToString(substate),
620  timeout),
621  std::nullopt));
622  if (timeout <= 0) {
623  return boost::make_exceptional_future<daq::Result<daq::Status>>(
624  std::invalid_argument(
625  format("Invalid argument `timeout`. Must be > 0", timeout)));
626  }
627  if (!daqif::IsStateValid(state, substate)) {
628  return boost::make_exceptional_future<daq::Result<daq::Status>>(
629  std::invalid_argument(fmt::format(
630  "Invalid state combination: {} and {}", state, substate)));
631  }
632  auto daq_state = daq::MakeState({state, substate});
633  return self->m_mgr.AwaitDaqStateAsync(
634  id, daq_state, duration_cast<milliseconds>(Seconds(timeout)));
635  })
636  .unwrap()
637  .then(m_executor,
638  [id, self = shared_from_this()](boost::future<daq::Result<daq::Status>> fut) {
639  try {
640  auto [timeout, status] = fut.get();
641  auto mal_reply = self->m_mal.createDataEntity<daqif::AwaitDaqReply>();
642  assert(mal_reply);
643  mal_reply->setTimeout(timeout);
644  auto mal_status_ptr = mal_reply->getStatus();
645  assert(mal_status_ptr);
646  *mal_status_ptr << status;
648  id,
649  fmt::format("Request completed: {}",
650  (timeout ? "condition not yet satisfied (timeout)"
651  : "condition satisfied")),
652  std::nullopt);
653  return mal_reply;
654  } catch (std::exception const& e) {
655  self->m_event_log->AddEvent(daq::ActionEvent(
656  id,
657  fmt::format("Request completed exceptionally: {}", e.what()),
658  std::nullopt));
659  throw boost::enable_current_exception(daqif::DaqException(id, e.what()));
660  } catch (...) {
662  id, "Request completed exceptionally: Unknown exception", std::nullopt);
663  throw boost::enable_current_exception(
664  daqif::DaqException(id, "Uknown exception"));
665  }
666  });
667 }
ParseSourceUri
ParsedSource ParseSourceUri(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>".
Definition: ocmDaqService.cpp:93
daq::DaqContext::prim_sources
std::vector< Source > prim_sources
Definition: daqContext.hpp:80
ParsedSource::operator==
bool operator==(ParsedSource const &rhs) const
Definition: ocmDaqService.cpp:81
OcmDaqService::ForceStopDaq
boost::future< std::shared_ptr<::daqif::DaqReply > > ForceStopDaq(const std::string &id) override
Definition: ocmDaqService.cpp:331
OcmDaqService::GetStatus
boost::future< std::shared_ptr<::daqif::DaqStatus > > GetStatus(const std::string &id) override
Definition: ocmDaqService.cpp:547
json.hpp
Contains data structure for FITS keywords.
ParseSourceUris
std::vector< ParsedSource > ParseSourceUris(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>[ <name>@...]".
Definition: ocmDaqService.cpp:130
daqif::IsStateValid
bool IsStateValid(DaqState state, DaqSubState substate)
Validate state combination.
Definition: state.cpp:16
OcmDaqService::GetActiveList
boost::future< std::vector< std::shared_ptr<::daqif::DaqStatus > > > GetActiveList() override
Definition: ocmDaqService.cpp:579
daq::DaqContext::await_interval
std::chrono::milliseconds await_interval
Interval (and thus duration) of the requests sent to primary sources to await end of recording.
Definition: daqContext.hpp:98
OcmDaqService::UpdateKeywords
boost::future< std::shared_ptr<::daqif::DaqReply > > UpdateKeywords(const std::string &id, const std::string &keywords) override
Definition: ocmDaqService.cpp:483
daq::error::NestedExceptionReporter::Str
std::string Str() const
Convenience function for constructing a std::string from the exception.
Definition: report.cpp:97
daq::ErrorEvent
Definition: eventLog.hpp:69
daq::DaqContext::id
std::string id
DAQ identfier, possibly provided by user.
Definition: daqContext.hpp:64
daq::DaqContext::creation_time
std::chrono::system_clock::time_point creation_time
Time when DAQ was created.
Definition: daqContext.hpp:112
operator<<
std::ostream & operator<<(std::ostream &os, ParsedSource const &s)
Definition: ocmDaqService.cpp:88
report.hpp
conversion.hpp
Contains support functions for daqif.
OcmDaqService::StartDaq
boost::future< std::shared_ptr<::daqif::DaqReply > > StartDaq(const std::string &id, const std::string &file_prefix, const std::string &primary_sources, const std::string &metadata_sources, const std::string &properties) override
Definition: ocmDaqService.cpp:180
OcmDaqService::ForceAbortDaq
boost::future< std::shared_ptr<::daqif::DaqReply > > ForceAbortDaq(const std::string &id) override
Definition: ocmDaqService.cpp:406
daq::error::NestedExceptionReporter
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
daq::DaqContext
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:48
ParsedSource::name
std::string name
Definition: ocmDaqService.hpp:34
daqController.hpp
Contains declaration for for DaqController.
OcmDaqService::AbortDaq
boost::future< std::shared_ptr<::daqif::DaqReply > > AbortDaq(const std::string &id) override
Definition: ocmDaqService.cpp:391
daq::ToString
std::string_view ToString(daqif::DaqState state) noexcept
Definition: conversion.cpp:142
daq::UserActionEvent
Event directly related to user action, such as a command to do something.
Definition: eventLog.hpp:65
ocmDaqService.hpp
Declaration of OcmDaqService.
daq::DaqContext::keywords
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
Definition: daqContext.hpp:91
daq::ParseSource
DpSpec::SourceTypes ParseSource(Json const &json, JsonPointer const &breadcrumb)
Definition: dpSpec.cpp:198
ParseStartDaqContext
daq::DaqContext ParseStartDaqContext(std::string const &json_properties)
Parse the JSON properties user provides with StartDaq.
Definition: ocmDaqService.cpp:46
daq::DaqContext::file_id
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:69
ParsedSource::ParsedSource
ParsedSource()=default
daq::DaqContext::meta_sources
std::vector< Source > meta_sources
Definition: daqContext.hpp:81
OcmDaqService::OcmDaqService
OcmDaqService(boost::asio::io_context &io_ctx, mal::Mal &mal, daq::Manager &mgr, std::string proc_name, std::string output_path, std::shared_ptr< daq::ObservableEventLog > event_log)
Definition: ocmDaqService.cpp:151
OcmDaqService::AwaitDaqState
boost::future< std::shared_ptr<::daqif::AwaitDaqReply > > AwaitDaqState(const std::string &id, daqif::DaqState state, daqif::DaqSubState substate, double timeout) override
Definition: ocmDaqService.cpp:605
daq::fits::KeywordVector
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
OcmDaqService::~OcmDaqService
~OcmDaqService()
Definition: ocmDaqService.cpp:175
daq::MakeState
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:63
OcmDaqService::StopDaq
boost::future< std::shared_ptr<::daqif::DaqReply > > StopDaq(const std::string &id) override
Definition: ocmDaqService.cpp:316
daq::DaqContext::dp_name_prefix
std::string dp_name_prefix
Data product file name prefix.
Definition: daqContext.hpp:79
elt::mal
Definition: dpmClient.hpp:25
server::LOGGER_NAME
const std::string LOGGER_NAME
Definition: logger.hpp:17
daq::ErrorPolicy::Strict
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
state.hpp
Contains State support functions for daqif.
ParsedSource::rr_uri
std::string rr_uri
Definition: ocmDaqService.hpp:35
daq::fits::ParseJsonKeywords
std::vector< KeywordVariant > ParseJsonKeywords(char const *keywords)
Parse and return FITS keywords.
Definition: json.cpp:124
daq::DaqContext::process_name
std::string process_name
User defined process name.
Definition: daqContext.hpp:74
ParsedSource
Definition: ocmDaqService.hpp:25
daq::Manager
Manager owns DaqController and FitsController (active data acquisitions) instances and multiplexes re...
Definition: manager.hpp:124
daq::ActionEvent
Event related to an action being requested or performed.
Definition: eventLog.hpp:56
daq::json
nlohmann::json json
Definition: json.cpp:20