10 #include <Metadaqif.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <log4cplus/loggingmacros.h>
22 : m_policy(policy), m_params(m_params), m_error(
false) {
27 using boost::make_exceptional_future;
32 [
this](future<void> prim_result) -> future<void> {
34 if (prim_result.has_exception()) {
36 if (m_policy == ErrorPolicy::Strict) {
37 LOG4CPLUS_INFO(m_params.logger,
38 fmt::format(
"{}: StopAsync: primary daq "
39 "failed. Will not stop metadata acquisition.",
44 return make_exceptional_future<void>(prim_result.get_exception_ptr());
46 LOG4CPLUS_INFO(m_params.logger,
47 fmt::format(
"{}: StopAsync: primary daq "
48 "failed. Ignoring this because of "
49 "ErrorPolicy::Tolerant.",
56 if (res.has_exception()) {
58 if (m_policy == ErrorPolicy::Strict) {
59 LOG4CPLUS_INFO(m_params.logger,
60 fmt::format(
"{}: StopAsync: stopping failed", m_params.status));
63 LOG4CPLUS_INFO(m_params.logger,
64 fmt::format(
"{}: StopAsync: meta daq "
65 "failed. Ignoring this because of "
66 "ErrorPolicy::Tolerant.",
69 return {m_error, std::move(m_parts)};
73 boost::future<void> StopAsync::StopMeta() {
74 return SendRequestAndCollectReplies<void>(
75 m_params.meta_sources.begin(),
76 m_params.meta_sources.end(),
77 [](
auto&) { return true; },
80 [
id = m_params.id](Source<MetaSource>& s) {
81 s.SetState(State::Stopping);
82 return s.GetSource().GetRrClient().StopDaq(id);
85 [
this](AsyncOpParams params,
86 Source<MetaSource>& source,
87 boost::future<std::shared_ptr<metadaqif::DaqStopReply>>&& fut) ->
void {
88 if (source.GetState() == State::Stopped) {
89 LOG4CPLUS_INFO(params.logger,
90 fmt::format(
"{}: StopMeta: Source already stopped, ignoring "
102 if (!reply.has_value()) {
106 for (
auto const& file : (**reply).getFiles()) {
107 m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
109 std::string keywords = (**reply).getKeywords();
110 if (!keywords.empty()) {
112 auto keyword_vec = fits::ParseJsonKeywords(keywords.c_str());
113 m_parts.emplace_back(std::string(source.GetSource().GetName()),
114 std::move(keyword_vec));
117 std::string_view(
"StopAsync: stop metadata acquisition"))
121 boost::future<void> StopAsync::StopPrim() {
122 return SendRequestAndCollectReplies<void>(
123 m_params.prim_sources.begin(),
124 m_params.prim_sources.end(),
125 [](Source<PrimSource>
const& source) ->
bool {
127 return IsSubsequentState(State::Stopped, source.GetState());
131 [](Source<PrimSource>& s) {
133 s.SetState(State::Stopping);
134 return s.GetSource().GetRrClient().RecStop();
137 [
this](AsyncOpParams params,
138 Source<PrimSource>& source,
139 boost::future<std::shared_ptr<recif::RecStatus>>&& fut) ->
void {
140 auto reply = HandlePrimDaqReply(
"RecStop",
147 if (!reply.has_value()) {
151 for (
auto const& file : (**reply).getDpFiles()) {
152 m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
155 std::string_view(
"StopAsync: stop primary data acquisition"))
Contains data structure for FITS keywords.
Declares daq::State and related functions.
std::optional< ReplyType > HandleMetaDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< MetaSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void >>> futures)
Unwrap futures to extract errors.
ErrorPolicy
Error policy supported by certain operations.
@ Stopping
Transitional state between Acquiring and Stopped.
@ Stopped
All data sources have reported they have stopped acquiring data.
Utility class that represents a result and an error.
Contains declaration for the StopAsync operation.
Parameters required for each async operation.
rad::IoExecutor & executor
boost::future< Result< DpParts > > Initiate()
Initiates operation that stop metadata acquisition.
StopAsync(ErrorPolicy policy, AsyncOpParams params) noexcept
Contains declaration for the async op utilities.