rad  2.0.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
subscriber.hpp
Go to the documentation of this file.
1 
9 #ifndef RAD_MAL_SUBSCRIBER_HPP
10 #define RAD_MAL_SUBSCRIBER_HPP
11 
12 #include <rad/assert.hpp>
13 #include <rad/logger.hpp>
14 
15 #include <mal/Cii.hpp>
16 #include <mal/Mal.hpp>
17 #include <mal/utility/LoadMal.hpp>
18 
19 #include <functional>
20 
21 namespace rad {
22 namespace cii {
23 
31 template <typename TOPIC_TYPE>
32 class Subscriber {
33  public:
34  using TopicHandler_t = std::function<void(elt::mal::ps::Subscriber<TOPIC_TYPE>&,
35  const elt::mal::ps::DataEvent<TOPIC_TYPE>&)>;
36 
45  Subscriber(const elt::mal::Uri& uri, TopicHandler_t handler,
46  const std::optional<elt::mal::Mal::Properties> mal_properties = {})
47  : m_subscriber(nullptr), m_subscription(nullptr) {
49 
50  m_subscriber = elt::mal::CiiFactory::getInstance().getSubscriber<TOPIC_TYPE>(
51  elt::mal::Uri(uri), elt::mal::ps::qos::QoS::DEFAULT,
52  mal_properties ? *mal_properties : elt::mal::Mal::Properties());
53  RAD_ASSERTPTR(m_subscriber);
54 
55  m_subscription =
56  m_subscriber->subscribeAsync(elt::mal::ps::DataEventFilter<TOPIC_TYPE>::all(), handler);
57  RAD_ASSERTPTR(m_subscription);
58 
59  LOG4CPLUS_DEBUG(GetLogger(), "Subscriber registered async.");
60  }
61 
62  Subscriber(const Subscriber&) = delete;
63  Subscriber& operator=(const Subscriber&) = delete;
64 
65  // Enable moving
66  Subscriber(Subscriber&& rhs) = default;
67 
68  private:
69  std::unique_ptr<elt::mal::ps::Subscriber<TOPIC_TYPE>>
70  m_subscriber; // Pointer to MAL Subscriber.
71  std::unique_ptr<::elt::mal::ps::Subscription>
72  m_subscription; // Pointer to the topic subscription.
73 };
74 
84 template <typename TOPIC_TYPE, typename EVENT_TYPE>
85 class SMSubscriber : public rad::cii::Subscriber<TOPIC_TYPE> {
86  public:
96  SMSubscriber(const elt::mal::Uri& uri, SMAdapter& sm,
97  const std::optional<elt::mal::Mal::Properties> mal_properties = {})
99  std::bind(&SMSubscriber::Callback, this,
100  std::placeholders::_1, std::placeholders::_2),
101  mal_properties),
102  m_sm(sm) {
103  RAD_TRACE(GetLogger());
104  }
105 
106  void Callback(elt::mal::ps::Subscriber<TOPIC_TYPE>& subscriber,
107  const elt::mal::ps::DataEvent<TOPIC_TYPE>& event) {
108  RAD_TRACE(GetLogger());
109 
110  if (event.hasValidData()) {
111  LOG4CPLUS_DEBUG(GetLogger(), "Received topic, posting event to SM via ASIO.");
112  m_sm.PostEvent(rad::UniqueEvent(new EVENT_TYPE(event.getData())));
113  } else {
114  LOG4CPLUS_WARN(GetLogger(), "Received invalid data!");
115  }
116  }
117 
118  SMSubscriber(const SMSubscriber&) = delete;
119  SMSubscriber& operator=(const SMSubscriber&) = delete;
120 
121  // Enable moving ??
122  // SMSubscriber(SMSubscriber&& rhs) = default;
123 
124  private:
125  rad::SMAdapter& m_sm;
126 };
127 
128 #if 0
129 template<typename TOPIC_TYPE, typename EVENT_TYPE>
130 class SubscriberSM {
131 public:
132  explicit SubscriberSM(rad::SMAdapter& sm) : m_sm(sm) {
133  RAD_TRACE(GetLogger());
134  }
135 
136  ~SubscriberSM() {
137  RAD_TRACE(GetLogger());
138  }
139 
140  void Start(const std::string& uriStr,
141  const std::string& malType) {
142  RAD_TRACE(GetLogger());
143 
144  elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
145 
146  elt::mal::Mal::Properties mal_properties;
147  auto mal_instance = elt::mal::loadMal(malType, mal_properties);
148 
149  auto uri = elt::mal::Uri(uriStr);
150  std::string scheme = uri.scheme().to_string();
151  factory.registerMal(scheme, mal_instance);
152 
153  mSubscriber = factory.getSubscriber<TOPIC_TYPE>(uri, elt::mal::ps::qos::QoS::DEFAULT, mal_properties);
154  if (m_subscriber == nullptr) {
155  throw std::runtime_error("Cannot create Subscriber");
156  }
157 
158 /*
159  auto topicInstance = m_subscriber->createDataEntity();
160  if (topicInstance == nullptr) {
161  throw std::runtime_error("Subscriber cannot create data entity");
162  }
163 
164  auto dataEventFilter = elt::mal::ps::DataEventFilter<TOPIC_TYPE>::all();
165 */
166 
167  m_subscription = m_subscriber->subscribeAsync(elt::mal::ps::DataEventFilter<TOPIC_TYPE>::all(),
168  [this](elt::mal::ps::Subscriber<TOPIC_TYPE>& Subscriber,
169  const elt::mal::ps::DataEvent<TOPIC_TYPE>& event) {
170  this->Callback(Subscriber, event);
171  });
172  if (m_subscription == nullptr) {
173  throw std::runtime_error("Subscriber Async failed");
174  }
175 
176  LOG4CPLUS_DEBUG(GetLogger(), "Subscriber registered async.");
177  }
178 
179  void Stop() {
180  RAD_TRACE(GetLogger());
181  // @todo what happens to the subscription?
182  m_subscriber->close();
183  }
184 
185  void Callback(elt::mal::ps::Subscriber<TOPIC_TYPE>& Subscriber,
186  const elt::mal::ps::DataEvent<TOPIC_TYPE>& event) {
187  RAD_TRACE(GetLogger());
188 
189  if (event.hasValidData()) {
190  //auto sample = event.getData();
191  //auto altPos = sample->getAlt();
192  //auto azPos = sample->getAz();
193  //std::cout << "Counter " << ++mNumSamples << " Alt = " << altPos << " Az = " << azPos << std::endl;
194  LOG4CPLUS_DEBUG(GetLogger(), "Received topic, posting event to SM ...");
195  //rad::UniqueEvent(new MalEvents::DevMeas())
196  m_sm.PostEvent(rad::UniqueEvent(new EVENT_TYPE(event.getData())));
197  } else {
198  LOG4CPLUS_WARN(GetLogger(), "Received invalid data!");
199  }
200 
201  }
202 
203  SubscriberSM(const SubscriberSM&) = delete;
204  SubscriberSM& operator= (const SubscriberSM&) = delete;
205 
206  // Enable moving
207  //SubscriberSM(SubscriberSM&& rhs) = default;
208 
209 private:
210  rad::SMAdapter& m_sm;
211  UniqueEvent m_topic_event;
212  std::shared_ptr<elt::mal::ps::SubscriberSM<TOPIC_TYPE>> m_subscriber;
213  std::unique_ptr<::elt::mal::ps::Subscription> m_subscription;
214 };
215 #endif
216 
217 } // namespace cii
218 } // namespace rad
219 
220 #endif // RAD_MAL_SUBSCRIBER_HPP
log4cplus::Logger & GetLogger()
Definition: logger.cpp:43
std::function< void(elt::mal::ps::Subscriber< TOPIC_TYPE > &, const elt::mal::ps::DataEvent< TOPIC_TYPE > &)> TopicHandler_t
Definition: subscriber.hpp:35
#define RAD_ASSERTPTR(a)
Definition: assert.hpp:19
void PostEvent(SharedEvent e)
Definition: smAdapter.cpp:181
Definition: subscriber.hpp:85
void Callback(elt::mal::ps::Subscriber< TOPIC_TYPE > &subscriber, const elt::mal::ps::DataEvent< TOPIC_TYPE > &event)
Definition: subscriber.hpp:106
SMSubscriber(const elt::mal::Uri &uri, SMAdapter &sm, const std::optional< elt::mal::Mal::Properties > mal_properties={})
Definition: subscriber.hpp:96
Definition: smAdapter.hpp:42
std::unique_ptr< AnyEvent > UniqueEvent
Definition: anyEvent.hpp:45
Definition: subscriber.hpp:32
Subscriber & operator=(const Subscriber &)=delete
def handler
Definition: test_dispatcher.py:11
#define RAD_TRACE(logger)
Definition: logger.hpp:19
SMSubscriber & operator=(const SMSubscriber &)=delete
Subscriber(const elt::mal::Uri &uri, TopicHandler_t handler, const std::optional< elt::mal::Mal::Properties > mal_properties={})
Definition: subscriber.hpp:45