ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
ocmDaqService.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup daq_ocm_server
4  * @copyright 2022 ESO - European Southern Observatory
5  *
6  * @brief Implementation of OcmDaqService
7  */
8 #include "ocmDaqService.hpp"
9 
10 #include <algorithm>
11 #include <regex>
12 
13 #include <fmt/format.h>
14 #include <fmt/ostream.h>
15 #include <log4cplus/loggingmacros.h>
16 #include <mal/rr/qos/ConnectionTime.hpp>
17 #include <nlohmann/json.hpp>
18 
19 #include <daq/conversion.hpp>
20 #include <daq/daqController.hpp>
21 #include <daq/error/report.hpp>
22 #include <daq/fits/json.hpp>
23 #include <daqif/state.hpp>
24 
25 #include <daq/json/startDaqV2.hpp>
26 
27 #include "logger.hpp"
28 
29 using boost::enable_current_exception;
30 using boost::make_exceptional_future;
32 using fmt::format;
33 using std::chrono::duration_cast;
34 using std::chrono::milliseconds;
35 
36 namespace {
37 
38 std::vector<daq::DaqContext::Source> ParseSource(std::string const& str) {
39  std::vector<daq::DaqContext::Source> sources;
40 
41  std::vector<ParsedSource> raw_sources = ParseSourceUris(str);
42  sources.reserve(raw_sources.size());
43  for (auto const& raw : raw_sources) {
44  sources.push_back({raw.name, raw.rr_uri});
45  }
46 
47  return sources;
48 }
49 
/**
 * Validate that a file prefix only contains permitted characters.
 *
 * Permitted characters are ASCII letters, digits, '-', '_' and '.'.
 * The empty string is accepted.
 *
 * @param file_prefix Null-terminated prefix to validate.
 * @throw std::invalid_argument if @a file_prefix contains any other character.
 */
void ValidateFilePrefix(char const* file_prefix) {
    // Compiling a std::regex is expensive, so it is built once and reused for every request.
    static std::regex const file_regex(R"(^[-a-zA-Z0-9_\.]*$)");

    if (!std::regex_match(file_prefix, file_regex)) {
        std::string message = "file_prefix \"";
        message += file_prefix;
        message += "\" contains illegal characters, allowed: [a-zA-Z-0-9-_.]";
        throw std::invalid_argument(message);
    }
}
59 
60 /**
61  * Validate arguments.
62  *
63  * TODO:
64  * - Check merge target.
65  * - Source names must be unique (and shouldn't use reserved names)
66  *
67  * @throw std::invalid_argument on validation error.
68  */
69 void ValidateDaqContext(daq::DaqContext const& ctx) {
70  ValidateFilePrefix(ctx.dp_name_prefix.c_str());
71 }
72 
73 } // namespace
74 
75 daq::DaqContext ParseStartDaqContext(std::string const& json_properties) {
76  using std::chrono::duration_cast;
77  using std::chrono::milliseconds;
78  using Seconds = std::chrono::duration<double>;
79  daq::DaqContext properties;
80 
81  if (json_properties.empty()) {
82  // No arguments
83  return properties;
84  }
85 
86  auto json = nlohmann::json::parse(json_properties);
87  if (!json.is_object()) {
88  throw boost::enable_current_exception(std::invalid_argument(
89  fmt::format("expected type object but got type {}", json.type_name())));
90  }
91  if (json.contains("keywords")) {
92  properties.keywords = daq::fits::ParseJsonKeywords(json["keywords"]);
93  }
94  if (json.contains("awaitInterval")) {
95  auto& value = json["awaitInterval"];
96  if (!value.is_number()) {
97  throw boost::enable_current_exception(std::invalid_argument(
98  fmt::format("'awaitInterval': unsupported type: {}", value.type_name())));
99  }
100  auto await_interval = value.get<double>();
101  if (await_interval < 0.0) {
102  throw boost::enable_current_exception(std::invalid_argument(
103  fmt::format("'awaitInterval' must be positive number, got {}", await_interval)));
104  }
105  properties.await_interval = duration_cast<milliseconds>(Seconds(value.get<double>()));
106  }
107  return properties;
108 }
109 
110 bool ParsedSource::operator==(ParsedSource const& rhs) const {
111  return name == rhs.name && rr_uri == rhs.rr_uri;
112 }
113 
114 ParsedSource::ParsedSource(std::string name, std::string rr_uri)
115  : name(std::move(name)), rr_uri(std::move(rr_uri)) {
116 }
117 std::ostream& operator<<(std::ostream& os, ParsedSource const& s) {
118  os << "name: '" << s.name << "', rr_uri='" << s.rr_uri << "'";
119  return os;
120 }
121 
122 ParsedSource ParseSourceUri(std::string_view s) {
123  auto start = s.find_first_not_of(' ');
124  auto name_end_pos = s.find_first_of('@');
125 
126  if (name_end_pos == std::string_view::npos) {
127  throw boost::enable_current_exception(
128  std::invalid_argument("separator '@' not found in expected format 'name@rr-uri'"));
129  }
130  auto name = s.substr(start, name_end_pos - start);
131  if (name.empty()) {
132  throw boost::enable_current_exception(
133  std::invalid_argument("name part in 'name@rr-uri' is empty"));
134  }
135 
136  start = name_end_pos + 1;
137  if (start >= s.size()) {
138  throw boost::enable_current_exception(
139  std::invalid_argument("invalid format string, expected format 'name@rr-uri'"));
140  }
141  auto rr_uri_end_pos = s.find_first_of(" ,", start);
142  if (name_end_pos == std::string_view::npos) {
143  throw boost::enable_current_exception(std::invalid_argument("separator ',' not found"));
144  }
145 
146  auto rr_uri = s.substr(start, rr_uri_end_pos - start);
147  if (rr_uri.empty()) {
148  throw boost::enable_current_exception(
149  std::invalid_argument("rr_uri part in 'name@rr-uri' is empty"));
150  }
151  return ParsedSource(std::string(name), std::string(rr_uri));
152 }
153 /**
154  * Parse user provided string in the format
155  * "<name>@<rr-uri>[ <name>@...]"
156  *
157  * @throw std::invalid_argument on errors.
158  */
159 std::vector<ParsedSource> ParseSourceUris(std::string_view s) {
160  std::vector<ParsedSource> result;
161  size_t begin = 0;
162 
163  while (begin < s.size()) {
164  const auto end = s.find_first_of(' ', begin);
165 
166  if (begin != end) {
167  result.emplace_back(ParseSourceUri(s.substr(begin, end - begin)));
168  }
169 
170  if (end == std::string_view::npos) {
171  break;
172  }
173 
174  begin = end + 1;
175  }
176 
177  return result;
178 }
179 
180 OcmDaqService::OcmDaqService(boost::asio::io_context& io_ctx,
181  mal::Mal& mal,
182  daq::Manager& mgr,
183  std::string proc_name,
184  std::string output_path,
185  std::shared_ptr<daq::ObservableEventLog> event_log)
186  : m_io_ctx(io_ctx)
187  , m_executor(m_io_ctx)
188  , m_mal(mal)
189  , m_mgr(mgr)
190  , m_proc_name(std::move(proc_name))
191  , m_output_path(std::move(output_path))
192  , m_event_log(std::move(event_log))
193  , m_log_observer_connection()
194  , m_log_observer(log4cplus::Logger::getInstance(server::LOGGER_NAME_EVENTLOG))
195  , m_logger(log4cplus::Logger::getInstance(server::LOGGER_NAME)) {
196  m_log_observer_connection =
197  m_event_log->ConnectObserver(std::reference_wrapper(m_log_observer));
198  if (m_proc_name.empty()) {
199  throw boost::enable_current_exception(
200  std::invalid_argument("OcmDaqService: Process name cannot be empty"));
201  }
202 }
203 
205  m_log_observer_connection.disconnect();
206 }
207 
208 boost::future<std::shared_ptr<::daqif::DaqReply>>
209 OcmDaqService::StartDaq(const std::string& id,
210  const std::string& file_prefix,
211  const std::string& primary_sources,
212  const std::string& metadata_sources,
213  const std::string& json_properties) {
214  return boost::async(
215  m_executor,
216  [=, self = shared_from_this()]() mutable {
217  self->m_event_log->AddEvent(daq::UserActionEvent(
218  id,
219  fmt::format("Request received: "
220  "StartDaq(id='{0}', file_prefix='{1}', "
221  "primary_sources='{2}', metadata_sources='{3}', "
222  "json_properties='{4}'",
223  id,
224  file_prefix,
225  primary_sources,
226  metadata_sources,
227  json_properties),
228  std::nullopt));
229  try {
230  ValidateFilePrefix(file_prefix.c_str());
231  } catch (std::exception const& e) {
232  self->m_event_log->AddEvent(
233  daq::ErrorEvent(id, e.what(), std::nullopt, "user"));
234  return boost::make_exceptional_future<std::shared_ptr<::daqif::DaqReply>>(
235  daqif::DaqException(id, e.what()));
236  }
237  // Parse provided JSON
238  // Validation that require state is performed in m_executor for thread safety.
239  daq::DaqContext context;
240  try {
241  context = ParseStartDaqContext(json_properties);
242  } catch (std::exception const& e) {
243  auto msg =
244  fmt::format("Failed to parse StartDaq JSON properties: {}", e.what());
245  self->m_event_log->AddEvent(daq::ErrorEvent(id, msg, std::nullopt, "user"));
246  return boost::make_exceptional_future<std::shared_ptr<::daqif::DaqReply>>(
247  daqif::DaqException(id, msg));
248  }
249 
250  context.file_id = self->m_mgr.MakeDaqId(&context.creation_time);
251  auto validated_id = id;
252  if (validated_id.empty()) {
253  // User did not provide an ID -> use file_id as DAQ id
254  validated_id = context.file_id;
255  LOG4CPLUS_INFO(
256  self->m_logger,
257  fmt::format("StartDaq(id='{0}'): Created and assigned DAQ id='{0}'",
258  context.file_id));
259  } else {
260  // Check that an instance does not already exist with id
261  if (self->m_mgr.HaveDaq(validated_id)) {
262  LOG4CPLUS_INFO(
263  self->m_logger,
264  fmt::format("StartDaq(id='{0}'): DAQ with id='{0}' already exist",
265  validated_id));
266  return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
267  daqif::DaqException(id,
268  "Data acquisition with same id already exist"));
269  }
270  }
271 
272  try {
273  context.id = validated_id;
274  context.process_name = m_proc_name;
275  context.dp_name_prefix = file_prefix;
276  // Create primary sources
277  context.prim_sources = ParseSource(primary_sources);
278  // Create metadata sources
279  context.meta_sources = ParseSource(metadata_sources);
280 
281  // Start
282  return self->StartDaq(context, "StartDaq");
283  } catch (daqif::DaqException const&) {
284  // Already correct exception
285  throw;
286  } catch (std::invalid_argument const& e) {
287  LOG4CPLUS_INFO(self->m_logger,
288  fmt::format("StartDaq(id='{}'): Invalid argument error while "
289  "processing request: {}",
290  validated_id,
291  e.what()));
292  return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
293  daqif::DaqException(validated_id, e.what()));
294  } catch (std::exception const& e) {
295  LOG4CPLUS_INFO(self->m_logger,
296  fmt::format("StartDaq(id='{}'): Error while"
297  "processing request: {}",
298  validated_id,
299  e.what()));
300  return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
301  daqif::DaqException(validated_id, e.what()));
302  } catch (...) {
303  LOG4CPLUS_INFO(
304  self->m_logger,
305  fmt::format("StartDaq(id='{}'): Unknown error while processing request",
306  validated_id));
307  return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
308  daqif::DaqException(validated_id, "Uknown error"));
309  }
310  })
311  // unwrap outer future from async() to get the future we want from m_mgr.StartDaqAsync()
312  .unwrap();
313 }
314 
315 boost::future<std::shared_ptr<::daqif::DaqReply>>
316 OcmDaqService::StartDaqV2(const std::string& specification) {
317  return boost::async(
318  m_executor,
319  [=, self = shared_from_this()]() mutable {
320  self->m_event_log->AddEvent(
322  fmt::format("Request received: "
323  "StartDaqV2(specification=\n'{0}')",
324  specification),
325  std::nullopt));
326  try {
327  // Parse provided JSON
328  auto parsed =
329  daq::json::ParseStartDaqV2Spec(nlohmann::json::parse(specification));
330 
331  // Validation that require state is performed in m_executor for thread
332  // safety.
333  daq::DaqContext context;
334  UpdateFrom(context, parsed);
335 
336  assert(context.specification.has_value());
337 
338  ValidateDaqContext(context);
339 
340  {
341  nlohmann::json j;
342  to_json(j, *context.specification);
343  LOG4CPLUS_DEBUG(self->m_logger,
344  "Resulting specification after parsing: \n"
345  << j.dump(2));
346  }
347 
348  // Start
349  return self->StartDaq(context, "StartDaqV2");
350  } catch (daqif::DaqException const&) {
351  // Already correct exception
352  throw;
353  } catch (nlohmann::json::exception const& e) {
354  throw boost::enable_current_exception(daqif::DaqException(
355  "", fmt::format("JSON parsing error: {}", e.what())));
356  } catch (daq::json::SchemaError const& e) {
357  throw boost::enable_current_exception(
358  daqif::DaqException("", fmt::format("JSON Schema error: {}", e.what())));
359  } catch (std::invalid_argument const& e) {
360  throw boost::enable_current_exception(
361  daqif::DaqException("", fmt::format("Invalid argument: {}", e.what())));
362  } catch (std::exception const& e) {
363  throw boost::enable_current_exception(daqif::DaqException("", e.what()));
364  }
365  })
366  .unwrap();
367 }
368 
369 boost::future<std::shared_ptr<::daqif::DaqReply>> OcmDaqService::StopDaq(const std::string& id) {
370  return boost::async(
371  m_executor,
372  [self = shared_from_this(), id]() {
373  self->m_event_log->AddEvent(daq::UserActionEvent(id,
374  fmt::format("Request received: "
375  "StopDaq(id='{0}')",
376  id),
377  std::nullopt));
378  return self->StopDaq(id, false);
379  })
380  .unwrap();
381 }
382 
383 boost::future<std::shared_ptr<::daqif::DaqReply>>
384 OcmDaqService::ForceStopDaq(const std::string& id) {
385  return boost::async(m_executor,
386  [self = shared_from_this(), id]() {
387  self->m_event_log->AddEvent(
389  fmt::format("Request received: "
390  "ForceStopDaq(id='{0}')",
391  id),
392  std::nullopt));
393  return self->StopDaq(id, true);
394  })
395  .unwrap();
396 }
397 
398 boost::future<std::shared_ptr<::daqif::DaqReply>>
399 OcmDaqService::StartDaq(daq::DaqContext const& context, char const* function) {
400  return m_mgr.StartDaqAsync(context).then(
401  m_executor,
402  [function, weak_self = std::weak_ptr(shared_from_this()), id = context.id](
403  boost::future<daq::State> f) -> std::shared_ptr<daqif::DaqReply> {
404  auto self = weak_self.lock();
405  if (!self) {
406  LOG4CPLUS_WARN(LOGGER_NAME,
407  fmt::format("{}(id='{}'): StartDaqAsync is "
408  "complete but MAL service has "
409  "been abandoned. Throwing exception.",
410  function,
411  id));
412  throw boost::enable_current_exception(
413  daqif::DaqException(id, "Service has been abandoned"));
414  }
415  try {
416  f.get();
417  auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
418  assert(rep);
419  rep->setId(id);
420  rep->setError(false);
421  return rep;
422  } catch (...) {
423  auto what = self->MakeExceptionMessageWithStatus(id, std::current_exception());
424  LOG4CPLUS_ERROR(self->m_logger,
425  fmt::format("{}(id='{}'): StartDaqAsync "
426  "completed with failure: {}",
427  function,
428  id,
429  what));
430  throw boost::enable_current_exception(
431  daqif::DaqException(id, fmt::format("{}() failed: {}", function, what)));
432  }
433  });
434 }
435 
436 boost::future<std::shared_ptr<::daqif::DaqReply>>
437 OcmDaqService::StopDaq(const std::string& id, bool forced) {
438  return boost::async(
439  m_executor,
440  [self = shared_from_this(), id, forced]() {
441  return self->m_mgr
442  .StopDaqAsync(id,
444  .then(
445  self->m_executor,
446  [weak_self = std::weak_ptr(self->shared_from_this()),
447  id](boost::future<daq::Status> f) -> std::shared_ptr<daqif::DaqReply> {
448  auto self = weak_self.lock();
449  if (!self) {
450  LOG4CPLUS_WARN(LOGGER_NAME,
451  fmt::format("StopDaq(id='{}'): StopDaqAsync is "
452  "complete but MAL service has "
453  "been abandoned. Throwing exception.",
454  id));
455  throw boost::enable_current_exception(
456  daqif::DaqException(id, "Service has been abandoned"));
457  }
458  try {
459  f.get();
460  auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
461  assert(rep);
462  rep->setId(id);
463  rep->setError(false);
464  return rep;
465  } catch (...) {
466  auto what = self->MakeExceptionMessageWithStatus(
467  id, std::current_exception());
468  LOG4CPLUS_INFO(self->m_logger,
469  fmt::format("StopDaq(id='{}'): "
470  "completed with failure: {}",
471  id,
472  what));
473  throw boost::enable_current_exception(daqif::DaqException(
474  id, fmt::format("Stop failed\n\n{}", what)));
475  }
476  });
477  })
478  // unwrap outer future from async() to get the future we want from m_mgr.StopDaqAsync()
479  .unwrap();
480 }
481 
482 boost::future<std::shared_ptr<::daqif::DaqReply>> OcmDaqService::AbortDaq(const std::string& id) {
483  return boost::async(m_executor,
484  [self = shared_from_this(), id]() {
485  self->m_event_log->AddEvent(
487  fmt::format("Request received: "
488  "AbortDaq(id='{0}')",
489  id),
490  std::nullopt));
491  return self->AbortDaq(id, false);
492  })
493  .unwrap();
494 }
495 
496 boost::future<std::shared_ptr<::daqif::DaqReply>>
497 OcmDaqService::ForceAbortDaq(const std::string& id) {
498  return boost::async(m_executor,
499  [self = shared_from_this(), id]() {
500  self->m_event_log->AddEvent(
502  fmt::format("Request received: "
503  "ForceAbortDaq(id='{0}')",
504  id),
505  std::nullopt));
506  return self->AbortDaq(id, true);
507  })
508  .unwrap();
509 }
510 
511 boost::future<std::shared_ptr<::daqif::DaqReply>>
512 OcmDaqService::AbortDaq(const std::string& id, bool forced) {
513  return boost::async(
514  m_executor,
515  [self = shared_from_this(), id, forced]() {
516  return self->m_mgr
517  .AbortDaqAsync(
519  .then(
520  self->m_executor,
521  [weak_self = std::weak_ptr(self->shared_from_this()), id, forced](
522  boost::future<daq::Status> f) -> std::shared_ptr<daqif::DaqReply> {
523  auto self = weak_self.lock();
524  if (!self) {
525  LOG4CPLUS_WARN(
526  LOGGER_NAME,
527  fmt::format("AbortDaq(id='{}', forced={}): AbortDaqAsync is "
528  "complete but MAL service has "
529  "been abandoned. Throwing exception.",
530  id,
531  forced));
532  throw boost::enable_current_exception(
533  daqif::DaqException(id, "Service has been abandoned"));
534  }
535  try {
536  auto result = f.get();
537  LOG4CPLUS_INFO(
538  self->m_logger,
539  fmt::format("AbortDaq(id='{}', forced={}): "
540  "AbortDaqAsync Completed successfully",
541  id,
542  forced));
543  auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
544  assert(rep);
545  rep->setId(id);
546  rep->setError(result.error);
547  LOG4CPLUS_DEBUG(
548  self->m_logger,
549  fmt::format("AbortDaq(id='{}', forced={}): "
550  "AbortDaqAsync Completed, returning reply now.",
551  id,
552  forced));
553  return rep;
554  } catch (...) {
555  auto what = self->MakeExceptionMessageWithStatus(
556  id, std::current_exception());
557  LOG4CPLUS_ERROR(self->m_logger,
558  fmt::format("AbortDaq(id='{}', forced={}): "
559  "AbortDaqAsync Completed "
560  "with fatal error:\n{}",
561  id,
562  forced,
563  what));
564  throw boost::enable_current_exception(daqif::DaqException(
565  id, fmt::format("Abort failed\n\n{}", what)));
566  }
567  }); // m_msg.AbortDaqAsync cont
568  }) // boost::async()
569  // unwrap outer future from async() to get the future we want from m_mgr.AbortDaqAsync()
570  .unwrap();
571 }
572 
573 boost::future<std::shared_ptr<::daqif::DaqReply>>
574 OcmDaqService::UpdateKeywords(const std::string& id, const std::string& keywords) {
575  return boost::async(
576  m_executor,
577  [self = shared_from_this(), id, keywords]() -> std::shared_ptr<::daqif::DaqReply> {
578  self->m_event_log->AddEvent(
580  fmt::format("Request received: "
581  "UpdateKeywords(id='{0}', keywords='{1}')",
582  id,
583  keywords),
584  std::nullopt));
585  daq::fits::KeywordVector parsed_keywords;
586  try {
587  daq::fits::ParseJsonKeywords(keywords.c_str()).swap(parsed_keywords);
588  } catch (nlohmann::json::exception const& e) {
589  LOG4CPLUS_ERROR(
590  self->m_logger,
591  fmt::format("UpdateKeywords(id='{}', ...): Failed to parse JSON", id));
592  throw boost::enable_current_exception(
593  daqif::DaqException(id, fmt::format("Invalid JSON string: {}", e.what())));
594  } catch (std::invalid_argument const& e) {
595  LOG4CPLUS_ERROR(
596  self->m_logger,
597  fmt::format(
598  "UpdateKeywords(id='{}', ...): JSON could be parsed but was invalid "
599  "schema",
600  id));
601  throw boost::enable_current_exception(
602  daqif::DaqException(id, fmt::format("Invalid JSON schema: {}", e.what())));
603  } catch (std::exception const& e) {
604  LOG4CPLUS_ERROR(
605  self->m_logger,
606  fmt::format(
607  "UpdateKeywords(id='{}', ...): std::exception: '{}'", id, e.what()));
608  throw boost::enable_current_exception(
609  daqif::DaqException(id, fmt::format("std::exception: {}", e.what())));
610  } catch (...) {
611  throw boost::enable_current_exception(daqif::DaqException(id, "unknown error"));
612  }
613  try {
614  self->m_mgr.UpdateKeywords(id, parsed_keywords);
615  auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
616  rep->setId(id);
617  rep->setError(false);
618  return rep;
619  } catch (std::invalid_argument const& e) {
620  LOG4CPLUS_ERROR(
621  self->m_logger,
622  fmt::format("UpdateKeywords(id='{}'): Invalid data acquisition id", id));
623  throw boost::enable_current_exception(
624  daqif::DaqException(id, fmt::format("No data acquisition with id='{}'", id)));
625  } catch (std::exception const& e) {
626  LOG4CPLUS_ERROR(
627  self->m_logger,
628  fmt::format(
629  "UpdateKeywords(id='{}', ...): std::exception: '{}'", id, e.what()));
630  throw boost::enable_current_exception(
631  daqif::DaqException(id, fmt::format("std::exception: {}", e.what())));
632  } catch (...) {
633  throw boost::enable_current_exception(daqif::DaqException(id, "unknown error"));
634  }
635  });
636 }
637 
638 boost::future<std::shared_ptr<::daqif::DaqStatus>> OcmDaqService::GetStatus(const std::string& id) {
639  return boost::async(
640  m_executor,
641  [self = shared_from_this(), id]() {
642  self->m_event_log->AddEvent(
644  fmt::format("Request received: "
645  "GetStatus(id='{0}')",
646  id),
647  std::nullopt));
648  try {
649  LOG4CPLUS_INFO(self->m_logger, fmt::format("GetStatus(id='{}'): Enter", id));
650  auto status = self->m_mgr.GetStatus(id);
651  auto rep = self->m_mal.createDataEntity<daqif::DaqStatus>();
652  assert(rep);
653  *rep << status;
654  LOG4CPLUS_INFO(
655  self->m_logger,
656  fmt::format("GetStatus(id='{}'): Set result -> {}", id, status.state));
657  return boost::make_ready_future<std::shared_ptr<daqif::DaqStatus>>(rep);
658  } catch (std::invalid_argument const&) {
659  LOG4CPLUS_ERROR(
660  self->m_logger,
661  fmt::format("GetStatus(id='{}'): Invalid data acquisition id", id));
662  return boost::make_exceptional_future<std::shared_ptr<daqif::DaqStatus>>(
663  boost::enable_current_exception(daqif::DaqException(
664  id, fmt::format("No data acquisition with id='{}'", id))));
665  }
666  })
667  .unwrap();
668 }
669 
670 boost::future<std::vector<std::shared_ptr<::daqif::DaqStatus>>> OcmDaqService::GetActiveList() {
671  return boost::async(
672  m_executor,
673  [self = shared_from_this()]() -> std::vector<std::shared_ptr<::daqif::DaqStatus>> {
674  self->m_event_log->AddEvent(daq::UserActionEvent("",
675  "Request received: "
676  "GetActiveList()",
677  std::nullopt));
678  auto daqs = self->m_mgr.GetDaqControllers();
679  std::vector<std::shared_ptr<daq::DaqController const>> active;
680  std::vector<std::shared_ptr<daqif::DaqStatus>> reply;
681  std::copy_if(daqs.begin(), daqs.end(), std::back_inserter(active), [](auto daq_ctl) {
682  return !daq::IsFinalState(daq_ctl->GetState());
683  });
684  std::transform(active.begin(),
685  active.end(),
686  std::back_inserter(reply),
687  [&mal = self->m_mal](auto daq_ctl) {
688  auto mal_status = mal.createDataEntity<daqif::DaqStatus>();
689  *mal_status << daq_ctl->GetStatus()->GetStatus();
690  return mal_status;
691  });
692  return reply;
693  });
694 }
695 
696 boost::future<std::shared_ptr<::daqif::AwaitDaqReply>> OcmDaqService::AwaitDaqState(
697  const std::string& id, daqif::DaqState state, daqif::DaqSubState substate, double timeout) {
698  using Seconds = std::chrono::duration<double>;
699 
700  return boost::async(
701  m_executor,
702  [=, self = shared_from_this()]() {
703  self->m_event_log->AddEvent(daq::UserActionEvent(id,
704  format("Request received: "
705  "AwaitDaqState(id='{}', "
706  "state={}, substate={}, "
707  "timeout={})",
708  id,
709  daq::ToString(state),
710  daq::ToString(substate),
711  timeout),
712  std::nullopt));
713  if (timeout <= 0) {
714  return boost::make_exceptional_future<daq::Result<daq::Status>>(
715  std::invalid_argument(
716  format("Invalid argument `timeout`. Must be > 0", timeout)));
717  }
718  if (!daqif::IsStateValid(state, substate)) {
719  return boost::make_exceptional_future<daq::Result<daq::Status>>(
720  std::invalid_argument(fmt::format(
721  "Invalid state combination: {} and {}", state, substate)));
722  }
723  auto daq_state = daq::MakeState({state, substate});
724  return self->m_mgr.AwaitDaqStateAsync(
725  id, daq_state, duration_cast<milliseconds>(Seconds(timeout)));
726  })
727  .unwrap()
728  .then(
729  m_executor,
730  [id, self = shared_from_this()](boost::future<daq::Result<daq::Status>> fut) {
731  try {
732  auto [timeout, status] = fut.get();
733  auto mal_reply = self->m_mal.createDataEntity<daqif::AwaitDaqReply>();
734  assert(mal_reply);
735  mal_reply->setTimeout(timeout);
736  auto mal_status_ptr = mal_reply->getStatus();
737  assert(mal_status_ptr);
738  *mal_status_ptr << status;
739  daq::ActionEvent(id,
740  fmt::format("Request completed: {}",
741  (timeout ? "condition not yet satisfied (timeout)"
742  : "condition satisfied")),
743  std::nullopt);
744  return mal_reply;
745  } catch (std::exception const& e) {
746  auto what = self->MakeExceptionMessageWithStatus(id, std::current_exception());
747  self->m_event_log->AddEvent(daq::ActionEvent(
748  id,
749  fmt::format("Await state completed exceptionally\n\n{}", what),
750  std::nullopt));
751  throw boost::enable_current_exception(
752  daqif::DaqException(id, fmt::format("Await state failed\n\n{}", what)));
753  } catch (...) {
755  id, "Request completed exceptionally: Unknown exception", std::nullopt);
756  throw boost::enable_current_exception(
757  daqif::DaqException(id, "Uknown exception"));
758  }
759  });
760 }
761 
762 std::string
763 OcmDaqService::MakeExceptionMessageWithStatus(std::string const& id,
764  std::exception_ptr const& exception) const {
765  auto nested_msg = NestedExceptionReporter(exception).Str();
766  auto alerts_msg = std::string("n/a");
767  try {
768  auto status = m_mgr.GetStatus(id);
769  alerts_msg = fmt::format("{}", status.alerts);
770  } catch (std::exception const& e) {
771  LOG4CPLUS_WARN(m_logger, fmt::format("GetStatus({}) failed:", id, e.what()));
772  }
773  return fmt::format("Errors(s):\n{}\nActive alert(s):\n{}", nested_msg, alerts_msg);
774 }
775 
776 void OcmDaqService::UpdateFrom(daq::DaqContext& context, daq::json::StartDaqV2Spec const& spec) {
777  using PrimaryDataSource = daq::json::StartDaqV2Spec::PrimaryDataSource;
778  using MetadataSource = daq::json::StartDaqV2Spec::MetadataSource;
779  using FitsKeywordsSource = daq::json::FitsKeywordsSource;
780  using FitsFileSource = daq::json::FitsFileSource;
781 
782  context.file_id = m_mgr.MakeDaqId(&context.creation_time);
783  if (context.id.empty()) {
784  context.id = !spec.id.empty() ? spec.id : context.file_id;
785  }
786 
787  if (spec.await_completion_interval.has_value()) {
789  }
790 
791  context.process_name = m_proc_name;
792  context.dp_name_prefix = spec.file_prefix;
793  context.specification = spec;
794 
795  // Create sources
796  for (auto const& variant : spec.sources) {
797  if (auto const* ds = std::get_if<PrimaryDataSource>(&variant); ds != nullptr) {
798  context.prim_sources.push_back({ds->source_name, ds->rr_uri});
799  } else if (auto const* ds = std::get_if<MetadataSource>(&variant); ds != nullptr) {
800  context.meta_sources.push_back({ds->source_name, ds->rr_uri});
801  } else if (auto const* ds = std::get_if<FitsKeywordsSource>(&variant); ds != nullptr) {
802  context.results.emplace_back(ds->source_name, ds->keywords);
803  } else if (auto const* ds = std::get_if<FitsFileSource>(&variant); ds != nullptr) {
804  context.results.emplace_back(ds->source_name, ds->location);
805  } else {
806  LOG4CPLUS_ERROR(m_logger, "Unknown variant encountered");
807  }
808  }
809 
810  // Check that an instance does not already exist with id
811  if (m_mgr.HaveDaq(context.id)) {
812  auto msg = fmt::format("DAQ with id='{0}' already exist", context.id);
813  LOG4CPLUS_INFO(m_logger, msg);
814  throw boost::enable_current_exception(std::invalid_argument(msg));
815  }
816 }
Manager owns DaqController and FitsController (active data acquisitions) instances and multiplexes re...
Definition: manager.hpp:124
virtual boost::future< State > StartDaqAsync(DaqContext ctx)=0
Start DaqController identified by id.
virtual Status GetStatus(std::string_view id) const =0
Get status.
virtual std::string MakeDaqId(std::chrono::system_clock::time_point *time=nullptr) const =0
Creates a new unique identifier based on the instrument id and current time.
virtual bool HaveDaq(std::string_view id, std::string_view file_id={}) const DAQ_NOEXCEPT=0
Query existing data acquisition by id and optional file_id.
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
std::string Str() const
Convenience function for constructing a std::string from the exception.
Definition: report.cpp:97
Contains data structure for FITS keywords.
Contains support functions for daqif.
Contains State support functions for daqif.
boost::future< std::vector< std::shared_ptr<::daqif::DaqStatus > > > GetActiveList() override
OcmDaqService(boost::asio::io_context &io_ctx, mal::Mal &mal, daq::Manager &mgr, std::string proc_name, std::string output_path, std::shared_ptr< daq::ObservableEventLog > event_log)
boost::future< std::shared_ptr<::daqif::DaqReply > > StopDaq(const std::string &id) override
boost::future< std::shared_ptr<::daqif::AwaitDaqReply > > AwaitDaqState(const std::string &id, daqif::DaqState state, daqif::DaqSubState substate, double timeout) override
boost::future< std::shared_ptr<::daqif::DaqReply > > AbortDaq(const std::string &id) override
boost::future< std::shared_ptr<::daqif::DaqReply > > StartDaqV2(const std::string &specification) override
boost::future< std::shared_ptr<::daqif::DaqReply > > UpdateKeywords(const std::string &id, const std::string &keywords) override
boost::future< std::shared_ptr<::daqif::DaqReply > > StartDaq(const std::string &id, const std::string &file_prefix, const std::string &primary_sources, const std::string &metadata_sources, const std::string &properties) override
boost::future< std::shared_ptr<::daqif::DaqReply > > ForceStopDaq(const std::string &id) override
boost::future< std::shared_ptr<::daqif::DaqStatus > > GetStatus(const std::string &id) override
boost::future< std::shared_ptr<::daqif::DaqReply > > ForceAbortDaq(const std::string &id) override
Default logger name.
const std::string LOGGER_NAME
Definition: config.hpp:22
std::vector< KeywordVariant > KeywordVector
Vector of keywords.
Definition: keyword.hpp:414
std::vector< KeywordVariant > ParseJsonKeywords(char const *keywords)
Parse and return FITS keywords.
Definition: json.cpp:124
std::optional< std::chrono::milliseconds > await_completion_interval
Definition: startDaqV2.hpp:53
DpSpec::SourceTypes ParseSource(Json const &json, JsonPointer const &breadcrumb)
Definition: dpSpec.cpp:34
StartDaqV2Spec ParseStartDaqV2Spec(nlohmann::json const &json)
Parse StartDaqSpec.
Definition: startDaqV2.cpp:46
std::vector< DataSourceTypes > sources
Definition: startDaqV2.hpp:51
Structure with a close mapping from JSON representation in the StartDaqV2 MAL request.
Definition: startDaqV2.hpp:33
daqif::FullState MakeState(State state) noexcept
Converts daq::State to DaqSubstate.
Definition: conversion.cpp:63
std::string_view ToString(daqif::DaqState state) noexcept
Definition: conversion.cpp:142
@ Strict
Any error is considered fatal and may lead to the operation being aborted.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
NLOHMANN_JSON_SERIALIZE_ENUM(State, { {State::NotStarted, "NotStarted"}, {State::Starting, "Starting"}, {State::Acquiring, "Acquiring"}, {State::Stopping, "Stopping"}, {State::Stopped, "Stopped"}, {State::NotScheduled, "NotScheduled"}, {State::Scheduled, "Scheduled"}, {State::Transferring, "Transferring"}, {State::Merging, "Merging"}, {State::Releasing, "Releasing"}, {State::AbortingAcquiring, "AbortingAcquiring"}, {State::AbortingMerging, "AbortingMerging"}, {State::Aborted, "Aborted"}, {State::Completed, "Completed"}, }) void to_json(nlohmann void to_json(nlohmann::json &j, Alert const &p)
Definition: json.cpp:48
bool IsStateValid(DaqState state, DaqSubState substate)
Validate state combination.
Definition: state.cpp:16
const std::string LOGGER_NAME_EVENTLOG
Definition: logger.hpp:19
daq::DaqContext ParseStartDaqContext(std::string const &json_properties)
Parse the JSON properties user provides with StartDaq.
ParsedSource ParseSourceUri(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>".
std::vector< ParsedSource > ParseSourceUris(std::string_view s)
Parse user provided string in the format "<name>@<rr-uri>[ <name>@...]".
std::ostream & operator<<(std::ostream &os, ParsedSource const &s)
Declaration of OcmDaqService.
Contains declaration for DaqController.
bool operator==(ParsedSource const &rhs) const
std::string name
ParsedSource()=default
std::string rr_uri
Event related to an action being requested or performed.
Definition: eventLog.hpp:56
Structure carrying context needed to start a Data Acquisition and construct a Data Product Specificat...
Definition: daqContext.hpp:44
std::vector< Source > meta_sources
Definition: daqContext.hpp:77
DpParts results
Results from Data Acquisition (FITS files and keywords).
Definition: daqContext.hpp:102
std::string process_name
User defined process name.
Definition: daqContext.hpp:70
std::vector< daq::fits::KeywordVariant > keywords
Keyword list provided by OCM to Data Product.
Definition: daqContext.hpp:87
std::vector< Source > prim_sources
Definition: daqContext.hpp:76
std::chrono::milliseconds await_interval
Interval (and thus duration) of the requests sent to primary sources to await end of recording.
Definition: daqContext.hpp:94
std::optional< json::StartDaqV2Spec > specification
Optional specification, if DAQ was started using StartDaqV2.
Definition: daqContext.hpp:116
std::string file_id
Data Product FileId as specified by OLAS ICD.
Definition: daqContext.hpp:65
std::string dp_name_prefix
Data product file name prefix.
Definition: daqContext.hpp:75
std::chrono::system_clock::time_point creation_time
Time when DAQ was created.
Definition: daqContext.hpp:108
std::string id
DAQ identifier, possibly provided by user.
Definition: daqContext.hpp:60
Event directly related to user action, such as a command to do something.
Definition: eventLog.hpp:65
JSON Schema error.
Definition: schemaError.hpp:18
auto const & transform