11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <log4cplus/loggingmacros.h>
14 #include <mal/rr/qos/ConnectionTime.hpp>
15 #include <nlohmann/json.hpp>
23 using boost::enable_current_exception;
24 using boost::make_exceptional_future;
27 using std::chrono::duration_cast;
28 using std::chrono::milliseconds;
32 std::vector<daq::DaqContext::Source>
ParseSource(std::string
const& str) {
33 std::vector<daq::DaqContext::Source> sources;
36 sources.reserve(raw_sources.size());
37 for (
auto const& raw : raw_sources) {
38 sources.push_back({raw.name, raw.rr_uri});
47 using std::chrono::duration_cast;
48 using std::chrono::milliseconds;
49 using Seconds = std::chrono::duration<double>;
52 if (json_properties.empty()) {
57 auto json = nlohmann::json::parse(json_properties);
58 if (!
json.is_object()) {
59 throw boost::enable_current_exception(std::invalid_argument(
60 fmt::format(
"expected type object but got type {}",
json.type_name())));
62 if (
json.contains(
"keywords")) {
65 if (
json.contains(
"awaitInterval")) {
66 auto& value =
json[
"awaitInterval"];
67 if (!value.is_number()) {
68 throw boost::enable_current_exception(std::invalid_argument(
69 fmt::format(
"'awaitInterval': unsupported type: {}", value.type_name())));
71 auto await_interval = value.get<
double>();
72 if (await_interval < 0.0) {
73 throw boost::enable_current_exception(std::invalid_argument(
74 fmt::format(
"'awaitInterval' must be positive number, got {}", await_interval)));
76 properties.
await_interval = duration_cast<milliseconds>(Seconds(value.get<
double>()));
86 : name(std::move(name)), rr_uri(std::move(rr_uri)) {
89 os <<
"name: '" << s.
name <<
"', rr_uri='" << s.
rr_uri <<
"'";
94 auto start = s.find_first_not_of(
' ');
95 auto name_end_pos = s.find_first_of(
'@');
97 if (name_end_pos == std::string_view::npos) {
98 throw boost::enable_current_exception(
99 std::invalid_argument(
"separator '@' not found in expected format 'name@rr-uri'"));
101 auto name = s.substr(start, name_end_pos - start);
103 throw boost::enable_current_exception(
104 std::invalid_argument(
"name part in 'name@rr-uri' is empty"));
107 start = name_end_pos + 1;
108 if (start >= s.size()) {
109 throw boost::enable_current_exception(
110 std::invalid_argument(
"invalid format string, expected format 'name@rr-uri'"));
112 auto rr_uri_end_pos = s.find_first_of(
" ,", start);
113 if (name_end_pos == std::string_view::npos) {
114 throw boost::enable_current_exception(std::invalid_argument(
"separator ',' not found"));
117 auto rr_uri = s.substr(start, rr_uri_end_pos - start);
118 if (rr_uri.empty()) {
119 throw boost::enable_current_exception(
120 std::invalid_argument(
"rr_uri part in 'name@rr-uri' is empty"));
122 return ParsedSource(std::string(name), std::string(rr_uri));
131 std::vector<ParsedSource> result;
134 while (begin < s.size()) {
135 const auto end = s.find_first_of(
' ', begin);
138 result.emplace_back(
ParseSourceUri(s.substr(begin, end - begin)));
141 if (end == std::string_view::npos) {
154 std::string proc_name,
155 std::string output_path,
156 std::shared_ptr<daq::ObservableEventLog> event_log)
158 , m_executor(m_io_ctx)
161 , m_proc_name(std::move(proc_name))
162 , m_output_path(std::move(output_path))
163 , m_event_log(std::move(event_log))
164 , m_log_observer_connection()
165 , m_log_observer(log4cplus::Logger::getInstance(
"daq.eventlog"))
166 , m_logger(log4cplus::Logger::getInstance(
LOGGER_NAME)) {
167 m_log_observer_connection =
168 m_event_log->ConnectObserver(std::reference_wrapper(m_log_observer));
169 if (m_proc_name.empty()) {
170 throw boost::enable_current_exception(
171 std::invalid_argument(
"OcmDaqService: Process name cannot be empty"));
176 m_log_observer_connection.disconnect();
179 boost::future<std::shared_ptr<::daqif::DaqReply>>
181 const std::string& file_prefix,
182 const std::string& primary_sources,
183 const std::string& metadata_sources,
184 const std::string& json_properties) {
187 [=,
self = shared_from_this()]()
mutable {
190 fmt::format(
"Request received: "
191 "StartDaq(id='{0}', file_prefix='{1}', "
192 "primary_sources='{2}', metadata_sources='{3}', "
193 "json_properties='{4}'",
200 std::filesystem::path prefix(file_prefix);
201 if (prefix.has_parent_path()) {
202 return boost::make_exceptional_future<std::shared_ptr<::daqif::DaqReply>>(
205 fmt::format(
"file_prefix \"{}\" may not contain parent paths",
213 }
catch (std::exception
const& e) {
215 fmt::format(
"Failed to parse StartDaq JSON properties: {}", e.what());
216 self->m_event_log->AddEvent(
daq::ErrorEvent(
id, msg, std::nullopt,
"user"));
217 return boost::make_exceptional_future<std::shared_ptr<::daqif::DaqReply>>(
218 daqif::DaqException(
id, msg));
222 auto validated_id = id;
223 if (validated_id.empty()) {
225 validated_id = context.
file_id;
228 fmt::format(
"StartDaq(id='{0}'): Created and assigned DAQ id='{0}'",
232 if (self->m_mgr.HaveDaq(validated_id)) {
235 fmt::format(
"StartDaq(id='{0}'): DAQ with id='{0}' already exist",
237 return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
238 daqif::DaqException(
id,
239 "Data acquisition with same id already exist"));
244 context.
id = validated_id;
253 return self->m_mgr.StartDaqAsync(context).then(
255 [weak_self = std::weak_ptr(self->shared_from_this()),
id = validated_id](
256 boost::future<daq::State> f) -> std::shared_ptr<daqif::DaqReply> {
257 auto self = weak_self.lock();
259 LOG4CPLUS_WARN(LOGGER_NAME,
260 fmt::format(
"StartDaq(id='{}'): StartDaqAsync is "
261 "complete but MAL service has "
262 "been abandoned. Throwing exception.",
264 throw boost::enable_current_exception(
265 daqif::DaqException(id,
"Service has been abandoned"));
269 auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
272 rep->setError(false);
277 LOG4CPLUS_ERROR(self->m_logger,
278 fmt::format(
"StartDaq(id='{}'): StartDaqAsync "
279 "completed with failure: {}",
283 throw boost::enable_current_exception(daqif::DaqException(
284 id, fmt::format(
"Start failed: {}", what)));
287 }
catch (std::invalid_argument
const& e) {
288 LOG4CPLUS_INFO(self->m_logger,
289 fmt::format(
"StartDaq(id='{}'): Invalid argument error while "
290 "processing request: {}",
293 return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
294 daqif::DaqException(validated_id, e.what()));
295 }
catch (std::exception
const& e) {
296 LOG4CPLUS_INFO(self->m_logger,
297 fmt::format(
"StartDaq(id='{}'): Error while"
298 "processing request: {}",
301 return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
302 daqif::DaqException(validated_id, e.what()));
306 fmt::format(
"StartDaq(id='{}'): Unknown error while processing request",
308 return boost::make_exceptional_future<std::shared_ptr<daqif::DaqReply>>(
309 daqif::DaqException(validated_id,
"Uknown error"));
319 [
self = shared_from_this(),
id]() {
321 fmt::format(
"Request received: "
325 return self->StopDaq(
id,
false);
330 boost::future<std::shared_ptr<::daqif::DaqReply>>
332 return boost::async(m_executor,
333 [
self = shared_from_this(),
id]() {
334 self->m_event_log->AddEvent(
336 fmt::format(
"Request received: "
337 "ForceStopDaq(id='{0}')",
340 return self->StopDaq(
id,
true);
345 boost::future<std::shared_ptr<::daqif::DaqReply>>
349 [
self = shared_from_this(),
id, forced]() {
355 [weak_self = std::weak_ptr(self->shared_from_this()),
356 id](boost::future<daq::Status> f) -> std::shared_ptr<daqif::DaqReply> {
357 auto self = weak_self.lock();
359 LOG4CPLUS_WARN(LOGGER_NAME,
360 fmt::format(
"StopDaq(id='{}'): StopDaqAsync is "
361 "complete but MAL service has "
362 "been abandoned. Throwing exception.",
364 throw boost::enable_current_exception(
365 daqif::DaqException(id,
"Service has been abandoned"));
369 auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
372 rep->setError(false);
377 LOG4CPLUS_INFO(self->m_logger,
378 fmt::format(
"StopDaq(id='{}'): "
379 "completed with failure: {}",
382 throw boost::enable_current_exception(daqif::DaqException(
383 id, fmt::format(
"Stop failed: {}", what)));
392 return boost::async(m_executor,
393 [
self = shared_from_this(),
id]() {
394 self->m_event_log->AddEvent(
396 fmt::format(
"Request received: "
397 "AbortDaq(id='{0}')",
400 return self->AbortDaq(
id,
false);
405 boost::future<std::shared_ptr<::daqif::DaqReply>>
407 return boost::async(m_executor,
408 [
self = shared_from_this(),
id]() {
409 self->m_event_log->AddEvent(
411 fmt::format(
"Request received: "
412 "ForceAbortDaq(id='{0}')",
415 return self->AbortDaq(
id,
true);
420 boost::future<std::shared_ptr<::daqif::DaqReply>>
424 [
self = shared_from_this(),
id, forced]() {
430 [weak_self = std::weak_ptr(self->shared_from_this()),
id, forced](
431 boost::future<daq::Status> f) -> std::shared_ptr<daqif::DaqReply> {
432 auto self = weak_self.lock();
436 fmt::format(
"AbortDaq(id='{}', forced={}): AbortDaqAsync is "
437 "complete but MAL service has "
438 "been abandoned. Throwing exception.",
441 throw boost::enable_current_exception(
442 daqif::DaqException(id,
"Service has been abandoned"));
445 auto result = f.get();
448 fmt::format(
"AbortDaq(id='{}', forced={}): "
449 "AbortDaqAsync Completed successfully",
452 auto rep = self->m_mal.createDataEntity<daqif::DaqReply>();
455 rep->setError(result.error);
458 fmt::format(
"AbortDaq(id='{}', forced={}): "
459 "AbortDaqAsync Completed, returning reply now.",
466 LOG4CPLUS_ERROR(self->m_logger,
467 fmt::format(
"AbortDaq(id='{}', forced={}): "
468 "AbortDaqAsync Completed "
469 "with fatal error:\n{}",
473 throw boost::enable_current_exception(
474 daqif::DaqException(
id, what));
482 boost::future<std::shared_ptr<::daqif::DaqReply>>
486 [
self = shared_from_this(),
id, keywords]() -> std::shared_ptr<::daqif::DaqReply> {
487 self->m_event_log->AddEvent(
489 fmt::format(
"Request received: "
490 "UpdateKeywords(id='{0}', keywords='{1}')",
497 }
catch (nlohmann::json::exception
const& e) {
500 fmt::format(
"UpdateKeywords(id='{}', ...): Failed to parse JSON",
id));
501 throw boost::enable_current_exception(
502 daqif::DaqException(
id, fmt::format(
"Invalid JSON string: {}", e.what())));
503 }
catch (std::invalid_argument
const& e) {
507 "UpdateKeywords(id='{}', ...): JSON could be parsed but was invalid "
510 throw boost::enable_current_exception(
511 daqif::DaqException(
id, fmt::format(
"Invalid JSON schema: {}", e.what())));
512 }
catch (std::exception
const& e) {
516 "UpdateKeywords(id='{}', ...): std::exception: '{}'",
id, e.what()));
517 throw boost::enable_current_exception(
518 daqif::DaqException(
id, fmt::format(
"std::exception: {}", e.what())));
520 throw boost::enable_current_exception(daqif::DaqException(
id,
"unknown error"));
523 self->m_mgr.UpdateKeywords(
id, parsed_keywords);
524 auto rep =
self->m_mal.createDataEntity<daqif::DaqReply>();
526 rep->setError(
false);
528 }
catch (std::invalid_argument
const& e) {
531 fmt::format(
"UpdateKeywords(id='{}'): Invalid data acquisition id",
id));
532 throw boost::enable_current_exception(
533 daqif::DaqException(
id, fmt::format(
"No data acquisition with id='{}'",
id)));
534 }
catch (std::exception
const& e) {
538 "UpdateKeywords(id='{}', ...): std::exception: '{}'",
id, e.what()));
539 throw boost::enable_current_exception(
540 daqif::DaqException(
id, fmt::format(
"std::exception: {}", e.what())));
542 throw boost::enable_current_exception(daqif::DaqException(
id,
"unknown error"));
550 [
self = shared_from_this(),
id]() {
551 self->m_event_log->AddEvent(
553 fmt::format(
"Request received: "
554 "GetStatus(id='{0}')",
558 LOG4CPLUS_INFO(self->m_logger, fmt::format(
"GetStatus(id='{}'): Enter",
id));
559 auto status =
self->m_mgr.GetStatus(
id);
560 auto rep =
self->m_mal.createDataEntity<daqif::DaqStatus>();
565 fmt::format(
"GetStatus(id='{}'): Set result -> {}",
id, status.state));
566 return boost::make_ready_future<std::shared_ptr<daqif::DaqStatus>>(rep);
567 }
catch (std::invalid_argument
const&) {
570 fmt::format(
"GetStatus(id='{}'): Invalid data acquisition id",
id));
571 return boost::make_exceptional_future<std::shared_ptr<daqif::DaqStatus>>(
572 boost::enable_current_exception(daqif::DaqException(
573 id, fmt::format(
"No data acquisition with id='{}'",
id))));
582 [
self = shared_from_this()]() -> std::vector<std::shared_ptr<::daqif::DaqStatus>> {
587 auto daqs =
self->m_mgr.GetDaqControllers();
588 std::vector<std::shared_ptr<daq::DaqController const>> active;
589 std::vector<std::shared_ptr<daqif::DaqStatus>> reply;
590 std::copy_if(daqs.begin(), daqs.end(), std::back_inserter(active), [](
auto daq_ctl) {
591 return !daq::IsFinalState(daq_ctl->GetState());
593 std::transform(active.begin(),
595 std::back_inserter(reply),
596 [&
mal = self->m_mal](
auto daq_ctl) {
597 auto mal_status = mal.createDataEntity<daqif::DaqStatus>();
598 *mal_status << daq_ctl->GetStatus()->GetStatus();
606 const std::string&
id, daqif::DaqState state, daqif::DaqSubState substate,
double timeout) {
607 using Seconds = std::chrono::duration<double>;
611 [=,
self = shared_from_this()]() {
613 format(
"Request received: "
614 "AwaitDaqState(id='{}', "
615 "state={}, substate={}, "
623 return boost::make_exceptional_future<daq::Result<daq::Status>>(
624 std::invalid_argument(
625 format(
"Invalid argument `timeout`. Must be > 0", timeout)));
628 return boost::make_exceptional_future<daq::Result<daq::Status>>(
629 std::invalid_argument(fmt::format(
630 "Invalid state combination: {} and {}", state, substate)));
633 return self->m_mgr.AwaitDaqStateAsync(
634 id, daq_state, duration_cast<milliseconds>(Seconds(timeout)));
638 [
id,
self = shared_from_this()](boost::future<daq::Result<daq::Status>> fut) {
640 auto [timeout, status] = fut.get();
641 auto mal_reply =
self->m_mal.createDataEntity<daqif::AwaitDaqReply>();
643 mal_reply->setTimeout(timeout);
644 auto mal_status_ptr = mal_reply->getStatus();
645 assert(mal_status_ptr);
646 *mal_status_ptr << status;
649 fmt::format(
"Request completed: {}",
650 (timeout ?
"condition not yet satisfied (timeout)"
651 :
"condition satisfied")),
654 }
catch (std::exception
const& e) {
657 fmt::format(
"Request completed exceptionally: {}", e.what()),
659 throw boost::enable_current_exception(daqif::DaqException(
id, e.what()));
662 id,
"Request completed exceptionally: Unknown exception", std::nullopt);
663 throw boost::enable_current_exception(
664 daqif::DaqException(
id,
"Uknown exception"));