rad  2.0.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
msgRequestor.hpp
Go to the documentation of this file.
1 
9 #ifndef RAD_MSG_REQUESTOR_HPP
10 #define RAD_MSG_REQUESTOR_HPP
11 
12 #include <rad/assert.hpp>
13 #include <rad/errors.hpp>
14 #include <rad/exceptions.hpp>
15 #include <rad/logger.hpp>
16 #include <rad/msgRequestorRaw.hpp>
17 
18 #include <azmq/message.hpp>
19 #include <azmq/socket.hpp>
20 
21 #include <boost/asio.hpp>
22 
23 #include <google/protobuf/message.h>
24 
25 #include <chrono>
26 #include <memory>
27 
28 namespace rad {
29 
33 template <typename TYPEREQ, typename TYPEREP>
34 class MsgRequestor {
35  public:
36  MsgRequestor(const std::string& endpoint, const std::string& identity,
37  boost::asio::io_service& ios,
38  std::function<void(const std::error_code&, TYPEREP)> reply_handler);
39  virtual ~MsgRequestor();
40 
41  size_t Send(const TYPEREQ& payload, const long timeout = 0);
42 
43  MsgRequestor(const MsgRequestor&) = delete;
44  MsgRequestor& operator=(const MsgRequestor&) = delete;
45 
46  private:
47  void Callback(const std::error_code& err_code, const std::string& msg_type_id, const void* data,
48  const size_t data_size);
49 
50  MsgRequestorRaw m_msg_requestor_raw;
51  std::function<void(const std::error_code&, TYPEREP)> m_reply_handler;
52 };
53 
62 template <typename TYPEREQ, typename TYPEREP>
64  const std::string& endpoint, const std::string& identity, boost::asio::io_service& ios,
65  std::function<void(const std::error_code&, TYPEREP)> reply_handler)
66  : m_msg_requestor_raw(
67  endpoint, identity, ios,
68  std::bind(&MsgRequestor::Callback, this, std::placeholders::_1, std::placeholders::_2,
69  std::placeholders::_3, std::placeholders::_4)),
70  m_reply_handler(reply_handler) {
72 }
73 
77 template <typename TYPEREQ, typename TYPEREP>
80 }
81 
89 template <typename TYPEREQ, typename TYPEREP>
90 size_t MsgRequestor<TYPEREQ, TYPEREP>::Send(const TYPEREQ& payload, const long timeout) {
92 
93  /*
94  * Important: never call google::protobuf::ShutdownProtobufLibrary()
95  * before accessing the message descriptor!
96  */
97  RAD_ASSERTPTR(payload.GetDescriptor());
98  std::string payload_type = payload.GetDescriptor()->full_name();
99 
100  std::string str;
101  if (payload.SerializeToString(&str) == false) {
102  LOG4CPLUS_ERROR(GetLogger(),
103  "Failed serializing to string payload type <" << payload_type << ">");
104  return 0;
105  }
106  return m_msg_requestor_raw.Send(payload_type, str, timeout);
107 }
108 
117 template <typename TYPEREQ, typename TYPEREP>
118 void MsgRequestor<TYPEREQ, TYPEREP>::Callback(const std::error_code& err_code,
119  const std::string& msg_type_id, const void* data,
120  const size_t data_size) {
121  RAD_TRACE(GetLogger());
122 
123  TYPEREP reply;
124 
125  if (err_code) {
126  m_reply_handler(err_code, reply);
127  return;
128  }
129 
130  if (reply.ParseFromArray(data, data_size)) {
131  m_reply_handler({}, reply);
132  } else {
133  LOG4CPLUS_ERROR(GetLogger(), "Failed to parse reply type <" << msg_type_id << ">");
134  m_reply_handler(rad::ErrorCodes::DESERIALIZATION_ERR, reply);
135  // @todo throw exception?
136  }
137 }
138 
145 template <typename TREQ, typename TREP>
147  using request_t = TREQ;
148  using reply_t = TREP;
149  using handler_t = std::function<void(const std::error_code&, reply_t)>;
150 
160  MsgRequestor2(const std::string& endpoint, const std::string& identity,
161  boost::asio::io_service& ios)
162  : m_raw_requestor(endpoint, identity, ios) {}
163  virtual ~MsgRequestor2(){};
164 
165  MsgRequestor2(const MsgRequestor2&) = delete;
166  MsgRequestor2& operator=(const MsgRequestor2&) = delete;
167  MsgRequestor2(MsgRequestor2&&) = default;
168  MsgRequestor2& operator=(MsgRequestor2&&) = default;
169 
180  const request_t& payload, handler_t handler,
181  std::chrono::milliseconds const timeout = std::chrono::milliseconds(0)) {
182  RAD_TRACE(GetLogger());
183  RAD_ASSERTPTR(payload.GetDescriptor());
184  std::string payload_type = payload.GetDescriptor()->full_name();
185 
186  std::string str;
187  if (payload.SerializeToString(&str) == false) {
188  LOG4CPLUS_ERROR(GetLogger(),
189  "Failed serializing to string payload type <" << payload_type << ">");
190  return 0;
191  }
192  return m_raw_requestor.AsyncSendReceive(
193  payload_type, str,
194  [handler](std::error_code const& ec, std::string const& msg_type_id, const void* p_data,
195  const size_t size) {
196  // RAD_TRACE(GetLogger());
197  reply_t reply;
198  if (ec) {
199  handler(ec, reply);
200  return;
201  }
202 
203  // Assert p_data != nullptr
204  if (reply.ParseFromArray(p_data, size)) {
205  handler({}, reply);
206  } else {
207  // LOG4CPLUS_ERROR(GetLogger(), "Failed to parse reply type <" << msg_type_id <<
208  // ">");
210  }
211  },
212  timeout);
213  }
214 
215  private:
216  MsgRequestorRaw2 m_raw_requestor;
217 };
218 
219 } // namespace rad
220 
221 #endif // RAD_MSG_REQUESTOR_HPP
MsgRequestor & operator=(const MsgRequestor &)=delete
log4cplus::Logger & GetLogger()
Definition: logger.cpp:43
Definition: msgRequestor.hpp:146
size_t Send(const TYPEREQ &payload, const long timeout=0)
Definition: msgRequestor.hpp:90
std::function< void(const std::error_code &, reply_t)> handler_t
Definition: msgRequestor.hpp:149
MsgRequestor2 & operator=(const MsgRequestor2 &)=delete
#define RAD_ASSERTPTR(a)
Definition: assert.hpp:19
TREP reply_t
Definition: msgRequestor.hpp:148
TREQ request_t
Definition: msgRequestor.hpp:147
MsgRequestor(const std::string &endpoint, const std::string &identity, boost::asio::io_service &ios, std::function< void(const std::error_code &, TYPEREP)> reply_handler)
Definition: msgRequestor.hpp:63
Definition: msgRequestorRaw.hpp:30
virtual ~MsgRequestor2()
Definition: msgRequestor.hpp:163
MsgRequestor2(const std::string &endpoint, const std::string &identity, boost::asio::io_service &ios)
Definition: msgRequestor.hpp:160
size_t AsyncSendReceive(const request_t &payload, handler_t handler, std::chrono::milliseconds const timeout=std::chrono::milliseconds(0))
Definition: msgRequestor.hpp:179
size_t AsyncSendReceive(std::string const &payload_type, std::string const &payload, handler_t handler, std::chrono::milliseconds const timeout=std::chrono::milliseconds(0))
Definition: msgRequestorRaw.cpp:285
Definition: msgRequestorRaw.hpp:71
def handler
Definition: test_dispatcher.py:11
#define RAD_TRACE(logger)
Definition: logger.hpp:19
Definition: msgRequestor.hpp:34
virtual ~MsgRequestor()
Definition: msgRequestor.hpp:78