8 #ifndef OCF_DAQ_OP_UTIL_HPP_
9 #define OCF_DAQ_OP_UTIL_HPP_
10 #include "../config.hpp"
16 #include <Metadaqif.hpp>
17 #include <boost/thread/future.hpp>
18 #include <fmt/format.h>
19 #include <fmt/ostream.h>
20 #include <log4cplus/logger.h>
21 #include <log4cplus/loggingmacros.h>
23 #include "../error.hpp"
24 #include "../state.hpp"
// Fragment of a function template over SourceType; the signature line is not in view.
// The message format is chosen by `error`: the "Got reply" text when `error` is false,
// the "Got error or timeout" text otherwise.
29 template <
class SourceType>
33 fmt::format(!
error ?
"{}: Got reply from '{}'" :
"{}: Got error or timeout from '{}'",
// Fragment of a function template over SourceType; the signature line is not in view.
// Formats a "Sending request" message from `description` and `source`.
// NOTE(review): the literal below has no space between the word "to" and the opening
// quote of the source name, whereas the reply messages in this file put a space after
// "from" -- looks like a typo in the emitted text; confirm before changing it.
38 template <
class SourceType>
42 fmt::format(
"{}: Sending request to'{}'", description, source),
// Fragment (signature line not in view): returns how many elements of `futures`
// report has_exception() == true. The predicate is declared noexcept.
48 return std::count_if(futures.begin(), futures.end(), [](F
const& future) noexcept ->
bool {
49 return future.has_exception();
81 template <
class R,
class Iterator,
class Pred,
class Sender,
class ReplyHandler>
82 boost::future<std::vector<boost::future<R>>>
88 ReplyHandler reply_handler,
89 std::string_view logging_description);
// Fragment: iterates over `futures` and, whenever the catch handler below runs, appends
// std::current_exception() to `exceptions`. The statements inside the try block are not
// in view.
// Only exceptions derived from std::exception reach the handler shown here.
// NOTE(review): the debug message is sent to a logger named "TEST" rather than to a
// logger supplied by the caller -- looks like leftover debugging output; confirm.
93 std::vector<std::exception_ptr> exceptions;
94 for (
auto& f : futures) {
97 }
catch (std::exception
const& e) {
98 LOG4CPLUS_DEBUG(
"TEST",
"E:" << e.what());
99 exceptions.push_back(std::current_exception());
120 std::vector<R>
UnwrapReplies(boost::future<std::vector<boost::future<R>>>&& futures) {
140 template <
class ReplyType>
142 std::optional<State> expected_state,
144 std::optional<State> error_state,
145 AsyncOpParams params,
147 boost::future<ReplyType>&& fut);
164 template <
class ReplyType>
166 std::optional<State> expected_state,
168 std::optional<State> error_state,
169 AsyncOpParams params,
171 boost::future<ReplyType>&& fut);
// Utility function to send requests and collect replies (several lines of this
// definition are not in view, so the notes below cover only what is visible).
//
// Visible flow:
//  - logs "Sending requests." with params.status and logging_description;
//  - for elements of [begin, end) calls send_func and stores the returned
//    boost::future<R> in `replies`;
//  - send_func invokes `sender` on the source and chains a continuation on
//    params.executor that releases `reply_token` and then invokes `reply_handler`
//    with (params, source, future);
//  - returns boost::when_all over `replies` with a further continuation that logs the
//    number of errors via eventlog::CountExceptions.
174 template <
class R,
class Iterator,
class Pred,
class Sender,
class ReplyHandler>
175 boost::future<std::vector<boost::future<R>>>
181 ReplyHandler reply_handler,
182 std::string_view logging_description) {
183 LOG4CPLUS_INFO(params.
logger,
184 fmt::format(
"{}: {}: Sending requests.", params.
status, logging_description));
// NOTE(review): the alias name `Sourceype` looks like a typo for `SourceType`; it is
// declared inside this function body and used for the send_func parameter below.
185 using Sourceype =
typename Iterator::value_type;
187 std::vector<boost::future<R>> replies;
// NOTE(review): send_func uses a by-reference default capture, so the init-capture
// `reply_handler = std::move(reply_handler)` of the continuation below appears to move
// from this function's own `reply_handler` parameter each time send_func runs. send_func
// is called from the loop further down, so any call after the first would start from a
// moved-from handler. Confirm against the lines that are not in view.
192 auto send_func = [&, sender = std::move(sender), descr = std::string(logging_description)](
193 Sourceype& source) -> boost::future<R> {
202 std::string(source.GetSource().GetName()), std::string(logging_description)),
203 reply_handler = std::move(reply_handler)](
auto fut)
mutable -> R {
206 reply_token.Release();
207 return std::invoke(reply_handler, params, source, std::move(fut));
214 std::invoke(std::move(sender), source).then(params.
executor, std::move(cont));
216 }
catch (std::exception
const& e) {
// A std::exception caught here marks the source with SetErrorFlag() and logs an error.
218 source.SetErrorFlag();
223 LOG4CPLUS_ERROR(params.
logger,
224 fmt::format(
"{}: {}: "
225 "Error sending request to source '{}': {}",
// The failure is handed back as an exceptional future instead of being rethrown;
// presumably it carries the exception currently being handled -- confirm against the
// Boost.Thread documentation for the argument-less make_exceptional_future.
230 return boost::make_exceptional_future<R>();
// Elements for which filter_pred returns false enter the `if` below (its body is not in
// view); the send_func result for an element is appended to `replies`.
235 for (
auto it = begin; it != end; ++it) {
239 if (!filter_pred(*it)) {
243 replies.emplace_back(send_func(*it));
"{}: {}: Awaiting {} replies.", params.
status, logging_description, replies.size()));
// The continuation copies `params` and owns its own std::string copy of
// logging_description (`descr`), so it does not depend on the string_view argument.
253 return boost::when_all(replies.begin(), replies.end())
255 [params, descr = std::string(logging_description)](
256 boost::future<std::vector<boost::future<R>>>&& replies_fut)
257 -> std::vector<boost::future<R>> {
258 auto replies = replies_fut.get();
261 fmt::format(
"{}: {}: All replies received or timed out (num errors {}/{})",
264 eventlog::CountExceptions(replies),
void AddEvent(EventLog::EventType event)
Records that a file has been produced for this data acquisition.
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
ReplyToken Acquire(std::string source_id, std::string request)
Acquire token.
void Reply(AsyncOpParams &params, SourceType const &source, std::string description, bool error)
size_t CountExceptions(std::vector< F > const &futures) noexcept
void Request(AsyncOpParams &params, SourceType const &source, std::string description)
std::vector< std::exception_ptr > ExtractExceptions(std::vector< boost::future< T >> &futures)
std::optional< ReplyType > HandleMetaDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< MetaSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
std::optional< ReplyType > HandlePrimDaqReply(char const *request, std::optional< State > expected_state, State success_state, std::optional< State > error_state, AsyncOpParams params, Source< PrimSource > &source, boost::future< ReplyType > &&fut)
Reply handler that checks for exceptions in reply.
std::vector< R > UnwrapReplies(boost::future< std::vector< boost::future< R >>> &&futures)
Unwrap replies.
boost::future< std::vector< boost::future< R > > > SendRequestAndCollectReplies(Iterator begin, Iterator end, Pred filter_pred, AsyncOpParams params, Sender sender, ReplyHandler reply_handler, std::string_view logging_description)
Utility function to send requests and collect replies.
void UnwrapVoidReplies(boost::future< std::vector< boost::future< void >>> futures)
Unwrap futures to extract errors.
State
Observable states of the data acquisition process.
Represents a generic event if a more specific event is not usable.
Simple class that holds the source and associated state.
Parameters required for each async operation.
ObservableEventLog & event_log
PendingReplies & pending_replies
rad::IoExecutor & executor
ObservableStatus & status
Async operations should not modify state directly; DaqController does that.
log4cplus::Logger & logger