Go to the documentation of this file.
20 #ifndef DDTDATACONSUMER_HPP_
21 #define DDTDATACONSUMER_HPP_
26 namespace mal = ::elt::mal;
27 namespace datatransfer = ::elt::ddt::datatransfer;
32 typedef boost::signals2::signal<void(datatransfer::NotificationType,
56 const int32_t latency,
const int32_t deadline,
63 const std::string& subscription_uri,
const int32_t latency,
64 const int32_t deadline,
DdtLogger* ddt_logger);
200 std::chrono::system_clock::time_point
212 void CreateSubscriber(
const std::string& subscription_uri,
213 const int32_t latency,
const int32_t deadline);
223 void CreateNotifier(
const int32_t latency,
const int32_t deadline);
229 void ReceiveDataEvent(
230 const mal::ps::DataEvent<datatransfer::DataPacket>& event);
233 mal::ps::Subscriber<datatransfer::DataPacket>,
234 std::default_delete<mal::ps::Subscriber<datatransfer::DataPacket> > >
236 mal::ps::DataEventFilter<datatransfer::DataPacket> filter;
237 std::shared_ptr<datatransfer::DataPacket> ddt_key_sample;
238 std::shared_ptr<datatransfer::DataPacket> ddt_data_packet;
240 std::string remote_broker_uri;
241 std::mutex subscriber_mutex;
242 std::mutex statistics_mutex;
248 std::map<std::string, SubscriberType> subscriber_map;
250 std::promise<void> exit_signal;
251 std::future<void> future_object;
257 std::unique_ptr<mal::ps::InstancePublisher<datatransfer::NotificationSample>,
258 std::default_delete<mal::ps::InstancePublisher<
259 datatransfer::NotificationSample> > >
261 std::shared_ptr<datatransfer::NotificationSample> ddt_notification_sample;
Definition: ddtStatistics.hpp:24
std::string get_remote_broker_uri() const
Definition: ddtDataConsumer.cpp:266
~DdtDataConsumer() override
void RemoveUuid(const std::string uuid)
Definition: ddtDataConsumer.cpp:225
int32_t get_number_of_subscribers()
Definition: ddtDataConsumer.cpp:236
Definition: ddtDataConsumer.hpp:50
void set_remote_broker_uri(const std::string &remote_uri)
Definition: ddtDataConsumer.cpp:299
@ LOCAL
Definition: ddtDataConsumer.hpp:88
signal_n::slot_type slot_n
Definition: ddtDataConsumer.hpp:39
DdtDataConsumer(const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline, DdtLogger *ddt_logger)
Definition: ddtDataConsumer.cpp:24
std::chrono::system_clock::time_point last_received
Definition: ddtDataConsumer.hpp:201
Definition: ddtLogger.hpp:48
uint64_t total_samples
Definition: ddtDataConsumer.hpp:202
Definition: ddtMemoryAccessor.hpp:257
std::string get_publishing_uri() const
Definition: ddtDataConsumer.cpp:295
Definition: ddtClient.hpp:36
void set_memory_accessor(DdtMemoryAccessor *mem_accessor)
Definition: ddtDataConsumer.cpp:307
SubscriberType
Definition: ddtDataConsumer.hpp:84
void Init(const std::string &ds_id, DdtLogger *ddt_logger)
Definition: ddtDataConsumer.cpp:44
@ REMOTE
Definition: ddtDataConsumer.hpp:92
void StartSubscription()
Definition: ddtDataConsumer.cpp:319
void set_notification_port(const int32_t noti_port)
Definition: ddtDataConsumer.cpp:311
std::string data_stream_identifier
Definition: ddtDataConsumer.hpp:189
void set_publishing_uri(const std::string pub_uri)
Definition: ddtDataConsumer.cpp:315
DdtMemoryAccessor * memory_accessor
Definition: ddtDataConsumer.hpp:188
NotificationType
Definition: ddtProducerConsumerBase.hpp:59
void ResetStatistics()
Definition: ddtDataConsumer.cpp:288
std::string publishing_uri
Definition: ddtDataConsumer.hpp:192
signal_n notification_signal
Definition: ddtDataConsumer.hpp:180
void StopSubscription()
Definition: ddtDataConsumer.cpp:328
int32_t notification_port
Definition: ddtDataConsumer.hpp:191
void AddUuid(std::string uuid, SubscriberType type)
Definition: ddtDataConsumer.cpp:219
void set_number_of_samples(const int32_t num_samples)
Definition: ddtDataConsumer.cpp:303
int32_t get_notification_port() const
Definition: ddtDataConsumer.cpp:270
boost::signals2::signal< void(datatransfer::NotificationType, const std::string &)> signal_n
Definition: ddtDataConsumer.hpp:34
Definition: ddtProducerConsumerBase.hpp:40
DdtStatistics get_statistics()
Definition: ddtDataConsumer.cpp:274
std::map< std::string, SubscriberType > get_subscribers()
Definition: ddtDataConsumer.cpp:261
uint64_t total_bytes
Definition: ddtDataConsumer.hpp:204
int32_t get_number_of_remote_subscribers()
Definition: ddtDataConsumer.cpp:241
uint64_t total_latency
Definition: ddtDataConsumer.hpp:206
void Notify(const NotificationType type) override
Definition: ddtDataConsumer.cpp:91
int32_t number_of_samples
Definition: ddtDataConsumer.hpp:190