10 #include <fmt/format.h>
11 #include <fmt/ostream.h>
12 #include <log4cplus/loggingmacros.h>
27 bool AwaitUnwrapReplies(boost::future<std::vector<boost::future<bool>>>&& futures) {
28 std::vector<boost::future<bool>> values = futures.get();
33 std::vector<std::exception_ptr> exceptions;
35 size_t num_exceptions = 0;
36 for (
auto& f : values) {
41 }
catch (std::exception
const& e) {
45 fmt::format(
"daq::op::AwaitUnwrapReplies: Source replied with exception: {}",
51 fmt::format(
"daq::op::AwaitUnwrapReplies: {}/{} replies contained exceptions. {}/{} "
52 "replies report success",
58 return num_ok == values.size();
61 class RecWaitSpec :
public recif::RecWaitSpec {
63 RecWaitSpec(
float timeout) : m_timeout{timeout}, m_info{} {
65 std::string getInfo()
const override {
68 void setInfo(
const std::string& info)
override {
72 float getTimeout()
const override {
75 void setTimeout(
float timeout)
override {
78 bool hasKey()
const override {
81 std::unique_ptr<recif::RecWaitSpec> cloneKey()
const override {
82 throw std::runtime_error(
"not clonable");
84 std::unique_ptr<recif::RecWaitSpec> clone()
const override {
85 throw std::runtime_error(
"not clonable");
87 bool keyEquals(
const recif::RecWaitSpec& other)
const override {
100 : m_params(m_params), m_error(
false), m_parts(), m_aborted(
false) {
105 fmt::format(
"AwaitPrimAsync::Initiate: Operation initiating"));
107 return m_promise.get_future();
110 void AwaitPrimAsync::InitiateAwait() {
114 .then(m_params.
common.
executor, [
this](
auto) { return AwaitOnceAsync(); })
116 .then(m_params.
common.
executor, [
this](boost::future<bool> fut) ->
void {
119 LOG4CPLUS_DEBUG(m_params.common.logger,
120 fmt::format(
"AwaitPrimAsync::InitiateAwait: Operation aborted"));
124 auto is_ok = fut.get();
127 this->m_promise.set_value({m_error, std::move(m_parts)});
130 this->InitiateAwait();
134 this->m_promise.set_exception(boost::current_exception());
139 boost::future<void> AwaitPrimAsync::MakeInterval() {
140 using std::chrono::duration_cast;
141 using std::chrono::milliseconds;
142 using std::chrono::steady_clock;
146 return boost::make_ready_future();
149 auto now = steady_clock::now();
150 auto next_start = *m_last_start + duration_cast<steady_clock::duration>(m_params.wait_interval);
151 if (now >= next_start) {
153 return boost::make_ready_future();
155 LOG4CPLUS_DEBUG(m_params.common.logger,
156 fmt::format(
"AwaitPrimAsync::MakeInterval: Waiting {}ms until sending RecWait",
157 duration_cast<milliseconds>(next_start - now).count()));
160 m_interval.emplace(m_params.common.executor.get_io_context(), next_start);
161 m_interval->timer.async_wait([
this](boost::system::error_code
const& ec) {
164 m_interval->promise.set_value();
166 return m_interval->promise.get_future();
169 void AwaitPrimAsync::Abort() noexcept {
170 LOG4CPLUS_INFO(m_params.common.logger,
171 fmt::format(
"AwaitPrimAsync::Abort: Requested to abort! "
172 "Number of files received so far: {}",
179 m_promise.set_value({m_error, std::move(m_parts)});
182 boost::future<bool> AwaitPrimAsync::AwaitOnceAsync() {
183 using Reply = std::shared_ptr<recif::RecWaitStatus>;
184 using Seconds = std::chrono::duration<float>;
185 using std::chrono::duration_cast;
187 LOG4CPLUS_DEBUG(m_params.common.logger,
188 fmt::format(
"AwaitPrimAsync::AwaitOnceAsync: Operation aborted"));
189 return boost::make_ready_future<bool>(
true);
191 LOG4CPLUS_DEBUG(m_params.common.logger,
192 fmt::format(
"AwaitPrimAsync::AwaitOnceAsync: Sending requests..."));
194 m_last_start = std::chrono::steady_clock::now();
196 return SendRequestAndCollectReplies<bool>(
197 m_params.common.prim_sources.begin(),
198 m_params.common.prim_sources.end(),
201 return IsSubsequentState(State::Stopped, source.GetState());
206 auto spec = std::make_shared<RecWaitSpec>(
207 duration_cast<Seconds>(this->m_params.wait_interval).count());
208 return s.GetSource().GetRrClient().RecWait(spec);
212 ->
bool { return HandleRecWaitReply(source, std::move(fut)); },
213 std::string_view(
"AwaitPrimAsync: await primary data acquisition"))
214 .then(m_params.common.executor, AwaitUnwrapReplies);
217 bool AwaitPrimAsync::HandleRecWaitReply(
218 Source<PrimSource>& source, boost::future<std::shared_ptr<recif::RecWaitStatus>>&& fut) {
224 auto reply = fut.get();
226 m_params.common.status.ClearAlert(alert_id);
228 auto status = reply->getStatus();
229 if (status == recif::Success) {
231 source.SetState(State::Stopped);
233 auto rec_status = reply->getRecStatus();
234 LOG4CPLUS_INFO(m_params.common.logger,
235 fmt::format(
"Data source '{}' replied successfully and provides {} "
238 rec_status->getDpFiles().size()));
239 if (rec_status->getDpFiles().empty()) {
240 LOG4CPLUS_WARN(m_params.common.logger,
241 fmt::format(
"Data source '{}' replied successfully for "
242 "RecWait but did not produce any files!",
243 source.GetSource()));
245 for (
auto const& file : rec_status->getDpFiles()) {
246 m_parts.emplace_back(std::string(source.GetSource().GetName()), file);
254 }
catch (recif::ExceptionErr
const& e) {
255 m_params.common.status.SetAlert(
257 fmt::format(
"Primary source '{}' request 'RecWaitStatus' replied "
258 "with ICD error: ({}) {}",
259 source.GetSource().GetName(),
263 throw boost::enable_current_exception(
264 DaqSourceError(
"RecWait", std::string(source.GetSource().GetName()), e.what()));
267 m_params.common.status.SetAlert(
269 fmt::format(
"Primary source '{}' request 'RecWaitStatus' replied "
270 "with non-ICD error: {}",
271 source.GetSource().GetName(),
274 throw boost::enable_current_exception(DaqSourceError(
275 "RecWait", std::string(source.GetSource().GetName()),
"unknown exception"));