ddt  0.1
ddtDataConsumer.hpp
Go to the documentation of this file.
1 // @copyright
2 // (c) Copyright ESO 2020
3 // All Rights Reserved
4 // ESO (eso.org) is an Intergovernmental Organization, and therefore special
5 // legal conditions apply.
6 //
7 // @file ddtDataConsumer.hpp
8 // @brief Data Consumer.
9 //
10 // This class provides the functionality to subscribe to a data stream, to
11 // write the received data into shared memory, and to notify DdtDataSubscribers
12 // that new data is available. There is one data consumer object per data
13 // stream identifier, so several DdtDataSubscribers may share one
14 // consumer object.
15 //
16 // @author Matthias Grimm, CGI
17 // @since 2020/01/16
18 //
19 
20 #ifndef DDTDATACONSUMER_HPP_
21 #define DDTDATACONSUMER_HPP_
22 
24 #include "ddt/ddtStatistics.hpp"
25 
26 namespace mal = ::elt::mal;
27 namespace datatransfer = ::elt::ddt::datatransfer;
28 
32 typedef boost::signals2::signal<void(datatransfer::NotificationType,
33  const std::string&)>
35 
39 typedef signal_n::slot_type slot_n;
40 
41 namespace ddt {
42 
51  public:
55  DdtDataConsumer(const std::string& data_stream_identifier,
56  const int32_t latency, const int32_t deadline,
57  DdtLogger* ddt_logger);
58 
62  DdtDataConsumer(const std::string& data_stream_identifier,
63  const std::string& subscription_uri, const int32_t latency,
64  const int32_t deadline, DdtLogger* ddt_logger);
65 
69  ~DdtDataConsumer() override;
70 
74  void StartSubscription();
75 
79  void StopSubscription();
80 
92  REMOTE
93  };
94 
98  void AddUuid(std::string uuid, SubscriberType type);
99 
103  void RemoveUuid(const std::string uuid);
104 
108  void Notify(const NotificationType type) override;
109 
113  int32_t get_number_of_subscribers();
114 
120 
125  std::map<std::string, SubscriberType> get_subscribers();
126 
130  std::string get_remote_broker_uri() const;
131 
135  int32_t get_notification_port() const;
136 
141 
145  void ResetStatistics();
146 
150  std::string get_publishing_uri() const;
151 
155  void set_remote_broker_uri(const std::string& remote_uri);
156 
160  void set_number_of_samples(const int32_t num_samples);
161 
165  void set_memory_accessor(DdtMemoryAccessor* mem_accessor);
166 
170  void set_notification_port(const int32_t noti_port);
171 
175  void set_publishing_uri(const std::string pub_uri);
176 
181 
182  protected:
186  void Init(const std::string& ds_id, DdtLogger* ddt_logger);
187 
192  std::string publishing_uri;
193 
200  std::chrono::system_clock::time_point
202  uint64_t total_samples =
203  0;
204  uint64_t total_bytes =
205  0;
206  uint64_t total_latency = 0;
208  private:
212  void CreateSubscriber(const std::string& subscription_uri,
213  const int32_t latency, const int32_t deadline);
214 
218  void Subscribe();
219 
223  void CreateNotifier(const int32_t latency, const int32_t deadline);
224 
229  void ReceiveDataEvent(
230  const mal::ps::DataEvent<datatransfer::DataPacket>& event);
231 
232  std::unique_ptr<
233  mal::ps::Subscriber<datatransfer::DataPacket>,
234  std::default_delete<mal::ps::Subscriber<datatransfer::DataPacket> > >
235  data_subscriber;
236  mal::ps::DataEventFilter<datatransfer::DataPacket> filter;
237  std::shared_ptr<datatransfer::DataPacket> ddt_key_sample;
238  std::shared_ptr<datatransfer::DataPacket> ddt_data_packet;
239 
240  std::string remote_broker_uri;
241  std::mutex subscriber_mutex;
242  std::mutex statistics_mutex;
243 
248  std::map<std::string, SubscriberType> subscriber_map;
249 
250  std::promise<void> exit_signal;
251  std::future<void> future_object;
252 
257  std::unique_ptr<mal::ps::InstancePublisher<datatransfer::NotificationSample>,
258  std::default_delete<mal::ps::InstancePublisher<
259  datatransfer::NotificationSample> > >
260  notifier;
261  std::shared_ptr<datatransfer::NotificationSample> ddt_notification_sample;
262 };
263 
264 } // namespace ddt
265 
266 #endif /* DDTDATACONSUMER_HPP_ */
ddt::DdtStatistics
Definition: ddtStatistics.hpp:24
ddt::DdtDataConsumer::get_remote_broker_uri
std::string get_remote_broker_uri() const
Definition: ddtDataConsumer.cpp:266
ddt::DdtDataConsumer::~DdtDataConsumer
~DdtDataConsumer() override
ddt::DdtDataConsumer::RemoveUuid
void RemoveUuid(const std::string uuid)
Definition: ddtDataConsumer.cpp:225
ddt::DdtDataConsumer::get_number_of_subscribers
int32_t get_number_of_subscribers()
Definition: ddtDataConsumer.cpp:236
ddt::DdtDataConsumer
Definition: ddtDataConsumer.hpp:50
ddt::DdtDataConsumer::set_remote_broker_uri
void set_remote_broker_uri(const std::string &remote_uri)
Definition: ddtDataConsumer.cpp:299
ddt::DdtDataConsumer::LOCAL
@ LOCAL
Definition: ddtDataConsumer.hpp:88
slot_n
signal_n::slot_type slot_n
Definition: ddtDataConsumer.hpp:39
ddt::DdtDataConsumer::DdtDataConsumer
DdtDataConsumer(const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline, DdtLogger *ddt_logger)
Definition: ddtDataConsumer.cpp:24
ddt::DdtDataConsumer::last_received
std::chrono::system_clock::time_point last_received
Definition: ddtDataConsumer.hpp:201
ddt::DdtLogger
Definition: ddtLogger.hpp:48
ddt::DdtDataConsumer::total_samples
uint64_t total_samples
Definition: ddtDataConsumer.hpp:202
ddt::DdtMemoryAccessor
Definition: ddtMemoryAccessor.hpp:257
ddt::DdtDataConsumer::get_publishing_uri
std::string get_publishing_uri() const
Definition: ddtDataConsumer.cpp:295
ddt
Definition: ddtClient.hpp:36
ddt::DdtDataConsumer::set_memory_accessor
void set_memory_accessor(DdtMemoryAccessor *mem_accessor)
Definition: ddtDataConsumer.cpp:307
ddt::DdtDataConsumer::SubscriberType
SubscriberType
Definition: ddtDataConsumer.hpp:84
ddt::DdtDataConsumer::Init
void Init(const std::string &ds_id, DdtLogger *ddt_logger)
Definition: ddtDataConsumer.cpp:44
ddt::DdtDataConsumer::REMOTE
@ REMOTE
Definition: ddtDataConsumer.hpp:92
ddt::DdtDataConsumer::StartSubscription
void StartSubscription()
Definition: ddtDataConsumer.cpp:319
ddt::DdtDataConsumer::set_notification_port
void set_notification_port(const int32_t noti_port)
Definition: ddtDataConsumer.cpp:311
ddt::DdtDataConsumer::data_stream_identifier
std::string data_stream_identifier
Definition: ddtDataConsumer.hpp:189
ddt::DdtDataConsumer::set_publishing_uri
void set_publishing_uri(const std::string pub_uri)
Definition: ddtDataConsumer.cpp:315
ddtProducerConsumerBase.hpp
ddt::DdtDataConsumer::memory_accessor
DdtMemoryAccessor * memory_accessor
Definition: ddtDataConsumer.hpp:188
ddt::DdtProducerConsumerBase::NotificationType
NotificationType
Definition: ddtProducerConsumerBase.hpp:59
ddt::DdtDataConsumer::ResetStatistics
void ResetStatistics()
Definition: ddtDataConsumer.cpp:288
ddt::DdtDataConsumer::publishing_uri
std::string publishing_uri
Definition: ddtDataConsumer.hpp:192
ddt::DdtDataConsumer::notification_signal
signal_n notification_signal
Definition: ddtDataConsumer.hpp:180
ddt::DdtDataConsumer::StopSubscription
void StopSubscription()
Definition: ddtDataConsumer.cpp:328
ddt::DdtDataConsumer::notification_port
int32_t notification_port
Definition: ddtDataConsumer.hpp:191
ddt::DdtDataConsumer::AddUuid
void AddUuid(std::string uuid, SubscriberType type)
Definition: ddtDataConsumer.cpp:219
ddt::DdtDataConsumer::set_number_of_samples
void set_number_of_samples(const int32_t num_samples)
Definition: ddtDataConsumer.cpp:303
ddt::DdtDataConsumer::get_notification_port
int32_t get_notification_port() const
Definition: ddtDataConsumer.cpp:270
signal_n
boost::signals2::signal< void(datatransfer::NotificationType, const std::string &)> signal_n
Definition: ddtDataConsumer.hpp:34
ddt::DdtProducerConsumerBase
Definition: ddtProducerConsumerBase.hpp:40
ddt::DdtDataConsumer::get_statistics
DdtStatistics get_statistics()
Definition: ddtDataConsumer.cpp:274
ddt::DdtDataConsumer::get_subscribers
std::map< std::string, SubscriberType > get_subscribers()
Definition: ddtDataConsumer.cpp:261
ddt::DdtDataConsumer::total_bytes
uint64_t total_bytes
Definition: ddtDataConsumer.hpp:204
ddtStatistics.hpp
ddt::DdtDataConsumer::get_number_of_remote_subscribers
int32_t get_number_of_remote_subscribers()
Definition: ddtDataConsumer.cpp:241
ddt::DdtDataConsumer::total_latency
uint64_t total_latency
Definition: ddtDataConsumer.hpp:206
ddt::DdtDataConsumer::Notify
void Notify(const NotificationType type) override
Definition: ddtDataConsumer.cpp:91
ddt::DdtDataConsumer::number_of_samples
int32_t number_of_samples
Definition: ddtDataConsumer.hpp:190