rad  5.1.0
subscriber.hpp
Go to the documentation of this file.
1 
9 #ifndef RAD_MAL_SUBSCRIBER_HPP
10 #define RAD_MAL_SUBSCRIBER_HPP
11 
#include <rad/assert.hpp>
#include <rad/logger.hpp>

#include <mal/Cii.hpp>
#include <mal/Mal.hpp>
#include <mal/utility/LoadMal.hpp>

#include <functional>
#include <memory>
#include <optional>
#include <utility>
20 
21 namespace rad {
22 namespace cii {
23 
31 template <typename TOPIC_TYPE>
32 class Subscriber {
33  public:
34  using TopicHandler_t = std::function<void(elt::mal::ps::Subscriber<TOPIC_TYPE>&,
35  const elt::mal::ps::DataEvent<TOPIC_TYPE>&)>;
36 
45  Subscriber(const elt::mal::Uri& uri, TopicHandler_t handler,
46  const std::optional<elt::mal::Mal::Properties> mal_properties = {})
47  : m_subscriber(nullptr), m_subscription(nullptr) {
49 
50  m_subscriber = elt::mal::CiiFactory::getInstance().getSubscriber<TOPIC_TYPE>(
51  elt::mal::Uri(uri), elt::mal::ps::qos::QoS::DEFAULT,
52  mal_properties ? *mal_properties : elt::mal::Mal::Properties());
53  RAD_ASSERTPTR(m_subscriber);
54 
55  m_subscription =
56  m_subscriber->subscribeAsync(elt::mal::ps::DataEventFilter<TOPIC_TYPE>::all(), handler);
57  RAD_ASSERTPTR(m_subscription);
58 
59  LOG4CPLUS_DEBUG(GetLogger(), "Subscriber registered async.");
60  }
61 
62  Subscriber(const Subscriber&) = delete;
63  Subscriber& operator=(const Subscriber&) = delete;
64 
65  // Enable moving
66  Subscriber(Subscriber&& rhs) = default;
67 
68  private:
69  std::unique_ptr<elt::mal::ps::Subscriber<TOPIC_TYPE>>
70  m_subscriber; // Pointer to MAL Subscriber.
71  std::unique_ptr<::elt::mal::ps::Subscription>
72  m_subscription; // Pointer to the topic subscription.
73 };
74 
75 
76 #if 0
77 template<typename TOPIC_TYPE, typename EVENT_TYPE>
78 class SubscriberSM {
79 public:
80  explicit SubscriberSM(rad::SMAdapter& sm) : m_sm(sm) {
82  }
83 
84  ~SubscriberSM() {
86  }
87 
88  void Start(const std::string& uriStr,
89  const std::string& malType) {
91 
92  elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
93 
94  elt::mal::Mal::Properties mal_properties;
95  auto mal_instance = elt::mal::loadMal(malType, mal_properties);
96 
97  auto uri = elt::mal::Uri(uriStr);
98  std::string scheme = uri.scheme().to_string();
99  factory.registerMal(scheme, mal_instance);
100 
101  mSubscriber = factory.getSubscriber<TOPIC_TYPE>(uri, elt::mal::ps::qos::QoS::DEFAULT, mal_properties);
102  if (m_subscriber == nullptr) {
103  throw std::runtime_error("Cannot create Subscriber");
104  }
105 
106 /*
107  auto topicInstance = m_subscriber->createDataEntity();
108  if (topicInstance == nullptr) {
109  throw std::runtime_error("Subscriber cannot create data entity");
110  }
111 
112  auto dataEventFilter = elt::mal::ps::DataEventFilter<TOPIC_TYPE>::all();
113 */
114 
115  m_subscription = m_subscriber->subscribeAsync(elt::mal::ps::DataEventFilter<TOPIC_TYPE>::all(),
116  [this](elt::mal::ps::Subscriber<TOPIC_TYPE>& Subscriber,
117  const elt::mal::ps::DataEvent<TOPIC_TYPE>& event) {
118  this->Callback(Subscriber, event);
119  });
120  if (m_subscription == nullptr) {
121  throw std::runtime_error("Subscriber Async failed");
122  }
123 
124  LOG4CPLUS_DEBUG(GetLogger(), "Subscriber registered async.");
125  }
126 
127  void Stop() {
128  RAD_TRACE(GetLogger());
129  // @todo what happens to the subscription?
130  m_subscriber->close();
131  }
132 
133  void Callback(elt::mal::ps::Subscriber<TOPIC_TYPE>& Subscriber,
134  const elt::mal::ps::DataEvent<TOPIC_TYPE>& event) {
135  RAD_TRACE(GetLogger());
136 
137  if (event.hasValidData()) {
138  //auto sample = event.getData();
139  //auto altPos = sample->getAlt();
140  //auto azPos = sample->getAz();
141  //std::cout << "Counter " << ++mNumSamples << " Alt = " << altPos << " Az = " << azPos << std::endl;
142  LOG4CPLUS_DEBUG(GetLogger(), "Received topic, posting event to SM ...");
143  //rad::UniqueEvent(new MalEvents::DevMeas())
144  m_sm.PostEvent(rad::UniqueEvent(new EVENT_TYPE(event.getData())));
145  } else {
146  LOG4CPLUS_WARN(GetLogger(), "Received invalid data!");
147  }
148 
149  }
150 
151  SubscriberSM(const SubscriberSM&) = delete;
152  SubscriberSM& operator= (const SubscriberSM&) = delete;
153 
154  // Enable moving
155  //SubscriberSM(SubscriberSM&& rhs) = default;
156 
157 private:
158  rad::SMAdapter& m_sm;
159  UniqueEvent m_topic_event;
160  std::shared_ptr<elt::mal::ps::SubscriberSM<TOPIC_TYPE>> m_subscriber;
161  std::unique_ptr<::elt::mal::ps::Subscription> m_subscription;
162 };
163 #endif
164 
165 } // namespace cii
166 } // namespace rad
167 
168 #endif // RAD_MAL_SUBSCRIBER_HPP
Assert header file.
#define RAD_ASSERTPTR(a)
Definition: assert.hpp:19
Definition: smAdapter.hpp:60
Definition: subscriber.hpp:32
Subscriber & operator=(const Subscriber &)=delete
std::function< void(elt::mal::ps::Subscriber< TOPIC_TYPE > &, const elt::mal::ps::DataEvent< TOPIC_TYPE > &)> TopicHandler_t
Definition: subscriber.hpp:35
Subscriber(const Subscriber &)=delete
Subscriber(const elt::mal::Uri &uri, TopicHandler_t handler, const std::optional< elt::mal::Mal::Properties > mal_properties={})
Definition: subscriber.hpp:45
Subscriber(Subscriber &&rhs)=default
Logger class.
#define RAD_TRACE(logger)
Definition: logger.hpp:24
Definition: actionsApp.cpp:20
log4cplus::Logger & GetLogger()
Definition: logger.cpp:70
std::unique_ptr< AnyEvent > UniqueEvent
Definition: anyEvent.hpp:45
def handler(event)
Definition: test_dispatcher.py:11