10 #include <Metadaqif.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <log4cplus/loggingmacros.h>
24 : m_policy(policy), m_params(m_params), m_error(
false) {
29 using boost::make_exceptional_future;
34 [
this](future<void> prim_result) -> future<void> {
36 if (prim_result.has_exception()) {
38 if (m_policy == ErrorPolicy::Strict) {
39 LOG4CPLUS_INFO(m_params.logger,
40 fmt::format(
"{}: StopAsync: primary daq "
41 "failed. Will not stop metadata acquisition.",
46 return make_exceptional_future<void>(prim_result.get_exception_ptr());
48 LOG4CPLUS_INFO(m_params.logger,
49 fmt::format(
"{}: StopAsync: primary daq "
50 "failed. Ignoring this because of "
51 "ErrorPolicy::Tolerant.",
58 if (res.has_exception()) {
60 if (m_policy == ErrorPolicy::Strict) {
61 LOG4CPLUS_INFO(m_params.logger,
62 fmt::format(
"{}: StopAsync: stopping failed", m_params.status));
65 LOG4CPLUS_INFO(m_params.logger,
66 fmt::format(
"{}: StopAsync: meta daq "
67 "failed. Ignoring this because of "
68 "ErrorPolicy::Tolerant.",
71 return {m_error, std::move(m_parts)};
75 boost::future<void> StopAsync::StopMeta() {
76 return SendRequestAndCollectReplies<void>(
77 m_params.meta_sources.begin(),
78 m_params.meta_sources.end(),
79 [](
auto&) { return true; },
82 [
id = m_params.id](Source<MetaSource>& s) {
83 s.SetState(State::Stopping);
84 return s.GetSource().GetRrClient().StopDaq(id);
87 [
this](AsyncOpParams params,
88 Source<MetaSource>& source,
89 boost::future<std::shared_ptr<metadaqif::DaqStopReply>>&& fut) ->
void {
90 if (source.GetState() == State::Stopped) {
91 LOG4CPLUS_INFO(params.logger,
92 fmt::format(
"{}: StopMeta: Source already stopped, ignoring "
103 if (!reply.has_value()) {
107 for (
auto const& file : (**reply).getFiles()) {
108 m_parts.emplace_back(std::string(source.GetSource().GetName()),
111 std::string keywords = (**reply).getKeywords();
112 if (!keywords.empty()) {
114 auto keyword_vec = fits::ParseJsonKeywords(keywords.c_str());
115 m_parts.emplace_back(std::string(source.GetSource().GetName()),
116 std::move(keyword_vec));
119 std::string_view(
"StopAsync: stop metadata acquisition"))
123 boost::future<void> StopAsync::StopPrim() {
124 return SendRequestAndCollectReplies<void>(
125 m_params.prim_sources.begin(),
126 m_params.prim_sources.end(),
127 [](Source<PrimSource>
const& source) ->
bool {
129 return !IsFinalState(source.GetState());
133 [](Source<PrimSource>& s) {
135 s.SetState(State::Stopping);
136 return s.GetSource().GetRrClient().RecStop();
139 [
this](AsyncOpParams params,
140 Source<PrimSource>& source,
141 boost::future<std::shared_ptr<recif::RecStatus>>&& fut) ->
void {
142 auto reply = HandlePrimDaqReply(State::Stopping,
148 if (!reply.has_value()) {
152 for (
auto const& file : (**reply).getDpFiles()) {
153 m_parts.emplace_back(std::string(source.GetSource().GetName()),
157 std::string_view(
"StopAsync: stop primary data acquisition"))