ddt  0.1
ddtDataSubscriber.hpp
Go to the documentation of this file.
1 // @copyright
2 // (c) Copyright ESO 2020
3 // All Rights Reserved
4 // ESO (eso.org) is an Intergovernmental Organization, and therefore special
5 // legal conditions apply.
6 //
7 // @file ddtDataSubscriber.hpp
8 // @brief Data Subscriber.
9 //
10 // This class provides the functionality for subscriber applications to register
11 // / unregister at a local broker and to receive data for a specified data
12 // stream.
13 //
14 // @author Matthias Grimm, CGI
15 // @since 2020/01/16
16 //
17 
18 #ifndef DDTDATASUBSCRIBER_HPP_
19 #define DDTDATASUBSCRIBER_HPP_
20 
21 #include <boost/bind.hpp>
22 #include <boost/uuid/uuid.hpp>
23 #include <boost/uuid/uuid_generators.hpp>
24 #include <boost/uuid/uuid_io.hpp>
27 
28 namespace ddt {
29 
// Member declarations of ddt::DdtDataSubscriber. Per the file header, the
// class lets subscriber applications register / unregister at a local broker
// and receive data for a specified data stream.
// NOTE(review): the class head (listing line 35) is missing from this
// extract. The `override` specifiers below imply a polymorphic base class —
// presumably ddt::DdtDataTransferLib; confirm against the real header.
36  public:
41 
// Constructor; takes a log4cplus logger by const reference.
// NOTE(review): the cross-reference index of this page lists the constructor
// as `DdtDataSubscriber(DdtLogger *logger)` — confirm that the header and the
// implementation agree on the signature.
45  explicit DdtDataSubscriber(log4cplus::Logger const &log4cplusLogger);
46 
// Destructor; overrides the base-class destructor.
50  ~DdtDataSubscriber() override;
51 
// Registers this subscriber.
// @param uri         presumably the local broker URI (cf. member broker_uri) — confirm
// @param dsi         presumably the data stream identifier — confirm
// @param remote_uri  presumably the remote broker URI (cf. remote_broker_uri) — confirm
// @param interval    defaults to 10; unit is not visible in this header
// @return int status; the meaning of the values is not visible in this header
// NOTE(review): the std::string arguments are taken by const value, so each
// call copies them.
52  int RegisterSubscriber(const std::string uri, const std::string dsi,
53  const std::string remote_uri,
54  const int32_t interval = 10) override;
55 
// Unregisters this subscriber.
// @return int status; the meaning of the values is not visible in this header
56  int UnregisterSubscriber() override;
57 
// Reads data and returns a pointer to a DataSample.
// NOTE(review): ownership / lifetime of the returned pointer is not visible
// here — confirm before storing or deleting it.
58  DataSample *ReadData() override;
59 
// NOTE(review): listing lines 59-78 are largely missing from this extract.
// The cross-reference index of this page names get_statistics(),
// StartNotificationSubscription() and StopNotificationSubscription() as
// members of this class; presumably they are declared in this gap.
64 
69 
74 
// Connects `event_listener` as a slot and returns the resulting
// boost::signals2 connection. `signal_t` is declared outside this extract.
79  boost::signals2::connection connect(
80  const signal_t::slot_type &event_listener);
81 
82  protected:
// Presumably loads default configuration values — confirm in the .cpp.
86  void LoadDefaults();
87 
// Presumably reads configuration from an ini file — confirm in the .cpp.
91  void ReadIni();
92 
// NOTE(review): the cross-reference index places `int32_t max_age_data_sample`
// at ddtDataSubscriber.hpp:93; that declaration is missing from this extract.
// Default for the maximum age of a data sample; unit is not visible here.
96  const int32_t MAX_AGE_DATA_SAMPLE_DEFAULT = 10000;
97 
98  private:
// Private helpers. Their behaviour is defined in ddtDataSubscriber.cpp and
// cannot be established from this header.
102  void Init();
103 
107  void PrintConfigValues();
108 
// @return bool; which path is checked is not visible in this header.
112  bool CheckPath();
113 
// Sets up the notification subscriber for the given data stream identifier
// and notification port (inferred from the name — confirm).
117  void InitializeNotificationSubscriber(
118  const std::string data_stream_identifier,
119  const int32_t notification_port);
120 
125  void Subscribe();
126 
131  void Reregister();
132 
// Handler taking a MAL publish/subscribe data event that carries a
// datatransfer::NotificationSample.
139  void NotificationEvent(
140  const mal::ps::DataEvent<datatransfer::NotificationSample> &event);
141 
// @return int status; the meaning of the values is not visible in this header.
145  int CheckPublisher();
146 
// @return int status; the meaning of the values is not visible in this header.
150  int CreateAccessor();
151 
155  void ReopenShm();
156 
157  void LogSubscriberParameter();
158 
// Takes an int result code; presumably one returned by the int-returning
// members above — confirm.
159  void PrintErrorMessage(const int result);
160 
// NOTE(review): raw pointers with no in-class initializer. Whether this class
// owns them is not visible here — confirm they are initialized in the
// constructor and released exactly once.
161  DdtStatisticsClient *statistics_client;
162  DdtMemoryAccessor *memory_accessor;
163  std::string shm_id;
164  std::string data_stream_identifier;
165  std::string subscriber_uuid;
166  std::string broker_uri;
167  std::string remote_broker_uri;
// NOTE(review): no in-class initializer; unit is not visible here.
168  int32_t reading_interval;
// Atomic flag. NOTE(review): no in-class initializer — confirm it is set
// before the first read.
169  std::atomic<bool> event_active;
170 
// MAL subscriber for NotificationSample. std::default_delete is already the
// default deleter of std::unique_ptr; it is spelled out explicitly here.
171  std::unique_ptr<mal::ps::Subscriber<datatransfer::NotificationSample>,
172  std::default_delete<
173  mal::ps::Subscriber<datatransfer::NotificationSample> > >
174  notification_subscriber;
175  std::shared_ptr<datatransfer::NotificationSample> ddt_key_notification;
176  std::shared_ptr<datatransfer::NotificationSample> ddt_notification;
177  mal::ps::DataEventFilter<datatransfer::NotificationSample> filter;
178 
// Promise/future pair. Presumably used to signal a worker to stop — confirm
// in the .cpp.
179  std::promise<void> exit_signal;
180  std::future<void> future_object;
181 
// Per-instance constants (non-static const members).
// NOTE(review): what is retried, and the unit of the minimum age, are not
// visible in this header.
182  const int32_t NUM_RETRIES = 10;
183  const int32_t MAX_AGE_DATA_SAMPLE_MIN = 2000;
184 };
185 
186 } // namespace ddt
187 
188 #endif /* DDTDATASUBSCRIBER_HPP_ */
ddt::DdtStatistics
Definition: ddtStatistics.hpp:24
ddt::DdtDataSubscriber::~DdtDataSubscriber
~DdtDataSubscriber() override
Definition: ddtDataSubscriber.cpp:38
ddt::DdtDataSubscriber::DdtDataSubscriber
DdtDataSubscriber(DdtLogger *logger)
Definition: ddtDataSubscriber.cpp:22
ddt::DdtDataSubscriber::UnregisterSubscriber
int UnregisterSubscriber() override
Definition: ddtDataSubscriber.cpp:361
ddt::DdtLogger
Definition: ddtLogger.hpp:48
ddt::DdtMemoryAccessor
Definition: ddtMemoryAccessor.hpp:257
ddt
Definition: ddtClient.hpp:36
ddtStatisticsClient.hpp
ddt::DdtDataSubscriber::max_age_data_sample
int32_t max_age_data_sample
Definition: ddtDataSubscriber.hpp:93
ddt::DdtDataSubscriber::StartNotificationSubscription
void StartNotificationSubscription()
Definition: ddtDataSubscriber.cpp:480
ddt::DdtDataSubscriber::MAX_AGE_DATA_SAMPLE_DEFAULT
const int32_t MAX_AGE_DATA_SAMPLE_DEFAULT
Definition: ddtDataSubscriber.hpp:96
ddt::DdtDataSubscriber::RegisterSubscriber
int RegisterSubscriber(const std::string uri, const std::string dsi, const std::string remote_uri, const int32_t interval=10) override
Definition: ddtDataSubscriber.cpp:141
ddt::DataSample
Definition: ddtMemoryAccessor.hpp:173
ddt::DdtDataSubscriber::LoadDefaults
void LoadDefaults()
Definition: ddtDataSubscriber.cpp:53
ddt::DdtDataSubscriber::ReadData
DataSample * ReadData() override
Definition: ddtDataSubscriber.cpp:419
ddtDataTransferLib.hpp
ddt::DdtDataSubscriber::get_statistics
DdtStatistics get_statistics()
Definition: ddtDataSubscriber.cpp:399
ddt::DdtDataSubscriber::StopNotificationSubscription
void StopNotificationSubscription()
Definition: ddtDataSubscriber.cpp:490
ddt::DdtDataSubscriber::connect
boost::signals2::connection connect(const signal_t::slot_type &event_listener)
Definition: ddtDataSubscriber.cpp:356
ddt::DdtDataTransferLib::logger
DdtLogger * logger
Definition: ddtDataTransferLib.hpp:182
ddt::DdtStatisticsClient
Definition: ddtStatisticsClient.hpp:32
ddt::DdtDataTransferLib
Definition: ddtDataTransferLib.hpp:39
ddt::DdtDataSubscriber
Definition: ddtDataSubscriber.hpp:35
ddt::DdtDataSubscriber::ReadIni
void ReadIni()
Definition: ddtDataSubscriber.cpp:59