rad  2.0.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
requestor.hpp
Go to the documentation of this file.
1 
9 #ifndef RAD_MAL_REQUESTOR_HPP
10 #define RAD_MAL_REQUESTOR_HPP
11 
12 #include <type_traits>
13 
14 #include <rad/logger.hpp>
15 #include <rad/mal/utils.hpp>
16 #include <rad/smAdapter.hpp>
17 
18 #include <mal/Cii.hpp>
19 #include <mal/Mal.hpp>
20 #include <mal/utility/LoadMal.hpp>
21 
22 namespace rad {
23 
24 /*
25  * Forward declaration to avoid a dependency on rad.sm.
26  */
27 class SMAdapter;
28 
29 namespace cii {
30 
38 template <typename INTERFACE_TYPE>
39 class Requestor {
40  public:
48  Requestor(const elt::mal::Uri& uri,
49  const std::optional<elt::mal::Mal::Properties> mal_properties = {})
50  : m_client() {
52 
53  m_client = elt::mal::CiiFactory::getInstance().getClient<INTERFACE_TYPE>(
54  uri, elt::mal::rr::qos::QoS::DEFAULT,
55  mal_properties ? *mal_properties : elt::mal::Mal::Properties());
56  LOG4CPLUS_DEBUG(GetLogger(), "Created MAL client for <" << uri << ">");
57  }
58 
65  std::shared_ptr<INTERFACE_TYPE>& GetInterface() {
67  return m_client;
68  }
69 
70  Requestor(const Requestor&) = delete;
71  Requestor& operator=(const Requestor&) = delete;
72 
73  private:
74  std::shared_ptr<INTERFACE_TYPE> m_client; // Share pointer to MAL Client
75 };
76 
88 template <class EVENT, class T>
89 void RoutePartialReply(std::shared_ptr<::elt::mal::rr::Ami<T>> ami, rad::SMAdapter& sm) {
90  ::elt::mal::future<T> fut = ami->next();
91  fut.then([=, &sm](::elt::mal::future<T> fut) {
92  try {
93  LOG4CPLUS_DEBUG(GetLogger(),
94  "Received async partial reply, triggering associated event <"
95  << typeid(EVENT).name() << ">");
96  sm.PostEvent(std::make_unique<EVENT>(fut.get()));
97  LOG4CPLUS_DEBUG(GetLogger(),
98  "Received async partial reply, triggering associated event - done!");
99  if (ami->isDone() == false) {
100  RoutePartialReply<EVENT, T>(ami, sm);
101  LOG4CPLUS_DEBUG(GetLogger(),
102  "Invoked RoutePartialReply for the next partial/final reply");
103  }
104  } catch (...) {
105  LOG4CPLUS_ERROR(GetLogger(), "Unknown exception when receiving partial reply");
106  }
107  });
108  LOG4CPLUS_DEBUG(log4cplus::Logger::getInstance(rad::LOGGER_NAME), "Installed AMI continuation");
109 }
110 
122 template <class EVENT, class FUT>
123 CancellationToken RouteReply(FUT&& rep_future, rad::SMAdapter& sm) {
124  CancellationToken token;
125  rep_future.then([&](FUT res) {
126  if (token.IsCancelled()) {
127  return;
128  }
129  try {
130  LOG4CPLUS_DEBUG(GetLogger(), "Received async reply, triggering associated event <"
131  << typeid(EVENT).name() << ">");
132  sm.PostEvent(std::make_unique<EVENT>(res));
133  LOG4CPLUS_DEBUG(GetLogger(),
134  "Received async reply, triggering associated event - done!");
135  } catch (...) {
136  LOG4CPLUS_ERROR(GetLogger(), "Unknown exception when receiving the reply");
137  }
138  });
139  LOG4CPLUS_DEBUG(GetLogger(), "Installed future continuation");
140  return token;
141 }
142 
155 template <class EVENT, class EVENT_ERR, class FUT>
156 CancellationToken RouteReply(FUT&& rep_future, rad::SMAdapter& sm) {
157  CancellationToken token;
158  rep_future.then([&, token](FUT res) {
159  if (token.IsCancelled()) {
160  return;
161  }
162  try {
163  if (res.has_value()) {
164  // NOLINTNEXTLINE
165  if constexpr (std::is_void<typename FUT::value_type>::value) {
166  LOG4CPLUS_DEBUG(GetLogger(),
167  "Received async VOID reply, triggering associated event <"
168  << typeid(EVENT).name() << ">");
169  sm.PostEvent(std::make_unique<EVENT>());
170  } else {
171  // LOG4CPLUS_DEBUG(GetLogger(), "Received async reply <" << res.get() << ">,
172  // triggering associated event <" << typeid(EVENT).name() << ">");
173  LOG4CPLUS_DEBUG(GetLogger(),
174  "Received async reply, triggering associated event <"
175  << typeid(EVENT).name() << ">");
176  sm.PostEvent(std::make_unique<EVENT>(res.get()));
177  }
178  LOG4CPLUS_DEBUG(GetLogger(),
179  "Received async reply, triggering associated event - done!");
180  } else if (res.has_exception()) {
181  LOG4CPLUS_DEBUG(GetLogger(), "Exception waiting for reply");
182  sm.PostEvent(std::make_unique<EVENT_ERR>(res.get_exception_ptr()));
183  } else {
184  RAD_ASSERTNEVER();
185  }
186  } catch (...) {
187  LOG4CPLUS_ERROR(GetLogger(), "Unknown exception when receiving the reply");
188  }
189  });
190  LOG4CPLUS_DEBUG(GetLogger(), "Installed future continuation");
191  return token;
192 }
193 
208 template <class EVENT, class EVENT_TIMEOUT, class EVENT_ERR, class FUT>
209 CancellationToken RouteReplyWithTimeout(FUT&& rep_future, rad::SMAdapter& sm) {
210  CancellationToken token;
211  rep_future.then([&, token](FUT res) {
212  if (token.IsCancelled()) {
213  return;
214  }
215  try {
216  if (res.has_value()) {
217  // NOLINTNEXTLINE
218  if constexpr (std::is_void<typename FUT::value_type>::value) {
219  LOG4CPLUS_DEBUG(GetLogger(),
220  "Received async VOID reply, triggering associated event <"
221  << typeid(EVENT).name() << ">");
222  sm.PostEvent(std::make_unique<EVENT>());
223  } else {
224  // LOG4CPLUS_DEBUG(GetLogger(), "Received async reply <" << res.get() << ">,
225  // triggering associated event <" << typeid(EVENT).name() << ">");
226  LOG4CPLUS_DEBUG(GetLogger(),
227  "Received async reply, triggering associated event <"
228  << typeid(EVENT).name() << ">");
229  sm.PostEvent(std::make_unique<EVENT>(res.get()));
230  }
231  LOG4CPLUS_DEBUG(GetLogger(),
232  "Received async reply, triggering associated event - done!");
233  } else if (res.has_exception()) {
234  LOG4CPLUS_DEBUG(GetLogger(), "Received exception as reply, rethrowing it.");
235  boost::rethrow_exception(res.get_exception_ptr());
236  } else {
237  RAD_ASSERTNEVER();
238  }
239  } catch (const elt::mal::TimeoutException& e) {
240  LOG4CPLUS_DEBUG(GetLogger(), "ExceptionTimeout while waiting for reply");
241  sm.PostEvent(std::make_unique<EVENT_TIMEOUT>());
242  } catch (...) {
243  LOG4CPLUS_DEBUG(GetLogger(), "Received exception as reply, post associated event <"
244  << typeid(EVENT).name() << ">");
245  sm.PostEvent(std::make_unique<EVENT_ERR>(res.get_exception_ptr()));
246  }
247  });
248  LOG4CPLUS_DEBUG(GetLogger(), "Installed future continuation");
249  return token;
250 }
251 
252 } // namespace cii
253 } // namespace rad
254 
255 #endif // RAD_MAL_REQUESTOR_HPP
log4cplus::Logger & GetLogger()
Definition: logger.cpp:43
CancellationToken RouteReply(FUT &&rep_future, rad::SMAdapter &sm)
Definition: requestor.hpp:123
std::shared_ptr< INTERFACE_TYPE > & GetInterface()
Definition: requestor.hpp:65
Requestor & operator=(const Requestor &)=delete
void PostEvent(SharedEvent e)
Definition: smAdapter.cpp:181
const std::string LOGGER_NAME
Definition: logger.hpp:74
Definition: smAdapter.hpp:42
#define RAD_ASSERTNEVER()
Definition: assert.hpp:20
#define RAD_TRACE(logger)
Definition: logger.hpp:19
void RoutePartialReply(std::shared_ptr<::elt::mal::rr::Ami< T >> ami, rad::SMAdapter &sm)
Definition: requestor.hpp:89
CancellationToken RouteReplyWithTimeout(FUT &&rep_future, rad::SMAdapter &sm)
Definition: requestor.hpp:209
Requestor(const elt::mal::Uri &uri, const std::optional< elt::mal::Mal::Properties > mal_properties={})
Definition: requestor.hpp:48
Definition: requestor.hpp:39