10 #include <fmt/format.h>
11 #include <fmt/ostream.h>
12 #include <log4cplus/loggingmacros.h>
// Reduces the per-source reply futures to a single success/failure verdict.
//
// @param futures Future holding one boost::future<bool> per reply.
// @return true only when num_ok equals the number of replies received.
//
// NOTE(review): several statements of this function are not visible in this
// excerpt (the embedded line numbers have gaps), including the declaration of
// num_ok and the body of the try block. Comments describe only what is shown.
26 bool AwaitUnwrapReplies(boost::future<std::vector<boost::future<bool>>>&& futures) {
// Take the vector of individual reply futures out of the outer future.
27 std::vector<boost::future<bool>> values = futures.get();
// Presumably collects the exceptions carried by replies; usage is not visible here.
32 std::vector<std::exception_ptr> exceptions;
34 size_t num_exceptions = 0;
// Visit every reply future.
35 for (
auto& f : values) {
40 }
catch (std::exception
const& e) {
// Message built for a reply that carried an exception.
44 fmt::format(
"daq::op::AwaitUnwrapReplies: Source replied with exception: {}",
// Summary message: exception count and success count, each against a total.
50 fmt::format(
"daq::op::AwaitUnwrapReplies: {}/{} replies contained exceptions. {}/{} "
51 "replies report success",
// Overall success requires num_ok to match the number of replies.
57 return num_ok == values.size();
// Local implementation of the recif::RecWaitSpec interface that carries a
// timeout and an info string.
//
// AwaitPrimAsync::AwaitOnceAsync constructs it with the wait interval converted
// to float seconds and passes it to RecWait().
//
// NOTE(review): most method bodies are not visible in this excerpt; only the
// signatures and the two throwing clone functions can be documented with certainty.
60 class RecWaitSpec :
public recif::RecWaitSpec {
// Stores the given timeout and value-initializes the info member.
62 RecWaitSpec(
float timeout) : m_timeout{timeout}, m_info{} {
// Accessor for the info string (body not visible).
64 std::string getInfo()
const override {
// Mutator for the info string (body not visible).
67 void setInfo(
const std::string& info)
override {
// Accessor for the timeout (body not visible).
71 float getTimeout()
const override {
// Mutator for the timeout (body not visible).
74 void setTimeout(
float timeout)
override {
// Reports whether this object has a key (body not visible).
77 bool hasKey()
const override {
// Cloning is not supported: throws std::runtime_error("not clonable").
80 std::unique_ptr<recif::RecWaitSpec> cloneKey()
const override {
81 throw std::runtime_error(
"not clonable");
// Cloning is not supported: throws std::runtime_error("not clonable").
83 std::unique_ptr<recif::RecWaitSpec> clone()
const override {
84 throw std::runtime_error(
"not clonable");
// Key comparison against another RecWaitSpec (body not visible).
86 bool keyEquals(
const recif::RecWaitSpec& other)
const override {
// Member initializer list of the AwaitPrimAsync constructor (its signature is on
// a line not visible in this excerpt). Starts with no error, an empty parts
// collection and the aborted flag cleared.
//
// NOTE(review): `m_params(m_params)` copies from the constructor parameter only
// if that parameter is itself named `m_params`; otherwise the member would be
// initialized from itself. Confirm against the constructor signature.
99 : m_params(m_params), m_error(
false), m_parts(), m_aborted(
false) {
// Tail of AwaitPrimAsync::Initiate (name taken from the log message; the header
// and the enclosing logging call are not visible in this excerpt).
// Log text announcing that the operation is starting.
104 fmt::format(
"AwaitPrimAsync::Initiate: Operation initiating"));
// Hand the caller the future tied to m_promise; other members fulfil it via
// set_value()/set_exception().
106 return m_promise.get_future();
// Runs one await round and then either completes m_promise or calls itself
// again for another round.
//
// The visible chain schedules AwaitOnceAsync() on the configured executor and
// then inspects its boolean result in a second continuation.
//
// NOTE(review): the expression the first .then() is attached to, and the
// conditions selecting between the outcomes below, are not visible in this
// excerpt. Both lambdas capture `this`, so the object presumably has to outlive
// the pending continuations; confirm against the owner's lifetime handling.
109 void AwaitPrimAsync::InitiateAwait() {
// First continuation: issue one round of requests.
113 .then(m_params.
common.
executor, [
this](
auto) { return AwaitOnceAsync(); })
// Second continuation: evaluate the result of that round.
115 .then(m_params.
common.
executor, [
this](boost::future<bool> fut) ->
void {
// Logged on the abort path (guarding condition not visible).
118 LOG4CPLUS_DEBUG(m_params.common.logger,
119 fmt::format(
"AwaitPrimAsync::InitiateAwait: Operation aborted"));
// get() rethrows any exception stored in the future.
123 auto is_ok = fut.get();
// Completion: publish the error flag together with the collected parts.
126 this->m_promise.set_value({m_error, std::move(m_parts)});
// Start another round.
129 this->InitiateAwait();
// Failure: forward the in-flight exception to whoever waits on the promise.
133 this->m_promise.set_exception(boost::current_exception());
// Produces a future that becomes ready once the next request round may start.
//
// The next permitted start is the previous start time plus
// m_params.wait_interval. If that moment has already passed, a ready future is
// returned; otherwise a timer is armed for it and the future of the associated
// promise is returned.
//
// NOTE(review): the condition guarding the first early return and part of the
// timer handler are not visible in this excerpt.
138 boost::future<void> AwaitPrimAsync::MakeInterval() {
139 using std::chrono::duration_cast;
140 using std::chrono::steady_clock;
141 using std::chrono::milliseconds;
// Early exit (condition not visible; presumably no previous start is recorded).
145 return boost::make_ready_future();
148 auto now = steady_clock::now();
// Earliest time the next round may begin.
149 auto next_start = *m_last_start + duration_cast<steady_clock::duration>(m_params.wait_interval);
150 if (now >= next_start) {
// The interval has already elapsed, so there is nothing to wait for.
152 return boost::make_ready_future();
154 LOG4CPLUS_DEBUG(m_params.common.logger,
155 fmt::format(
"AwaitPrimAsync::MakeInterval: Waiting {}ms until sending RecWait",
156 duration_cast<milliseconds>(next_start - now).count()));
// (Re)create the interval state on the executor's io_context, expiring at next_start.
159 m_interval.emplace(m_params.common.executor.get_io_context(), next_start);
// Timer completion handler; its handling of `ec` is not visible here.
160 m_interval->timer.async_wait([
this](boost::system::error_code
const& ec){
// Release whoever waits on the interval future.
163 m_interval->promise.set_value();
165 return m_interval->promise.get_future();
// Aborts the operation: logs the request and completes m_promise with the
// current error flag and the parts collected so far.
//
// NOTE(review): the statements between the log call and set_value() are not
// visible in this excerpt. The function is noexcept, so if set_value() throws
// (boost::promise does so when already satisfied) the program terminates.
// Confirm that the hidden lines guard against a second completion.
168 void AwaitPrimAsync::Abort() noexcept {
169 LOG4CPLUS_INFO(m_params.common.logger,
170 fmt::format(
"AwaitPrimAsync::Abort: Requested to abort! "
171 "Number of files received so far: {}",
// Publish the partial result; m_parts is moved from and must not be reused afterwards.
178 m_promise.set_value({m_error, std::move(m_parts)});
// Performs a single RecWait round across the primary sources.
//
// @return Future of bool produced by AwaitUnwrapReplies, which is true only
//         when every reply reported success. On the abort path a ready future
//         holding true is returned without sending anything.
//
// NOTE(review): the abort condition and the headers of the lambdas passed to
// SendRequestAndCollectReplies are not visible in this excerpt.
181 boost::future<bool> AwaitPrimAsync::AwaitOnceAsync() {
182 using Reply = std::shared_ptr<recif::RecWaitStatus>;
// Float-valued seconds, used to convert the wait interval for RecWaitSpec.
183 using Seconds = std::chrono::duration<float>;
184 using std::chrono::duration_cast;
// Abort path (guarding condition not visible).
186 LOG4CPLUS_DEBUG(m_params.common.logger,
187 fmt::format(
"AwaitPrimAsync::AwaitOnceAsync: Operation aborted"));
188 return boost::make_ready_future<bool>(
true);
190 LOG4CPLUS_DEBUG(m_params.common.logger,
191 fmt::format(
"AwaitPrimAsync::AwaitOnceAsync: Sending requests..."));
// Record when this round started; MakeInterval() paces the next round from it.
193 m_last_start = std::chrono::steady_clock::now();
// Fan out over all primary sources.
195 return SendRequestAndCollectReplies<bool>(
196 m_params.common.prim_sources.begin(),
197 m_params.common.prim_sources.end(),
// Presumably the source filter: true for sources not yet in a final state.
200 return !IsFinalState(source.GetState());
// Request sender: RecWait with the wait interval (in seconds) as timeout.
205 auto spec = std::make_shared<RecWaitSpec>(
206 duration_cast<Seconds>(this->m_params.wait_interval).count());
207 return s.GetSource().GetRrClient().RecWait(spec);
// Reply handler: delegate to HandleRecWaitReply for each source.
211 ->
bool { return HandleRecWaitReply(source, std::move(fut)); },
212 std::string_view(
"AwaitPrimAsync: await primary data acquisition"))
// Collapse the per-source results into one boolean on the executor.
213 .then(m_params.common.executor, AwaitUnwrapReplies);
// Processes the RecWait reply of one primary source.
//
// On a recif::Success status the source is marked State::Stopped and every file
// reported by the reply is appended to m_parts together with the source name.
//
// @param source Source the reply belongs to; its state is updated on success.
// @param fut    Future holding the reply; get() rethrows a stored exception.
// @throws DaqSourceError (via boost::enable_current_exception) when the request
//         failed with recif::ExceptionErr, and also with the text
//         "unknown exception" (presumably from a catch-all handler).
//
// NOTE(review): the try/else/catch-all lines and the return statements are not
// visible in this excerpt, so the exact return values cannot be documented.
// `reply` is dereferenced without a visible null check; confirm that RecWait
// never yields a null pointer.
216 bool AwaitPrimAsync::HandleRecWaitReply(Source<PrimSource>& source,
217 boost::future<std::shared_ptr<recif::RecWaitStatus>>&& fut) {
219 auto reply = fut.get();
220 auto status = reply->getStatus();
221 if (status == recif::Success) {
// The source has finished recording.
223 source.SetState(State::Stopped);
225 auto rec_status = reply->getRecStatus();
226 LOG4CPLUS_INFO(m_params.common.logger,
227 fmt::format(
"Data source '{}' replied successfully and provides {} "
230 rec_status->getDpFiles().size()));
// A successful reply without files is only warned about here.
231 if (rec_status->getDpFiles().empty()) {
232 LOG4CPLUS_WARN(m_params.common.logger,
233 fmt::format(
"Data source '{}' replied successfully for "
234 "RecWait but did not produce any files!",
235 source.GetSource()));
// Record each produced file, tagged with the name of its source.
237 for (
auto const& file : rec_status->getDpFiles()) {
238 m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
246 }
catch (recif::ExceptionErr
const& e) {
// Translate the interface error into DaqSourceError, keeping its message.
248 throw boost::enable_current_exception(
249 DaqSourceError(
"RecWait", std::string(source.GetSource().GetName()), e.what()));
// Fallback for failures with no usable message.
252 throw boost::enable_current_exception(DaqSourceError(
253 "RecWait", std::string(source.GetSource().GetName()),
"unknown exception"));