ddt  0.1
ddtClient.hpp
Go to the documentation of this file.
1 // @copyright
2 // (c) Copyright ESO 2020
3 // All Rights Reserved
4 // ESO (eso.org) is an Intergovernmental Organization, and therefore special
5 // legal conditions apply.
6 //
7 // @file ddtClient.hpp
8 // @brief Client class for the connection to remote brokers.
9 //
10 // This class creates MAL clients to connect to remote brokers. It provides a
11 // connection listener which observes the connection state and re-registers
12 // subscribers in case a broker has been restarted. It also provides the
13 // heartbeat functionality.
14 //
15 // @author Matthias Grimm, CGI
16 // @since 2020/11/18
17 //
18 
19 #ifndef DDTCLIENT_HPP_
20 #define DDTCLIENT_HPP_
21 
22 #include <Ddtdatatransfericd.hpp>
23 #include <future>
24 #include <mal/Cii.hpp>
25 #include <mal/rr/qos/ReplyTime.hpp>
26 #include <mal/utility/LoadMal.hpp>
27 #include <map>
28 #include <thread>
29 
30 #include "ddt/ddtConstants.hpp"
31 #include "ddt/ddtLogger.hpp"
32 
// Shorthand aliases for ::elt::mal and ::elt::ddt::datatransfer. They are
// declared at global scope (before `namespace ddt` opens), so every file that
// includes this header sees them as well.
33 namespace mal = ::elt::mal;
34 namespace datatransfer = ::elt::ddt::datatransfer;
35 
36 namespace ddt {
37 
44 class DdtClient {
45  public:
49  explicit DdtClient();
50 
54  DdtClient(const std::string remote_broker, const int32_t repl_time,
55  const int32_t hb_interval, const int32_t hb_timeout,
56  const std::string broker, DdtLogger* ddt_logger);
57 
61  virtual ~DdtClient();
62 
66  void AddUuid(std::string uuid, std::string dsi);
67 
72  void UnregisterSubscriber(const std::string uuid);
73 
78  bool CheckIfEmpty();
79 
84  bool CheckPublisherExists(const std::string& data_stream_identifier) const;
85 
90  int32_t RegisterRemoteSubscriber(const std::string& subscriber_uuid,
91  const std::string& data_stream_identifier,
92  const int32_t latency,
93  const int32_t deadline) const;
94 
99  std::string GetPublishingUri(const std::string& data_stream_identifier) const;
100 
105  int32_t get_max_data_sample_size(
106  const std::string& data_stream_identifier) const;
107 
112  int32_t get_number_of_samples(
113  const std::string& data_stream_identifier) const;
114 
118  bool get_compute_checksum(const std::string& data_stream_identifier) const;
119 
120  protected:
124  void Init(const std::string remote_broker, const int32_t repl_time,
125  const int32_t hb_interval, const int32_t hb_timeout,
126  const std::string broker, DdtLogger* ddt_logger);
127 
132  std::map<std::string, std::string>
138  std::atomic<bool> connected_to_broker;
139 
140  std::atomic<bool> heartbeat_active;
141  std::string remote_broker_uri;
142  std::string broker_uri;
143  int32_t reply_time;
147 
148  private:
152  int32_t InitMalClient();
153 
157  void StartHeartbeat();
158 
162  void StopHeartbeat();
163 
167  void HeartbeatThread();
168 
173  void Reregister();
174 
178  std::unique_ptr<
179  datatransfer::DataBrokerRegistrationSync,
180  std::default_delete<datatransfer::DataBrokerRegistrationSync> >
181  client;
182 
186  elt::mal::rr::ListenerRegistration connection_listener;
187 
188  std::promise<void> exit_signal;
189  std::future<void> future_object;
190  std::mutex subscriber_mutex;
191 
192  const int32_t NUM_RETRIES = 10;
193  const int32_t LATENCY = 10000;
194  const int32_t DEADLINE = 10;
195 };
196 
197 } // namespace ddt
198 
199 #endif /* DDTCLIENT_HPP_ */
ddt::DdtClient::get_compute_checksum
bool get_compute_checksum(const std::string &data_stream_identifier) const
Definition: ddtClient.cpp:288
ddt::DdtClient::heartbeat_interval
int32_t heartbeat_interval
Definition: ddtClient.hpp:144
ddt::DdtClient::remote_broker_uri
std::string remote_broker_uri
Definition: ddtClient.hpp:141
ddt::DdtLogger
Definition: ddtLogger.hpp:48
ddtConstants.hpp
ddt
Definition: ddtClient.hpp:36
ddt::DdtClient::Init
void Init(const std::string remote_broker, const int32_t repl_time, const int32_t hb_interval, const int32_t hb_timeout, const std::string broker, DdtLogger *ddt_logger)
Definition: ddtClient.cpp:45
ddt::DdtClient::heartbeat_active
std::atomic< bool > heartbeat_active
Definition: ddtClient.hpp:140
pyDdtBroker.broker
broker
Definition: pyDdtBroker.py:21
ddt::DdtClient::heartbeat_timeout
int32_t heartbeat_timeout
Definition: ddtClient.hpp:145
ddt::DdtClient::RegisterRemoteSubscriber
int32_t RegisterRemoteSubscriber(const std::string &subscriber_uuid, const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline) const
Definition: ddtClient.cpp:264
ddt::DdtClient::~DdtClient
virtual ~DdtClient()
Definition: ddtClient.cpp:38
ddt::DdtClient::reply_time
int32_t reply_time
Definition: ddtClient.hpp:143
ddt::DdtClient::GetPublishingUri
std::string GetPublishingUri(const std::string &data_stream_identifier) const
Definition: ddtClient.cpp:273
ddt::DdtClient::get_number_of_samples
int32_t get_number_of_samples(const std::string &data_stream_identifier) const
Definition: ddtClient.cpp:283
ddt::DdtClient::CheckIfEmpty
bool CheckIfEmpty()
Definition: ddtClient.cpp:254
ddt::DdtClient::broker_uri
std::string broker_uri
Definition: ddtClient.hpp:142
ddt::DdtClient::CheckPublisherExists
bool CheckPublisherExists(const std::string &data_stream_identifier) const
Definition: ddtClient.cpp:259
ddt::DdtClient::DdtClient
DdtClient()
ddt::DdtClient::logger
DdtLogger * logger
Definition: ddtClient.hpp:146
ddt::DdtClient::UnregisterSubscriber
void UnregisterSubscriber(const std::string uuid)
Definition: ddtClient.cpp:172
ddt::DdtClient::connected_to_broker
std::atomic< bool > connected_to_broker
Definition: ddtClient.hpp:138
ddtLogger.hpp
ddt::DdtClient::subscriber_map
std::map< std::string, std::string > subscriber_map
Definition: ddtClient.hpp:133
ddt::DdtClient::get_max_data_sample_size
int32_t get_max_data_sample_size(const std::string &data_stream_identifier) const
Definition: ddtClient.cpp:278
ddt::DdtClient
Definition: ddtClient.hpp:44
ddt::DdtClient::AddUuid
void AddUuid(std::string uuid, std::string dsi)
Definition: ddtClient.cpp:117