11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <log4cplus/loggingmacros.h>
14 #include <nlohmann/json.hpp>
21 using boost::enable_current_exception;
22 using boost::make_exceptional_future;
24 using std::chrono::duration_cast;
25 using std::chrono::milliseconds;
48 std::vector<daq::MetaSource> ParseMetaSource(mal::Mal& mal, std::string
const& str) {
49 std::vector<daq::MetaSource> sources;
52 sources.reserve(raw_sources.size());
53 for (
auto const& raw : raw_sources) {
61 std::vector<daq::PrimSource> ParsePrimSource(mal::Mal& mal, std::string
const& str) {
62 std::vector<daq::PrimSource> sources;
65 sources.reserve(raw_sources.size());
66 for (
auto const& raw : raw_sources) {
77 using std::chrono::milliseconds;
78 using std::chrono::duration_cast;
79 using Seconds = std::chrono::duration<double>;
82 if (json_properties.empty()) {
87 auto json = nlohmann::json::parse(json_properties);
88 if (!json.is_object()) {
// The properties document must be a JSON object; report the type actually received.
89 throw std::invalid_argument(
90 fmt::format(
"expected type object but got type {}", json.type_name()));
92 if (json.contains(
"keywords")) {
95 if (json.contains(
"awaitInterval")) {
96 auto& value = json[
"awaitInterval"];
97 if (!value.is_number()) {
98 throw std::invalid_argument(fmt::format(
"'awaitInterval': unsupported type: {}",
101 auto await_interval = value.get<
double>();
// Reject negative intervals.
// NOTE(review): the comparison is `< 0.0`, so an interval of exactly 0 is accepted even
// though the message below says "positive" — confirm whether 0 is meant to be valid.
102 if (await_interval < 0.0) {
103 throw std::invalid_argument(
104 fmt::format(
"'awaitInterval' must be positive number, got {}", await_interval));
106 properties.
await_interval = duration_cast<milliseconds>(Seconds(value.get<
double>()));
116 : name(std::move(name)), rr_uri(std::move(rr_uri)) {
119 os <<
"name: '" << s.
name <<
"', rr_uri='" << s.
rr_uri <<
"'";
124 auto start = s.find_first_not_of(
' ');
125 auto name_end_pos = s.find_first_of(
'@');
127 if (name_end_pos == std::string_view::npos) {
128 throw std::invalid_argument(
129 "separator '@' not found in expected format 'name@rr-uri'");
131 auto name = s.substr(start, name_end_pos - start);
133 throw std::invalid_argument(
"name part in 'name@rr-uri' is empty");
136 start = name_end_pos + 1;
137 if (start >= s.size()) {
138 throw std::invalid_argument(
"invalid format string, expected format 'name@rr-uri'");
// The rr-uri part runs from just after '@' up to the first space or comma, if any.
140 auto rr_uri_end_pos = s.find_first_of(
" ,", start);
// NOTE(review): this tests name_end_pos, not rr_uri_end_pos. name_end_pos == npos was
// already rejected by the '@' check earlier in this function, so this branch appears to be
// unreachable. Testing rr_uri_end_pos instead would reject a token with no trailing
// separator, so confirm the intent before changing it.
141 if (name_end_pos == std::string_view::npos) {
142 throw std::invalid_argument(
"separator ',' not found");
// When no separator was found rr_uri_end_pos is npos; substr then takes the rest of s.
145 auto rr_uri = s.substr(start, rr_uri_end_pos - start);
146 if (rr_uri.empty()) {
147 throw std::invalid_argument(
"rr_uri part in 'name@rr-uri' is empty");
149 return ParsedSource(std::string(name), std::string(rr_uri));
158 std::vector<ParsedSource> result;
161 while (begin < s.size()) {
162 const auto end = s.find_first_of(
' ', begin);
165 result.emplace_back(
ParseSourceUri(s.substr(begin, end - begin)));
168 if (end == std::string_view::npos) {
180 std::string output_path)
182 , m_executor(m_io_ctx)
185 , m_output_path(std::move(output_path))
186 , m_event_log(std::make_shared<
daq::ObservableEventLog>())
187 , m_log_observer_connection()
188 , m_log_observer(log4cplus::Logger::getInstance(
"daq.eventlog"))
189 , m_logger(log4cplus::Logger::getInstance(
LOGGER_NAME)) {
190 m_log_observer_connection =
191 m_event_log->ConnectObserver(std::reference_wrapper(m_log_observer));
195 m_log_observer_connection.disconnect();
198 boost::future<std::shared_ptr<::ocmif::DaqReply>>
200 const std::string& file_prefix,
201 const std::string& primary_sources,
202 const std::string& metadata_sources,
203 const std::string& json_properties) {
206 [=,
self = shared_from_this()]()
mutable {
209 fmt::format(
"Request received: "
210 "StartDaq(id='{0}', file_prefix='{1}', "
211 "primary_sources='{2}', metadata_sources='{3}', "
212 "json_properties='{4}'",
224 }
catch (std::exception
const& e) {
226 fmt::format(
"Failed to parse StartDaq JSON properties: {}", e.what());
227 self->m_event_log->AddEvent(
daq::ErrorEvent(
id, msg, std::nullopt,
"user"));
228 return boost::make_exceptional_future<
229 std::shared_ptr<::ocmif::DaqReply>>(
230 ocmif::DaqException(
id, msg));
233 auto validated_id = id;
234 if (validated_id.empty()) {
235 validated_id =
self->m_mgr.MakeDaqId();
238 fmt::format(
"StartDaq(id='{0}'): Created and assigned DAQ id='{0}'",
242 if (self->m_mgr.HaveDaq(
id)) {
245 fmt::format(
"StartDaq(id='{0}'): DAQ with id='{0}' already exist",
247 return boost::make_exceptional_future<
248 std::shared_ptr<ocmif::DaqReply>>(ocmif::DaqException(
249 id,
"Data acquisition with same id already exist"));
254 properties.
id = validated_id;
257 properties.
prim_sources = ParsePrimSource(m_mal, primary_sources);
259 properties.
meta_sources = ParseMetaSource(m_mal, metadata_sources);
266 auto status = std::make_shared<daq::ObservableStatus>(validated_id);
271 self->m_io_ctx, std::move(properties), status, m_event_log, ops);
274 self->m_mgr.AddDaq(
daq);
277 return self->m_mgr.StartDaqAsync(validated_id)
280 [weak_self = std::weak_ptr(self->shared_from_this()),
281 id = validated_id](boost::future<daq::State> f)
282 -> std::shared_ptr<ocmif::DaqReply> {
283 auto self = weak_self.lock();
287 fmt::format(
"StartDaq(id='{}'): StartDaqAsync is "
288 "complete but MAL service has "
289 "been abandoned. Throwing exception.",
291 throw boost::enable_current_exception(
292 ocmif::DaqException(id,
293 "Service has been abandoned"));
295 if (f.has_exception()) {
296 LOG4CPLUS_INFO(self->m_logger,
297 fmt::format(
"StartDaq(id='{}'): StartDaqAsync "
298 "completed with failure",
300 throw boost::enable_current_exception(
301 ocmif::DaqException(id,
"Start failed"));
303 auto rep = self->m_mal.createDataEntity<ocmif::DaqReply>();
306 rep->setError(
false);
309 }
catch (std::invalid_argument
const& e) {
310 LOG4CPLUS_INFO(self->m_logger,
311 fmt::format(
"StartDaq(id='{}'): Invalid argument error while "
312 "processing request: {}",
315 return boost::make_exceptional_future<std::shared_ptr<ocmif::DaqReply>>(
316 ocmif::DaqException(validated_id, e.what()));
317 }
catch (std::exception
const& e) {
// Log the failure text before replying. The two literals below are concatenated by the
// compiler, so the first one must end with a space.
318 LOG4CPLUS_INFO(self->m_logger,
319 fmt::format(
"StartDaq(id='{}'): Error while "
320 "processing request: {}",
323 return boost::make_exceptional_future<std::shared_ptr<ocmif::DaqReply>>(
324 ocmif::DaqException(validated_id, e.what()));
328 fmt::format(
"StartDaq(id='{}'): Unknown error while processing request",
// No exception text is forwarded on this path, so the reply carries a fixed message.
330 return boost::make_exceptional_future<std::shared_ptr<ocmif::DaqReply>>(
331 ocmif::DaqException(validated_id,
"Unknown error"));
338 boost::future<std::shared_ptr<::ocmif::DaqReply>>
342 [
self = shared_from_this(),
id]() {
344 fmt::format(
"Request received: "
348 return self->StopDaq(
id,
false);
352 boost::future<std::shared_ptr<::ocmif::DaqReply>>
356 [
self = shared_from_this(),
id]() {
358 fmt::format(
"Request received: "
359 "ForceStopDaq(id='{0}')",
362 return self->StopDaq(
id,
true);
366 boost::future<std::shared_ptr<::ocmif::DaqReply>>
370 [
self = shared_from_this(),
id, forced]() {
376 [weak_self = std::weak_ptr(self->shared_from_this()),
377 id](boost::future<daq::Status> f)
378 -> std::shared_ptr<ocmif::DaqReply> {
379 auto self = weak_self.lock();
381 LOG4CPLUS_WARN(LOGGER_NAME,
382 fmt::format(
"StopDaq(id='{}'): StopDaqAsync is "
383 "complete but MAL service has "
384 "been abandoned. Throwing exception.",
386 throw boost::enable_current_exception(
387 ocmif::DaqException(id,
"Service has been abandoned"));
389 if (f.has_exception()) {
390 throw boost::enable_current_exception(
391 ocmif::DaqException(id,
"Stop failed"));
393 auto rep = self->m_mal.createDataEntity<ocmif::DaqReply>();
396 rep->setError(
false);
404 boost::future<std::shared_ptr<::ocmif::DaqReply>>
408 [
self = shared_from_this(),
id]() {
410 fmt::format(
"Request received: "
411 "AbortDaq(id='{0}')",
414 return self->AbortDaq(
id,
false);
418 boost::future<std::shared_ptr<::ocmif::DaqReply>>
422 [
self = shared_from_this(),
id]() {
424 fmt::format(
"Request received: "
425 "ForceAbortDaq(id='{0}')",
428 return self->AbortDaq(
id,
true);
432 boost::future<std::shared_ptr<::ocmif::DaqReply>>
436 [
self = shared_from_this(),
id, forced]() {
442 [weak_self = std::weak_ptr(self->shared_from_this()),
id, forced](
443 boost::future<daq::Status> f)
444 -> std::shared_ptr<ocmif::DaqReply> {
445 auto self = weak_self.lock();
449 fmt::format(
"AbortDaq(id='{}', forced={}): AbortDaqAsync is "
450 "complete but MAL service has "
451 "been abandoned. Throwing exception.",
454 throw boost::enable_current_exception(
455 ocmif::DaqException(id,
"Service has been abandoned"));
457 if (f.has_exception()) {
458 LOG4CPLUS_INFO(self->m_logger,
459 fmt::format(
"AbortDaq(id='{}', forced={}): "
460 "AbortDaqAsync Completed "
464 throw boost::enable_current_exception(
465 ocmif::DaqException(id,
"Abort failed"));
467 auto result = f.get();
468 LOG4CPLUS_INFO(self->m_logger,
469 fmt::format(
"AbortDaq(id='{}', forced={}): "
470 "AbortDaqAsync Completed successfully",
473 auto rep = self->m_mal.createDataEntity<ocmif::DaqReply>();
476 rep->setError(result.error);
479 fmt::format(
"AbortDaq(id='{}', forced={}): "
480 "AbortDaqAsync Completed, returning reply now.",
490 boost::future<std::shared_ptr<::ocmif::DaqReply>>
494 [
self = shared_from_this(),
id, keywords]() -> std::shared_ptr<::ocmif::DaqReply> {
495 self->m_event_log->AddEvent(
497 fmt::format(
"Request received: "
498 "UpdateKeywords(id='{0}', keywords='{1}')",
505 }
catch (nlohmann::json::exception
const& e) {
508 fmt::format(
"UpdateKeywords(id='{}', ...): Failed to parse JSON",
id));
509 throw boost::enable_current_exception(ocmif::DaqException(
510 id, fmt::format(
"Invalid JSON string: {}", e.what())));
511 }
catch (std::invalid_argument
const& e) {
515 "UpdateKeywords(id='{}', ...): JSON could be parsed but was invalid "
518 throw boost::enable_current_exception(ocmif::DaqException(
519 id, fmt::format(
"Invalid JSON schema: {}", e.what())));
520 }
catch (std::exception
const& e) {
524 "UpdateKeywords(id='{}', ...): std::exception: '{}'",
526 throw boost::enable_current_exception(ocmif::DaqException(
527 id, fmt::format(
"std::exception: {}", e.what())));
529 throw boost::enable_current_exception(ocmif::DaqException(
id,
"unknown error"));
532 self->m_mgr.UpdateKeywords(
id, parsed_keywords);
533 auto rep =
self->m_mal.createDataEntity<ocmif::DaqReply>();
535 rep->setError(
false);
537 }
catch (std::invalid_argument
const& e) {
540 fmt::format(
"UpdateKeywords(id='{}'): Invalid data acquisition id",
id));
541 throw boost::enable_current_exception(ocmif::DaqException(
542 id, fmt::format(
"No data acquisition with id='{}'",
id)));
543 }
catch (std::exception
const& e) {
547 "UpdateKeywords(id='{}', ...): std::exception: '{}'",
549 throw boost::enable_current_exception(ocmif::DaqException(
550 id, fmt::format(
"std::exception: {}", e.what())));
552 throw boost::enable_current_exception(ocmif::DaqException(
id,
"unknown error"));
557 boost::future<std::shared_ptr<::ocmif::DaqStatus>>
561 [
self = shared_from_this(),
id]() {
562 self->m_event_log->AddEvent(
564 fmt::format(
"Request received: "
565 "GetStatus(id='{0}')",
569 LOG4CPLUS_INFO(self->m_logger, fmt::format(
"GetStatus(id='{}'): Enter",
id));
570 auto status =
self->m_mgr.GetStatus(
id);
571 auto rep =
self->m_mal.createDataEntity<ocmif::DaqStatus>();
574 LOG4CPLUS_INFO(self->m_logger,
575 fmt::format(
"GetStatus(id='{}'): Set result -> {}",
578 return boost::make_ready_future<std::shared_ptr<ocmif::DaqStatus>>(
580 }
catch (std::invalid_argument
const&) {
583 fmt::format(
"GetStatus(id='{}'): Invalid data acquisition id",
id));
584 return boost::make_exceptional_future<
585 std::shared_ptr<ocmif::DaqStatus>>(
586 boost::enable_current_exception(ocmif::DaqException(
587 id, fmt::format(
"No data acquisition with id='{}'",
id))));
596 [
self = shared_from_this()]() -> std::vector<std::shared_ptr<::ocmif::DaqStatus>> {
601 auto daqs =
self->m_mgr.GetDaqControllers();
602 std::vector<std::shared_ptr<daq::DaqController const>> active;
603 std::vector<std::shared_ptr<ocmif::DaqStatus>> reply;
604 std::copy_if(daqs.begin(),
606 std::back_inserter(active),
607 [](
auto daq_ctl) { return !daq::IsFinalState(daq_ctl->GetState()); });
611 std::back_inserter(reply),
612 [&mal = self->m_mal](
auto daq_ctl) {
613 auto mal_status = mal.createDataEntity<ocmif::DaqStatus>();
614 *mal_status << daq_ctl->GetStatus()->GetStatus();
622 const std::string&
id, ocmif::DaqState state, ocmif::DaqSubState substate,
double timeout) {
623 using Seconds = std::chrono::duration<double>;
625 return boost::async(m_executor, [=,
self = shared_from_this()]() {
627 format(
"Request received: "
628 "AwaitDaqState(id='{}', "
629 "state={}, substate={}, "
// Include the rejected value in the message; the original passed `timeout` to format()
// without a placeholder, so it was never printed.
637 return boost::make_exceptional_future<daq::Result<daq::Status>>(
638 std::invalid_argument(format(
"Invalid argument `timeout`. Must be > 0, got {}", timeout)));
// Reject requests for states this call cannot wait on.
// NOTE(review): because the two tests are joined with &&, a request is rejected only when
// state is not StateAcquiring AND substate is not Aborted; any request with substate
// Aborted passes regardless of state. The message mentions only StateAcquiring — confirm
// that the Aborted exception is intended.
640 if (state != ocmif::StateAcquiring && substate != ocmif::Aborted) {
641 return boost::make_exceptional_future<daq::Result<daq::Status>>(
642 std::invalid_argument(
"Invalid argument `state`: "
643 "Only ocmif::StateAcquiring is supported"));
646 return self->m_mgr.AwaitDaqStateAsync(
id,
648 duration_cast<milliseconds>(Seconds(timeout)));
651 .then(m_executor, [
id,
self = shared_from_this()](boost::future<daq::Result<daq::Status>> fut) {
653 auto [timeout, status] = fut.get();
654 auto mal_reply =
self->m_mal.createDataEntity<ocmif::AwaitDaqReply>();
656 mal_reply->setTimeout(timeout);
657 auto mal_status_ptr = mal_reply->getStatus();
658 assert(mal_status_ptr);
659 *mal_status_ptr << status;
661 fmt::format(
"Request completed: {}",
662 (timeout ?
"condition not yet satisfied (timeout)"
663 :
"condition satisfied")),
666 }
catch (std::exception
const& e) {
667 self->m_event_log->AddEvent(
669 fmt::format(
"Request completed exceptionally: {}", e.what()),
671 throw boost::enable_current_exception(ocmif::DaqException(
id, e.what()));
674 "Request completed exceptionally: Unknown exception",
// No exception text is forwarded on this path, so the DaqException carries a fixed message.
676 throw boost::enable_current_exception(ocmif::DaqException(
id,
"Unknown exception"));