ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
asyncOpParams.hpp
Go to the documentation of this file.
1 #ifndef DAQ_OP_ASYNC_OP_PARAMS_HPP_
2 #define DAQ_OP_ASYNC_OP_PARAMS_HPP_
3 
4 #include <boost/asio/io_context.hpp>
5 #include <log4cplus/logger.h>
6 
7 #include <rad/ioExecutor.hpp>
8 
9 #include "../status.hpp"
10 #include "../eventLog.hpp"
11 #include "../pendingReplies.hpp"
12 #include "../source.hpp"
13 #include "../state.hpp"
14 #include "../utility.hpp"
15 
16 namespace daq::op {
17 
18 struct AlertState {
19  std::vector<Alert> set;
20  std::vector<Alert> cleared;
21 
22  void Set(Alert alert) {
23  SetAlert(set, std::move(alert));
24  }
25  void Clear(AlertId id) {
26  auto alert = MakeAlert(std::move(id), "");
27  auto it = std::find(cleared.begin(), cleared.end(), alert);
28  if (it != cleared.end()) {
29  // Replace
30  *it = std::move(alert);
31  } else {
32  // Or add
33  cleared.emplace_back(std::move(alert));
34  }
35  }
36 };
37 
38 /**
39  * Merge alerts.
40  *
41  * It only set/clears if timestamp is newer.
42  */
43 inline void MergeAlerts(ObservableStatus& dest, AlertState& src) {
44  auto const& active = dest.GetAlerts();
45  // Order should not be important
46  for (auto& a : src.set) {
47  auto it = std::find(active.begin(), active.end(), a);
48  // Only set alert if new or newer
49  if (it == active.end() || a.timestamp > it->timestamp) {
50  dest.SetAlert(std::move(a));
51  }
52  }
53  for (auto& a : src.cleared) {
54  auto it = std::find(active.begin(), active.end(), a);
55  // Only clear alert if timestamp of `a` is newer
56  if (it != active.end() && a.timestamp > it->timestamp) {
57  dest.ClearAlert(a.id);
58  }
59  }
60 }
61 
62 /**
63  * Parameters required for each async operation.
64  *
65  * Caller is responsible for keeping objects alive for the duration of the operation.
66  */
67 struct AsyncOpParams {
69  ObservableEventLog& event_log_arg,
71  rad::IoExecutor& executor_arg,
72  log4cplus::Logger& logger_arg,
73  std::string const& id_arg,
74  PendingReplies& pending_replies_arg,
75  std::vector<Source<PrimSource>>& prim_sources_arg,
76  std::vector<Source<MetaSource>>& meta_sources_arg)
77  : status(status_arg)
78  , event_log(event_log_arg)
79  , alerts(alerts)
80  , executor(executor_arg)
81  , logger(logger_arg)
82  , id(id_arg)
83  , pending_replies(pending_replies_arg)
84  , prim_sources(prim_sources_arg)
85  , meta_sources(meta_sources_arg) {
86  }
87  AsyncOpParams(AsyncOpParams const&) = default;
89 
90  /**
91  * Async operations should not modify state directly
92  * DaqController does that. The only allowed operation is to set/clear alerts.
93  */
96  /**
97  * Alerts to be merged only after completion of async operation.
98  */
101  log4cplus::Logger& logger;
102  std::string const& id;
104  std::vector<Source<PrimSource>>& prim_sources; ///< Note: Consider vector immutable!
105  std::vector<Source<MetaSource>>& meta_sources; ///< Note: Consider vector immutable!a
106 };
107 
108 /**
109  * Await specific parameters that is not provided with AsyncOpParams.
110  *
111  * @important The clients provided must be configured with a reply timeout that takes the wait
112  * timeout into account, otherwise MAL will time out before reply is sent.
113  */
115  using Duration = std::chrono::milliseconds;
116  AwaitOpParams(AsyncOpParams common_arg, AwaitOpParams::Duration wait_interval_arg) noexcept
117  : common(common_arg), wait_interval(wait_interval_arg) {
118  }
119  AwaitOpParams(AwaitOpParams const&) = default;
121 
123 
124  /**
125  * Total amount of time to wait for condition to be fulfilled.
126  */
128 };
129 
130 } // namespace daq::op
131 
132 #endif // #ifndef DAQ_OP_ASYNC_OP_PARAMS_HPP_
Stores data acquisition status and allows subscription to status changes.
Definition: eventLog.hpp:107
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:165
void ClearAlert(AlertId const &alert)
Clear alert.
Definition: status.cpp:254
void SetAlert(Alert alert)
Set alert.
Definition: status.cpp:249
std::vector< Alert > const & GetAlerts() const noexcept
Definition: status.cpp:207
Simple class that allows you to keep track of how many replies are pending.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
void MergeAlerts(ObservableStatus &dest, AlertState &src)
Merge alerts.
void SetAlert(std::vector< Alert > &alerts, Alert alert)
Set alert.
Definition: status.cpp:19
Alert MakeAlert(std::string_view category, std::string key, std::string description)
Construct alert.
Definition: status.cpp:39
Describes an active Data Acquisition alert.
Definition: status.hpp:72
Uniquely identifies an alert.
Definition: status.hpp:52
Simple class that holds the source and associated state.
Definition: source.hpp:29
void Set(Alert alert)
void Clear(AlertId id)
std::vector< Alert > set
std::vector< Alert > cleared
Parameters required for each async operation.
std::string const & id
std::vector< Source< MetaSource > > & meta_sources
Note: Consider vector immutable!
AsyncOpParams(ObservableStatus &status_arg, ObservableEventLog &event_log_arg, AlertState &alerts, rad::IoExecutor &executor_arg, log4cplus::Logger &logger_arg, std::string const &id_arg, PendingReplies &pending_replies_arg, std::vector< Source< PrimSource >> &prim_sources_arg, std::vector< Source< MetaSource >> &meta_sources_arg)
ObservableEventLog & event_log
AsyncOpParams(AsyncOpParams &&)=default
AsyncOpParams(AsyncOpParams const &)=default
AlertState & alerts
Alerts to be merged only after completion of async operation.
std::vector< Source< PrimSource > > & prim_sources
Note: Consider vector immutable!
PendingReplies & pending_replies
rad::IoExecutor & executor
ObservableStatus & status
Async operations should not modify state directly; DaqController does that.
log4cplus::Logger & logger
Await-specific parameters that are not provided with AsyncOpParams.
AwaitOpParams(AwaitOpParams const &)=default
Duration wait_interval
Total amount of time to wait for condition to be fulfilled.
AwaitOpParams(AwaitOpParams &&)=default
AwaitOpParams(AsyncOpParams common_arg, AwaitOpParams::Duration wait_interval_arg) noexcept
std::chrono::milliseconds Duration