10 #include <Metadaqif.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <log4cplus/loggingmacros.h>
21 : m_policy(policy), m_params(m_params) {
29 return boost::when_all(AbortMeta(), AbortPrim())
31 [
this](future<std::tuple<future<void>, future<void>>> futs) ->
ResultType {
34 fmt::format(
"{}: AbortAsync: AbortMeta() and AbortPrim() completed.",
37 auto tup = futs.get();
38 auto& prim = std::get<0>(tup);
39 auto& meta = std::get<1>(tup);
41 if (prim.has_value() && meta.has_value()) {
42 LOG4CPLUS_INFO(m_params.logger,
43 fmt::format(
"{}: AbortAsync: DAQ aborted successfully.",
51 fmt::format(
"{}: AbortAsync: Data Acquisition abort has errors that will "
52 "be ignored due to error policy.",
57 LOG4CPLUS_ERROR(m_params.
logger,
58 fmt::format(
"{}: AbortAsync: Data Acquisition abort failed.",
62 std::vector<std::variant<DaqSourceError, std::exception_ptr>> errors;
66 errors.emplace_back(e);
68 errors.emplace_back(std::current_exception());
73 errors.emplace_back(e);
75 errors.emplace_back(std::current_exception());
77 LOG4CPLUS_DEBUG(m_params.
logger,
78 fmt::format(
"{}: Throwing exception.", m_params.
status));
79 throw boost::enable_current_exception(
DaqSourceErrors(std::move(errors)));
84 [[nodiscard]] boost::future<void> AbortAsync::AbortMeta() {
85 return SendRequestAndCollectReplies<void>(
86 m_params.meta_sources.begin(),
87 m_params.meta_sources.end(),
89 return s.GetState() != State::Aborted && s.GetState() != State::Stopped;
94 s.SetState(State::AbortingAcquiring);
95 return s.GetSource().GetRrClient().AbortDaq(id);
98 [](AsyncOpParams m_params,
100 boost::future<std::shared_ptr<metadaqif::DaqReply>>&& fut) ->
void {
101 HandleMetaDaqReply(
"AbortDaq",
102 State::AbortingAcquiring,
109 std::string_view(
"AbortAsync: abort metadata acquisition"))
113 [[nodiscard]] boost::future<void> AbortAsync::AbortPrim() {
114 return SendRequestAndCollectReplies<void>(
115 m_params.prim_sources.begin(),
116 m_params.prim_sources.end(),
117 [&](Source<PrimSource>& s) { return !IsFinalState(s.GetState()); },
120 [](Source<PrimSource>& s) {
121 s.SetState(State::AbortingAcquiring);
123 return s.GetSource().GetRrClient().RecAbort();
126 [](AsyncOpParams m_params,
127 Source<PrimSource>& source,
128 boost::future<std::string>&& fut) ->
void {
129 HandlePrimDaqReply(
"RecAbort",
130 State::AbortingAcquiring,
137 std::string_view(
"AbortAsync: abort primary data acquisition"))
Contains declaration for the AbortAsync operation.
Represents error in single source.
Exception thrown to carry reply errors.
Declares daq::State and related functions.
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void >>> futures)
Unwrap futures to extract errors.
ErrorPolicy
Error policy supported by certain operations.
@ Tolerant
Errors that can be ignored with partial completion of a command will be tolerated and is reported as ...
Simple class that holds the source and associated state.
AbortAsync(ErrorPolicy policy, AsyncOpParams params) noexcept
boost::future< ResultType > Initiate()
Initiates operation that stats metadata acquisition.
Parameters required for each async operation.
rad::IoExecutor & executor
ObservableStatus & status
Async operations should not modify state directly DaqController does that.
log4cplus::Logger & logger
Contains declaration for the async op utilities.