8 #ifndef OCF_DAQ_OP_UTIL_HPP_
9 #define OCF_DAQ_OP_UTIL_HPP_
10 #include "../config.hpp"
16 #include <Metadaqif.hpp>
17 #include <boost/thread/future.hpp>
18 #include <fmt/format.h>
19 #include <fmt/ostream.h>
20 #include <log4cplus/logger.h>
21 #include <log4cplus/loggingmacros.h>
23 #include "../error.hpp"
24 #include "../state.hpp"
29 template <
class SourceType>
30 void Reply(
AsyncOpParams& params, SourceType
const& source, std::string description,
bool error) {
33 fmt::format(!error ?
"{}: Got reply from '{}'" :
"{}: Got error or timeout from '{}'",
38 template <
class SourceType>
42 fmt::format(
"{}: Sending request to'{}'", description, source),
48 return std::count_if(futures.begin(), futures.end(), [](F
const& future) noexcept ->
bool {
49 return future.has_exception();
81 template <
class R,
class Iterator,
class Pred,
class Sender,
class ReplyHandler>
82 boost::future<std::vector<boost::future<R>>>
88 ReplyHandler reply_handler,
89 std::string_view logging_description);
93 std::vector<std::exception_ptr> exceptions;
94 for (
auto& f : futures) {
98 exceptions.push_back(std::current_exception());
111 void UnwrapVoidReplies(boost::future<std::vector<boost::future<void>>>&& futures);
119 std::vector<R>
UnwrapReplies(boost::future<std::vector<boost::future<R>>>&& futures) {
139 template <
class ReplyType>
142 std::optional<State> error_state,
143 AsyncOpParams params,
145 boost::future<ReplyType>&& fut);
162 template<
class ReplyType>
165 std::optional<State> error_state,
166 AsyncOpParams params,
168 boost::future<ReplyType>&& fut);
171 template <
class R,
class Iterator,
class Pred,
class Sender,
class ReplyHandler>
172 boost::future<std::vector<boost::future<R>>>
178 ReplyHandler reply_handler,
179 std::string_view logging_description) {
180 LOG4CPLUS_INFO(params.
logger,
181 fmt::format(
"{}: {}: Sending requests.", params.
status, logging_description));
182 using Sourceype =
typename Iterator::value_type;
184 std::vector<boost::future<R>> replies;
189 auto send_func = [&, sender = std::move(sender), descr = std::string(logging_description)](
190 Sourceype& source) -> boost::future<R> {
198 std::string(source.GetSource().GetName()),
199 std::string(logging_description)),
200 reply_handler = std::move(reply_handler)](
auto fut)
mutable -> R {
203 reply_token.Release();
204 return std::invoke(reply_handler, params, source, std::move(fut));
211 std::invoke(std::move(sender), source).then(params.
executor, std::move(cont));
213 }
catch (std::exception
const& e) {
215 source.SetErrorFlag();
220 LOG4CPLUS_ERROR(params.
logger,
221 fmt::format(
"{}: Starting:"
222 "error Sending startDaq to metadata source {}: {}",
227 return boost::make_exceptional_future<R>();
232 for (
auto it = begin; it != end; ++it) {
236 if (!filter_pred(*it)) {
240 replies.emplace_back(send_func(*it));
246 "{}: {}: Awaiting {} replies.", params.
status, logging_description, replies.size()));
250 return boost::when_all(replies.begin(), replies.end())
253 [params, descr = std::string(logging_description)](
254 boost::future<std::vector<boost::future<R>>>&& replies_fut)
255 -> std::vector<boost::future<R>> {
256 auto replies = replies_fut.get();
259 fmt::format(
"{}: {}: All replies received or timed out (num errors {}/{})",
260 params.status, descr, eventlog::CountExceptions(replies),
267 #endif // #ifndef OCF_DAQ_OP_UTIL_HPP_