ddt  0.1
ddtConnectionManager.hpp
Go to the documentation of this file.
1 // @copyright
2 // (c) Copyright ESO 2020
3 // All Rights Reserved
4 // ESO (eso.org) is an Intergovernmental Organization, and therefore special
5 // legal conditions apply.
6 //
7 // @file ddtConnectionManager.hpp
8 // @brief Connection Manager.
9 //
10 // This class manages the connection handling between Data Brokers and
11 // Publisher / Subscriber applications.
12 //
13 // @author Matthias Grimm, CGI
14 // @since 2020/01/16
15 //
16 
17 #ifndef DDTCONNECTIONMANAGER_HPP_
18 #define DDTCONNECTIONMANAGER_HPP_
19 
20 #include <boost/bind.hpp>
21 #include <boost/property_tree/ini_parser.hpp>
22 #include <boost/property_tree/ptree.hpp>
23 #include <boost/signals2/signal.hpp>
24 #include <boost/bind/bind.hpp>
25 #include <chrono>
26 #include <memory>
27 
28 #include <mal/rr/ServerAmi.hpp>
29 #include <mal/rr/ServerContextProvider.hpp>
30 #include <mal/rr/qos/ReplyTime.hpp>
31 
32 #include "ddt/ddtLogger.hpp"
33 
34 #include "ddt/ddtClient.hpp"
35 #include "ddt/ddtDataConsumer.hpp"
36 #include "ddt/ddtDataProducer.hpp"
37 #include "ddt/ddtMemoryManager.hpp"
38 
39 namespace mal = ::elt::mal;
40 namespace datatransfer = ::elt::ddt::datatransfer;
41 
42 namespace ddt {
43 
48 class DdtConnectionManager
49  : public virtual datatransfer::DataBrokerRegistration {
50  public:
// Constructor taking a logger pointer, a URI string and a config string; both
// strings are passed by value.
// NOTE(review): presumably `uri_string` and `config` end up in the members
// `broker_uri` and `config_file` — confirm in ddtConnectionManager.cpp.
// NOTE(review): ownership of `ddt_logger` is not visible in this header.
54  explicit DdtConnectionManager(DdtLogger* ddt_logger,
55  const std::string uri_string,
56  const std::string config);
57 
// Constructor overload that additionally receives a DdtMemoryManager pointer
// as its first argument; the remaining arguments match the three-argument
// constructor.
// NOTE(review): presumably `mmgr` is stored in the `memory_manager` member;
// whether this class takes ownership of it is not visible here — confirm.
61  explicit DdtConnectionManager(DdtMemoryManager* const mmgr,
62  DdtLogger* ddt_logger,
63  const std::string uri_string,
64  const std::string config);
65 
69  ~DdtConnectionManager() override;
70 
// Overrides a virtual function of a base class (the visible base is
// datatransfer::DataBrokerRegistration).
// Returns an int32_t. NOTE(review): the meaning of the return code and the
// units of `latency` / `deadline` are not visible in this header — confirm.
// `deadline` is the only by-value parameter not declared const; top-level
// const does not affect the function signature.
77  int32_t RegisterPublisher(const std::string& data_stream_identifier,
78  const int32_t latency, int32_t deadline,
79  const int32_t max_data_sample_size,
80  const int32_t number_of_samples,
81  const bool compute_checksum,
82  const std::string& publishing_uri) override;
83 
88  int32_t UnregisterPublisher(
89  const std::string& data_stream_identifier) override;
90 
94  int32_t UnregisterPublishers();
95 
104  void PublishData(const std::string& data_stream_identifier) override;
105 
// Overrides a virtual function of a base class (the visible base is
// datatransfer::DataBrokerRegistration).
// The three leading parameters are all `const std::string&`, in the order
// subscriber_uuid, data_stream_identifier, remote_broker_uri; callers must keep
// that order because the compiler cannot detect swapped arguments.
// NOTE(review): the meaning of the int32_t return code is not visible in this
// header — confirm.
118  int32_t RegisterSubscriber(const std::string& subscriber_uuid,
119  const std::string& data_stream_identifier,
120  const std::string& remote_broker_uri,
121  const int32_t latency,
122  const int32_t deadline) override;
123 
// Overrides a virtual function of a base class (the visible base is
// datatransfer::DataBrokerRegistration).
// Unlike RegisterSubscriber, the broker string comes first here
// (remote_broker, subscriber_uuid, data_stream_identifier).
// NOTE(review): presumably invoked on behalf of another broker — confirm
// against the interface definition.
129  int32_t RegisterRemoteSubscriber(const std::string& remote_broker,
130  const std::string& subscriber_uuid,
131  const std::string& data_stream_identifier,
132  const int32_t latency,
133  const int32_t deadline) override;
134 
// Overrides a virtual function of a base class (the visible base is
// datatransfer::DataBrokerRegistration).
// Parameter order is (data_stream_identifier, subscriber_uuid), i.e. reversed
// relative to RegisterSubscriber, which takes subscriber_uuid first. Both are
// `const std::string&`, so swapped arguments compile silently.
143  int32_t UnregisterSubscriber(const std::string& data_stream_identifier,
144  const std::string& subscriber_uuid) override;
145 
149  int32_t UnregisterSubscribers();
150 
154  int32_t get_max_data_sample_size(
155  const std::string& data_stream_identifier) override;
156 
160  int32_t get_number_of_samples(
161  const std::string& data_stream_identifier) override;
162 
166  std::string get_publishing_uri(
167  const std::string& data_stream_identifier) override;
168 
172  int32_t get_notification_port(
173  const std::string& data_stream_identifier) override;
174 
178  std::vector<std::string> get_statistics(
179  const std::string& data_stream_identifier) override;
180 
184  int32_t get_heartbeat_interval() override;
185 
189  int32_t get_heartbeat_timeout() override;
190 
194  bool get_compute_checksum(const std::string& data_stream_identifier) override;
195 
199  std::string get_shm_id(const std::string& data_stream_identifier) override;
200 
204  void UpdateHeartbeat(const std::string& identifier) override;
205 
212  bool CheckPubRegistrationPermitted(
213  const std::string& data_stream_identifier) override;
214 
218  bool CheckPublisherExists(const std::string& data_stream_identifier) override;
219 
223  bool CheckRemotePublisherExists(
224  const std::string& remote_broker_uri,
225  const std::string& data_stream_identifier) override;
226 
// Overrides a virtual function of a base class (the visible base is
// datatransfer::DataBrokerRegistration).
// NOTE(review): the unit of `datavec_size` and the epoch/resolution of
// `source_timestamp` are not visible in this header — confirm in the .cpp.
233  void UpdateStatistics(const std::string& data_stream_identifier,
234  const int32_t datavec_size,
235  const uint64_t source_timestamp) override;
236 
242  int32_t GetMaxPossibleBufferSize(int32_t max_data_sample_size) override;
243 
248  std::vector<std::string> GetRegisteredStreams() override;
249 
254  std::vector<std::string> GetConnectedBrokers() override;
255 
256  protected:
260  void LoadDefaults();
261 
267  std::string GetConfigPath() const;
268 
272  void ReadIni();
273 
// Const member function returning a std::string; both URI arguments are taken
// by value.
// NOTE(review): presumably combines the publisher's URI with the remote
// broker's URI into the URI a subscriber connects to — confirm in the .cpp.
277  std::string CreateSubscriptionUri(const std::string publishing_uri,
278  const std::string remote_broker_uri) const;
279 
287  bool CheckStreamIdInUse(const std::string& data_stream_identifier);
288 
295  void CreateStatistics(const std::string& data_stream_identifier,
296  const int32_t number_of_samples);
297 
302  void ResetStatistics(const std::string& data_stream_identifier);
303 
// Default values, one per configurable setting (shm_timeout, waiting_time,
// reply_time, heartbeat_interval, heartbeat_timeout).
// These are non-static const data members, so every instance carries its own
// copy.
// NOTE(review): presumably applied by LoadDefaults() and expressed in the same
// units as the matching setting (seconds, except the waiting time in
// milliseconds) — confirm in ddtConnectionManager.cpp.
304  const int32_t SHM_TIMEOUT_DEFAULT = 10;
305  const int32_t WAITING_TIME_DEFAULT = 1000;
306  const int32_t REPLY_TIME_DEFAULT = 6;
307  const int32_t HEARTBEAT_INTERVAL_DEFAULT = 1;
308  const int32_t HEARTBEAT_TIMEOUT_DEFAULT = 10;
309 
// Configuration values. None of them has a default member initializer in this
// header.
// NOTE(review): presumably populated by LoadDefaults() and ReadIni() — confirm.
310  int32_t shm_timeout; // configurable timeout after which a shared memory is
311  // deleted in [s]
312  int32_t waiting_time; // configurable waiting time for MAL publishers to
313  // establish communication in [ms]
314  int32_t reply_time; // configurable reply time for MAL clients in [s]
315  int32_t heartbeat_interval; // configurable interval for the heartbeat in [s]
316  int32_t heartbeat_timeout; // configurable timeout for the heartbeat in [s]
317 
318  std::set<std::string> registered_publishers;
319  std::map<std::string, DdtStatistics>
320  statistics_map;
322  private:
326  void Init(DdtMemoryManager* mmgr, DdtLogger* ddt_logger,
327  const std::string uri_string, const std::string config);
328 
332  void PrintConfigValues();
333 
337  void StartHeartbeat();
338 
342  void StopHeartbeat();
343 
349  void HeartbeatThread();
350 
// Receives the caller's std::unique_lock by non-const reference, so the
// definition is able to unlock or relock it.
// NOTE(review): by its name the lock presumably wraps `producer_mutex` and is
// expected to be held on entry — confirm in the .cpp.
354  void NotifySubscribers(const std::string& data_stream_identifier,
355  std::unique_lock<std::mutex>& producer_lock);
356 
360  int32_t CheckPublisherUsingStream(const std::string data_stream_identifier,
361  const std::string remote_broker_uri);
362 
// Receives the caller's std::unique_lock by non-const reference as the first
// argument; all string arguments are taken by value.
// NOTE(review): by its name the lock presumably wraps `consumer_mutex`, and the
// function presumably adds an entry to `consumer_map` — confirm in the .cpp.
366  void CreateDataConsumer(std::unique_lock<std::mutex>& consumer_lock,
367  const std::string subscriber_uuid,
368  const std::string data_stream_identifier,
369  const std::string remote_broker_uri,
370  const int32_t latency, const int32_t deadline,
371  const std::string subscription_uri,
372  const int32_t number_of_samples);
373 
379  void SearchAndUnregSubscriber(const std::string identifier);
380 
384  int32_t RegisterLocalSubscriber(const std::string subscriber_uuid,
385  const std::string data_stream_identifier,
386  const int32_t latency,
387  const int32_t deadline);
388 
392  int32_t RegisterLocalSubscriberRemote(
393  const std::string subscriber_uuid,
394  const std::string data_stream_identifier,
395  const std::string remote_broker_uri, const int32_t latency,
396  const int32_t deadline);
397 
402  void ProcessNotificationEvent(const datatransfer::NotificationType type,
403  const std::string& data_stream_identifier);
404 
// NOTE(review): by name, presumably the body of a worker thread that releases a
// stream's shared memory after a timeout — confirm in the .cpp.
// The `shm_timeout` parameter (plain int) has the same name as the int32_t data
// member `shm_timeout`; if the definition keeps this parameter name, the
// parameter hides the member inside the function body.
414  void FreeShmThread(const std::string& data_stream_identifier,
415  const int shm_timeout);
416 
423  bool CheckPublisherReregistration(const std::string& data_stream_identifier);
424 
434  bool CheckSharedMemoryRecreation(const std::string& data_stream_identifier,
435  const int32_t max_data_sample_size,
436  const int32_t number_of_samples);
437 
442  void Publish(const std::string& data_stream_identifier);
443 
448  void PubRegNotification(const std::string& data_stream_identifier);
449 
454  void PubUnregNotification(const std::string& data_stream_identifier);
455 
// Lookup tables, all keyed by std::string.
// NOTE(review): the key is presumably a data stream identifier or a
// subscriber/client identifier depending on the map — confirm in the .cpp.
// producer_map and consumer_map store raw pointers; this header does not show
// who owns or deletes the pointed-to objects.
456  std::map<std::string, DdtDataProducer*>
457  producer_map;
458  std::map<std::string, DdtDataConsumer*>
459  consumer_map;
// client_map stores one high_resolution_clock time point per key.
// NOTE(review): looks like the time of the last heartbeat per client (see
// UpdateHeartbeat) — confirm.
460  std::map<std::string,
461  std::chrono::time_point<std::chrono::high_resolution_clock>>
462  client_map;
463  std::map<std::string, std::string>
464  connected_brokers_map;
// Raw pointer to the memory manager.
// NOTE(review): ownership is not visible in this header — confirm.
466  DdtMemoryManager* memory_manager;
467 
// String-keyed map of raw DdtClient pointers; ownership of the clients is not
// visible in this header.
468  std::map<std::string, DdtClient*>
469  ddt_clients;
// Atomic flag and atomic counter.
// NOTE(review): presumably used to request termination of, and to count,
// running worker threads (e.g. FreeShmThread) — confirm in the .cpp.
471  std::atomic<bool> stop_threads;
472  std::atomic<int> thread_counter;
473 
// Seven independent std::mutex members.
// NOTE(review): by name each presumably guards the like-named state
// (producer_map, consumer_map, client_map, statistics_map, ddt_clients,
// registered_publishers, connected_brokers_map) — confirm in the .cpp.
// NOTE(review): no lock-ordering rule is visible in this header; check how the
// .cpp avoids deadlock when more than one of these is held at once.
474  std::mutex producer_mutex;
475  std::mutex consumer_mutex;
476  std::mutex client_mutex;
477  std::mutex statistics_mutex;
478  std::mutex ddt_clients_mutex;
479  std::mutex registered_publishers_mutex;
480  std::mutex connected_brokers_mutex;
481 
485  DdtLogger* logger;
486 
// NOTE(review): exit_signal / future_object look like a promise/future pair
// used to tell the heartbeat thread to stop (cf. StartHeartbeat, StopHeartbeat,
// HeartbeatThread) — confirm in the .cpp.
// NOTE(review): the include list of this header does not name <future>,
// <atomic> or <mutex>; they presumably arrive transitively via other headers.
487  std::promise<void> exit_signal;
488  std::future<void> future_object;
489  std::atomic<bool> heartbeat_active;
// NOTE(review): presumably copies of the constructor arguments `uri_string` and
// `config` — confirm.
490  std::string broker_uri;
491  std::string config_file;
492 
493  boost::signals2::connection connection;
494 
// Non-static const data members.
// NOTE(review): by name the *_MIN values look like lower bounds for the
// matching configuration values, and NUM_RETRIES like a retry limit; where they
// are enforced is not visible in this header — confirm in the .cpp.
// WAITING_TIME_MIN has the same value as WAITING_TIME_DEFAULT (1000), and
// HEARTBEAT_INTERVAL_MIN is 0.
495  const int32_t SHM_TIMEOUT_MIN = 2;
496  const int32_t WAITING_TIME_MIN = 1000;
497  const int32_t REPLY_TIME_MIN = 2;
498  const int32_t HEARTBEAT_INTERVAL_MIN = 0;
499  const int32_t HEARTBEAT_TIMEOUT_MIN = 3;
500  const int32_t NUM_RETRIES = 30;
501 };
502 
503 } // namespace ddt
504 
505 #endif /* DDTCONNECTIONMANAGER_HPP_ */
ddt::DdtConnectionManager::UpdateStatistics
void UpdateStatistics(const std::string &data_stream_identifier, const int32_t datavec_size, const uint64_t source_timestamp) override
Definition: ddtConnectionManager.cpp:1533
ddt::DdtConnectionManager::GetMaxPossibleBufferSize
int32_t GetMaxPossibleBufferSize(int32_t max_data_sample_size) override
Definition: ddtConnectionManager.cpp:1595
ddt::DdtConnectionManager::get_max_data_sample_size
int32_t get_max_data_sample_size(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1360
ddt::DdtConnectionManager::UnregisterPublisher
int32_t UnregisterPublisher(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:506
ddt::DdtConnectionManager::CheckStreamIdInUse
bool CheckStreamIdInUse(const std::string &data_stream_identifier)
Definition: ddtConnectionManager.cpp:1292
ddt::DdtConnectionManager::RegisterPublisher
int32_t RegisterPublisher(const std::string &data_stream_identifier, const int32_t latency, int32_t deadline, const int32_t max_data_sample_size, const int32_t number_of_samples, const bool compute_checksum, const std::string &publishing_uri) override
Definition: ddtConnectionManager.cpp:344
ddt::DdtConnectionManager::CheckPublisherExists
bool CheckPublisherExists(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1308
ddt::DdtConnectionManager::shm_timeout
int32_t shm_timeout
Definition: ddtConnectionManager.hpp:310
ddt::DdtLogger
Definition: ddtLogger.hpp:48
ddt::DdtConnectionManager::UnregisterSubscriber
int32_t UnregisterSubscriber(const std::string &data_stream_identifier, const std::string &subscriber_uuid) override
Definition: ddtConnectionManager.cpp:1158
ddt::DdtConnectionManager::UnregisterSubscribers
int32_t UnregisterSubscribers()
Definition: ddtConnectionManager.cpp:1242
ddt
Definition: ddtClient.hpp:36
ddt::DdtConnectionManager::CheckPubRegistrationPermitted
bool CheckPubRegistrationPermitted(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1273
ddt::DdtConnectionManager::reply_time
int32_t reply_time
Definition: ddtConnectionManager.hpp:314
ddt::DdtConnectionManager::ReadIni
void ReadIni()
Definition: ddtConnectionManager.cpp:85
ddt::DdtConnectionManager::UpdateHeartbeat
void UpdateHeartbeat(const std::string &identifier) override
Definition: ddtConnectionManager.cpp:335
ddt::DdtConnectionManager::get_number_of_samples
int32_t get_number_of_samples(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1365
ddt::DdtMemoryManager
Definition: ddtMemoryManager.hpp:50
ddt::DdtConnectionManager::GetConfigPath
std::string GetConfigPath() const
Definition: ddtConnectionManager.cpp:69
ddt::DdtConnectionManager::RegisterRemoteSubscriber
int32_t RegisterRemoteSubscriber(const std::string &remote_broker, const std::string &subscriber_uuid, const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline) override
Definition: ddtConnectionManager.cpp:1104
ddt::DdtConnectionManager::get_publishing_uri
std::string get_publishing_uri(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1370
ddt::DdtConnectionManager::CreateStatistics
void CreateStatistics(const std::string &data_stream_identifier, const int32_t number_of_samples)
Definition: ddtConnectionManager.cpp:1555
ddt::DdtConnectionManager::heartbeat_timeout
int32_t heartbeat_timeout
Definition: ddtConnectionManager.hpp:316
ddt::DdtConnectionManager::get_shm_id
std::string get_shm_id(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1462
ddt::DdtConnectionManager::get_notification_port
int32_t get_notification_port(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1382
ddt::DdtConnectionManager::statistics_map
std::map< std::string, DdtStatistics > statistics_map
Definition: ddtConnectionManager.hpp:320
ddt::DdtConnectionManager::DdtConnectionManager
DdtConnectionManager(DdtLogger *ddt_logger, const std::string uri_string, const std::string config)
Definition: ddtConnectionManager.cpp:21
ddt::DdtConnectionManager
Definition: ddtConnectionManager.hpp:49
ddt::DdtConnectionManager::ResetStatistics
void ResetStatistics(const std::string &data_stream_identifier)
Definition: ddtConnectionManager.cpp:1581
ddt::DdtConnectionManager::HEARTBEAT_INTERVAL_DEFAULT
const int32_t HEARTBEAT_INTERVAL_DEFAULT
Definition: ddtConnectionManager.hpp:307
ddt::DdtConnectionManager::PublishData
void PublishData(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:634
ddt::DdtConnectionManager::CreateSubscriptionUri
std::string CreateSubscriptionUri(const std::string publishing_uri, const std::string remote_broker_uri) const
Definition: ddtConnectionManager.cpp:1467
ddt::DdtConnectionManager::HEARTBEAT_TIMEOUT_DEFAULT
const int32_t HEARTBEAT_TIMEOUT_DEFAULT
Definition: ddtConnectionManager.hpp:308
ddt::DdtConnectionManager::SHM_TIMEOUT_DEFAULT
const int32_t SHM_TIMEOUT_DEFAULT
Definition: ddtConnectionManager.hpp:304
ddt::DdtConnectionManager::UnregisterPublishers
int32_t UnregisterPublishers()
Definition: ddtConnectionManager.cpp:610
ddt::DdtConnectionManager::get_statistics
std::vector< std::string > get_statistics(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1394
ddtMemoryManager.hpp
ddtDataConsumer.hpp
ddtDataProducer.hpp
ddtLogger.hpp
ddt::DdtConnectionManager::registered_publishers
std::set< std::string > registered_publishers
Definition: ddtConnectionManager.hpp:318
ddt::DdtConnectionManager::REPLY_TIME_DEFAULT
const int32_t REPLY_TIME_DEFAULT
Definition: ddtConnectionManager.hpp:306
ddt::DdtConnectionManager::GetRegisteredStreams
std::vector< std::string > GetRegisteredStreams() override
Definition: ddtConnectionManager.cpp:1600
ddt::DdtConnectionManager::GetConnectedBrokers
std::vector< std::string > GetConnectedBrokers() override
Definition: ddtConnectionManager.cpp:1625
ddt::DdtConnectionManager::WAITING_TIME_DEFAULT
const int32_t WAITING_TIME_DEFAULT
Definition: ddtConnectionManager.hpp:305
ddtClient.hpp
ddt::DdtConnectionManager::heartbeat_interval
int32_t heartbeat_interval
Definition: ddtConnectionManager.hpp:315
ddt::DdtConnectionManager::CheckRemotePublisherExists
bool CheckRemotePublisherExists(const std::string &remote_broker_uri, const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1321
ddt::DdtConnectionManager::get_heartbeat_timeout
int32_t get_heartbeat_timeout() override
Definition: ddtConnectionManager.cpp:1451
ddt::DdtConnectionManager::get_compute_checksum
bool get_compute_checksum(const std::string &data_stream_identifier) override
Definition: ddtConnectionManager.cpp:1455
ddt::DdtConnectionManager::RegisterSubscriber
int32_t RegisterSubscriber(const std::string &subscriber_uuid, const std::string &data_stream_identifier, const std::string &remote_broker_uri, const int32_t latency, const int32_t deadline) override
Definition: ddtConnectionManager.cpp:724
ddt::DdtConnectionManager::LoadDefaults
void LoadDefaults()
Definition: ddtConnectionManager.cpp:61
ddt::DdtConnectionManager::~DdtConnectionManager
~DdtConnectionManager() override
Definition: ddtConnectionManager.cpp:39
ddt::DdtConnectionManager::waiting_time
int32_t waiting_time
Definition: ddtConnectionManager.hpp:312
ddt::DdtConnectionManager::get_heartbeat_interval
int32_t get_heartbeat_interval() override
Definition: ddtConnectionManager.cpp:1447