8 #ifndef OCF_DAQ_OP_UTIL_HPP_
9 #define OCF_DAQ_OP_UTIL_HPP_
10 #include "../config.hpp"
16 #include <Metadaqif.hpp>
17 #include <boost/thread/future.hpp>
18 #include <fmt/format.h>
19 #include <fmt/ostream.h>
20 #include <log4cplus/logger.h>
21 #include <log4cplus/loggingmacros.h>
23 #include "../error.hpp"
24 #include "../state.hpp"
// Formats the log message emitted once a request to a source has completed.
// Both message variants consume the same two positional arguments.
29 template <
class SourceType>
// `!error` selects the success text; a set `error` selects the
// error-or-timeout text.
33 fmt::format(!
error ?
"{}: Got reply from '{}'" :
"{}: Got error or timeout from '{}'",
// Formats the log message emitted when a request is about to be sent.
// Positional arguments: `description` first, then `source`.
38 template <
class SourceType>
// A space separates "to" from the quoted source name so the text reads the
// same way as the matching "Got reply from '{}'" message.
42 fmt::format(
"{}: Sending request to '{}'", description, source),
// Returns how many elements of `futures` report has_exception().
48 return std::count_if(futures.begin(), futures.end(), [](F
const& future) noexcept ->
bool {
// The future is only queried, never consumed: no get() is called here.
49 return future.has_exception();
/**
 * Forward declaration of the request/collect helper.
 *
 * @tparam R Value type carried by each per-source reply future.
 * @tparam Iterator Iterator type of the source range.
 * @tparam Pred Predicate type (presumably used to select sources; confirm
 *         against the definition).
 * @tparam Sender Callable type that issues the request.
 * @tparam ReplyHandler Callable type that processes a reply.
 *
 * @param reply_handler Handler taken by value.
 * @param logging_description Text used in log messages; passed as a
 *        non-owning std::string_view.
 *
 * @return A future holding the vector of individual reply futures.
 */
81 template <
class R,
class Iterator,
class Pred,
class Sender,
class ReplyHandler>
82 boost::future<std::vector<boost::future<R>>>
88 ReplyHandler reply_handler,
89 std::string_view logging_description);
// Accumulates one std::exception_ptr per failure seen while walking `futures`.
93 std::vector<std::exception_ptr> exceptions;
94 for (
auto& f : futures) {
97 }
// Only exceptions derived from std::exception are handled here.
// NOTE(review): anything else would propagate out of the loop; confirm this
// is intended.
catch (std::exception
const& e) {
// NOTE(review): this writes to a logger literally named "TEST"; it looks
// like leftover diagnostics. Confirm before relying on it.
98 LOG4CPLUS_DEBUG(
"TEST",
"E:" << e.what());
// std::current_exception() keeps the original exception object rather than
// a copy sliced to std::exception.
99 exceptions.push_back(std::current_exception());
/**
 * Converts a future-of-futures into a plain vector of reply values.
 *
 * @param futures Taken by rvalue reference: a future that holds the vector of
 *        individual reply futures.
 * @return The reply values as std::vector<R>.
 */
120 std::vector<R>
UnwrapReplies(boost::future<std::vector<boost::future<R>>>&& futures) {
// Declaration of a reply-processing function template, parameterised on the
// reply payload type.
140 template <
class ReplyType>
// Both states are optional, so std::nullopt is accepted for either.
// Presumably these are the states applied on success and on failure
// respectively; confirm against the definition.
142 std::optional<State> expected_state,
144 std::optional<State> error_state,
// Operation parameters are received by value.
145 AsyncOpParams params,
// The reply future is taken by rvalue reference.
147 boost::future<ReplyType>&& fut);
// Second reply-processing declaration; its visible parameters have the same
// types and order as the declaration before it.
164 template <
class ReplyType>
// Both states are optional, so std::nullopt is accepted for either.
// Presumably these are the states applied on success and on failure
// respectively; confirm against the definition.
166 std::optional<State> expected_state,
168 std::optional<State> error_state,
// Operation parameters are received by value.
169 AsyncOpParams params,
// The reply future is taken by rvalue reference.
171 boost::future<ReplyType>&& fut);
/**
 * Sends a request to the sources in [begin, end) and collects one reply
 * future per request sent.
 *
 * Each element is first tested with `filter_pred`. For every request sent,
 * `sender` is invoked with the source and a continuation is attached on
 * `params.executor`; the continuation releases the reply token and then
 * invokes `reply_handler` with `params`, the source and the reply future.
 *
 * If issuing a request throws a std::exception, the source's error flag is
 * set, the failure is logged, and an exceptional future takes the place of
 * that source's reply.
 *
 * @tparam R Value type produced by the reply handler.
 * @tparam Iterator Iterator over sources; must provide `value_type`.
 * @tparam Pred Predicate type callable with a source.
 * @tparam Sender Callable type invoked with a source; its result must
 *         support `.then(executor, continuation)`.
 * @tparam ReplyHandler Callable type invoked as
 *         `(params, source, future)` and returning R. It must be copyable,
 *         because every request gets its own copy.
 *
 * @param reply_handler Handler copied into each request's continuation.
 * @param logging_description Text included in the log messages.
 *
 * @return A future that yields the vector of per-source reply futures once
 *         boost::when_all over all of them has completed.
 */
174 template <
class R,
class Iterator,
class Pred,
class Sender,
class ReplyHandler>
175 boost::future<std::vector<boost::future<R>>>
181 ReplyHandler reply_handler,
182 std::string_view logging_description) {
183 LOG4CPLUS_INFO(params.
logger,
184 fmt::format(
"{}: {}: Sending requests.", params.
status, logging_description));
185 using Sourceype =
typename Iterator::value_type;
// One entry per request actually sent.
187 std::vector<boost::future<R>> replies;
// send_func is called once per source. It captures the enclosing function's
// locals by reference, so anything it moves from is gone for later calls.
// NOTE(review): send_func is not `mutable`, so the captured `sender` is const
// and the std::move(sender) below cannot actually move it.
192 auto send_func = [&, sender = std::move(sender), descr = std::string(logging_description)](
193 Sourceype& source) -> boost::future<R> {
202 std::string(source.GetSource().GetName()), std::string(logging_description)),
// Copy, do not move: this capture is evaluated for every source, and moving
// here would hand a moved-from handler to every source after the first.
203 reply_handler = reply_handler](
auto fut)
mutable -> R {
// The token is released before the handler runs.
206 reply_token.Release();
207 return std::invoke(reply_handler, params, source, std::move(fut));
214 std::invoke(std::move(sender), source).then(params.
executor, std::move(cont));
216 }
catch (std::exception
const& e) {
// Mark the source as failed before reporting.
218 source.SetErrorFlag();
// NOTE(review): the text hard-codes "Starting"/"startDaq" although this
// function receives a generic logging_description; confirm whether the
// message should use the description instead.
223 LOG4CPLUS_ERROR(params.
logger,
224 fmt::format(
"{}: Starting:"
225 "error Sending startDaq to metadata source {}: {}",
// Called without arguments inside the handler, so the returned future carries
// the exception currently being handled.
230 return boost::make_exceptional_future<R>();
235 for (
auto it = begin; it != end; ++it) {
// Sources rejected by the predicate take this branch.
239 if (!filter_pred(*it)) {
243 replies.emplace_back(send_func(*it));
249 "{}: {}: Awaiting {} replies.", params.
status, logging_description, replies.size()));
// The continuation owns copies of `params` and the description, so it does
// not refer back to this function's locals.
253 return boost::when_all(replies.begin(), replies.end())
255 [params, descr = std::string(logging_description)](
256 boost::future<std::vector<boost::future<R>>>&& replies_fut)
257 -> std::vector<boost::future<R>> {
258 auto replies = replies_fut.get();
261 fmt::format(
"{}: {}: All replies received or timed out (num errors {}/{})",
264 eventlog::CountExceptions(replies),
271 #endif // #ifndef OCF_DAQ_OP_UTIL_HPP_