17 #ifndef DDTCONNECTIONMANAGER_HPP_
18 #define DDTCONNECTIONMANAGER_HPP_
20 #include <boost/bind.hpp>
21 #include <boost/property_tree/ini_parser.hpp>
22 #include <boost/property_tree/ptree.hpp>
23 #include <boost/signals2/signal.hpp>
24 #include <boost/bind/bind.hpp>
28 #include <mal/rr/ServerAmi.hpp>
29 #include <mal/rr/ServerContextProvider.hpp>
30 #include <mal/rr/qos/ReplyTime.hpp>
// Namespace aliases: `mal` stands for ::elt::mal and `datatransfer` for
// ::elt::ddt::datatransfer. The declarations below use the `datatransfer::`
// shorthand (e.g. datatransfer::NotificationType).
39 namespace mal = ::elt::mal;
40 namespace datatransfer = ::elt::ddt::datatransfer;
49 :
public virtual datatransfer::DataBrokerRegistration {
55 const std::string uri_string,
56 const std::string config);
63 const std::string uri_string,
64 const std::string config);
78 const int32_t latency, int32_t deadline,
79 const int32_t max_data_sample_size,
80 const int32_t number_of_samples,
81 const bool compute_checksum,
82 const std::string& publishing_uri)
override;
89 const std::string& data_stream_identifier)
override;
/**
 * @brief Override of an inherited virtual function, operating on one data stream.
 *
 * @param data_stream_identifier Identifier of the stream, passed by const reference.
 *
 * NOTE(review): only the declaration is visible here; presumably this publishes
 * the data currently available for the stream -- confirm against the implementation.
 */
104 void PublishData(
const std::string& data_stream_identifier)
override;
119 const std::string& data_stream_identifier,
120 const std::string& remote_broker_uri,
121 const int32_t latency,
122 const int32_t deadline)
override;
130 const std::string& subscriber_uuid,
131 const std::string& data_stream_identifier,
132 const int32_t latency,
133 const int32_t deadline)
override;
144 const std::string& subscriber_uuid)
override;
155 const std::string& data_stream_identifier)
override;
161 const std::string& data_stream_identifier)
override;
167 const std::string& data_stream_identifier)
override;
173 const std::string& data_stream_identifier)
override;
179 const std::string& data_stream_identifier)
override;
/**
 * @brief Override of an inherited virtual function that returns a string for a
 *        given data stream.
 *
 * @param data_stream_identifier Identifier of the stream, passed by const reference.
 * @return A std::string by value.
 *
 * NOTE(review): the name suggests the result is the shared-memory identifier
 * associated with the stream; what is returned for an unknown stream is not
 * visible here -- confirm against the implementation.
 */
199 std::string
get_shm_id(
const std::string& data_stream_identifier)
override;
213 const std::string& data_stream_identifier)
override;
224 const std::string& remote_broker_uri,
225 const std::string& data_stream_identifier)
override;
234 const int32_t datavec_size,
235 const uint64_t source_timestamp)
override;
278 const std::string remote_broker_uri)
const;
296 const int32_t number_of_samples);
319 std::map<std::string, DdtStatistics>
327 const std::string uri_string,
const std::string config);
/**
 * @brief Takes no arguments and returns nothing.
 *
 * NOTE(review): presumably prints/logs the configuration values read from
 * config_file -- confirm against the implementation.
 */
332 void PrintConfigValues();
/**
 * @brief Takes no arguments and returns nothing.
 *
 * NOTE(review): presumably starts the heartbeat activity (see HeartbeatThread
 * and heartbeat_active) -- confirm against the implementation.
 */
337 void StartHeartbeat();
/**
 * @brief Takes no arguments and returns nothing.
 *
 * NOTE(review): presumably the counterpart of StartHeartbeat; whether it blocks
 * until the heartbeat thread has finished is not visible here -- confirm.
 */
342 void StopHeartbeat();
/**
 * @brief Takes no arguments and returns nothing.
 *
 * NOTE(review): the name suggests this is the body executed by the heartbeat
 * thread -- confirm how and where it is launched.
 */
349 void HeartbeatThread();
/**
 * @brief Declaration of the subscriber-notification routine for one data stream.
 *
 * @param data_stream_identifier Identifier of the stream, passed by const reference.
 * @param producer_lock          Caller-owned std::unique_lock over a std::mutex,
 *                               passed by non-const reference so the callee can
 *                               operate on the lock object itself.
 *
 * NOTE(review): presumably the lock guards producer_mutex and may be released or
 * re-acquired inside the call -- confirm the locking contract in the implementation.
 */
354 void NotifySubscribers(
const std::string& data_stream_identifier,
355 std::unique_lock<std::mutex>& producer_lock);
/**
 * @brief Declaration of a check involving a data stream and a remote broker.
 *
 * @param data_stream_identifier Identifier of the stream (passed by value).
 * @param remote_broker_uri      URI of the remote broker (passed by value).
 * @return An int32_t; the meaning of the individual values is not visible here.
 *
 * NOTE(review): both strings are copied on every call because they are taken by
 * value; switching to `const std::string&` would avoid the copies but must be
 * done together with the out-of-line definition.
 */
360 int32_t CheckPublisherUsingStream(
const std::string data_stream_identifier,
361 const std::string remote_broker_uri);
/**
 * @brief Declaration of the routine that sets up a data consumer for a subscriber.
 *
 * @param consumer_lock          Caller-owned std::unique_lock over a std::mutex,
 *                               passed by non-const reference.
 * @param subscriber_uuid        Identifier of the subscriber (by value).
 * @param data_stream_identifier Identifier of the stream (by value).
 * @param remote_broker_uri      URI of the remote broker (by value).
 * @param latency                int32_t value; units not visible here.
 * @param deadline               int32_t value; units not visible here.
 * @param subscription_uri       URI used for the subscription (by value).
 * @param number_of_samples      int32_t sample count.
 *
 * NOTE(review): presumably consumer_lock guards consumer_mutex -- confirm the
 * locking contract. The four string parameters are copied on every call; a
 * change to `const std::string&` must be made together with the definition.
 */
366 void CreateDataConsumer(std::unique_lock<std::mutex>& consumer_lock,
367 const std::string subscriber_uuid,
368 const std::string data_stream_identifier,
369 const std::string remote_broker_uri,
370 const int32_t latency,
const int32_t deadline,
371 const std::string subscription_uri,
372 const int32_t number_of_samples);
/**
 * @brief Declaration taking a single identifier string; returns nothing.
 *
 * @param identifier Identifier to look for (passed by value).
 *
 * NOTE(review): the name suggests it looks up a subscriber by this identifier
 * and unregisters it; which kind of identifier is expected (subscriber UUID or
 * stream identifier) is not visible here -- confirm against the implementation.
 */
379 void SearchAndUnregSubscriber(
const std::string identifier);
/**
 * @brief Declaration of the registration routine for a subscriber of a data
 *        stream, without a remote broker URI (compare
 *        RegisterLocalSubscriberRemote).
 *
 * @param subscriber_uuid        Identifier of the subscriber (by value).
 * @param data_stream_identifier Identifier of the stream (by value).
 * @param latency                int32_t value; units not visible here.
 * @param deadline               int32_t value; units not visible here.
 * @return An int32_t; the meaning of the individual values is not visible here.
 *
 * NOTE(review): the string parameters are taken by value and therefore copied
 * on each call.
 */
384 int32_t RegisterLocalSubscriber(
const std::string subscriber_uuid,
385 const std::string data_stream_identifier,
386 const int32_t latency,
387 const int32_t deadline);
/**
 * @brief Variant of RegisterLocalSubscriber that additionally receives the URI
 *        of a remote broker.
 *
 * @param subscriber_uuid        Identifier of the subscriber (by value).
 * @param data_stream_identifier Identifier of the stream (by value).
 * @param remote_broker_uri      URI of the remote broker (by value).
 * @param latency                int32_t value; units not visible here.
 * @param deadline               int32_t value; units not visible here.
 * @return An int32_t; the meaning of the individual values is not visible here.
 *
 * NOTE(review): the string parameters are taken by value and therefore copied
 * on each call.
 */
392 int32_t RegisterLocalSubscriberRemote(
393 const std::string subscriber_uuid,
394 const std::string data_stream_identifier,
395 const std::string remote_broker_uri,
const int32_t latency,
396 const int32_t deadline);
/**
 * @brief Declaration of the handler for a notification concerning one data stream.
 *
 * @param type                   Kind of notification, a datatransfer::NotificationType.
 * @param data_stream_identifier Identifier of the stream, passed by const reference.
 *
 * NOTE(review): which notification types exist and how each is handled is not
 * visible here -- see the implementation.
 */
402 void ProcessNotificationEvent(
const datatransfer::NotificationType type,
403 const std::string& data_stream_identifier);
414 void FreeShmThread(
const std::string& data_stream_identifier,
/**
 * @brief Declaration of a boolean check for one data stream.
 *
 * @param data_stream_identifier Identifier of the stream, passed by const reference.
 * @return A bool.
 *
 * NOTE(review): presumably true means a publisher for this stream is registering
 * again; the exact condition is not visible here -- confirm.
 */
423 bool CheckPublisherReregistration(
const std::string& data_stream_identifier);
/**
 * @brief Declaration of a boolean check that receives a stream identifier
 *        together with a sample size and a sample count.
 *
 * @param data_stream_identifier Identifier of the stream, passed by const reference.
 * @param max_data_sample_size   int32_t maximum size of one data sample; units
 *                               not visible here.
 * @param number_of_samples      int32_t sample count.
 * @return A bool.
 *
 * NOTE(review): presumably reports whether the stream's shared memory has to be
 * recreated for the given dimensions -- confirm against the implementation.
 */
434 bool CheckSharedMemoryRecreation(
const std::string& data_stream_identifier,
435 const int32_t max_data_sample_size,
436 const int32_t number_of_samples);
/**
 * @brief Declaration taking one stream identifier; returns nothing.
 *
 * @param data_stream_identifier Identifier of the stream, passed by const reference.
 *
 * NOTE(review): how this relates to the overriding PublishData entry point is
 * not visible here -- confirm against the implementation.
 */
442 void Publish(
const std::string& data_stream_identifier);
/**
 * @brief Declaration taking one stream identifier; returns nothing.
 *
 * @param data_stream_identifier Identifier of the stream, passed by const reference.
 *
 * NOTE(review): the name suggests this handles a "publisher registered"
 * notification for the stream -- confirm.
 */
448 void PubRegNotification(
const std::string& data_stream_identifier);
/**
 * @brief Declaration taking one stream identifier; returns nothing.
 *
 * @param data_stream_identifier Identifier of the stream, passed by const reference.
 *
 * NOTE(review): the name suggests this handles a "publisher unregistered"
 * notification for the stream -- confirm.
 */
454 void PubUnregNotification(
const std::string& data_stream_identifier);
456 std::map<std::string, DdtDataProducer*>
458 std::map<std::string, DdtDataConsumer*>
460 std::map<std::string,
461 std::chrono::time_point<std::chrono::high_resolution_clock>>
// Ordered string-to-string map.
// NOTE(review): what the key and the value represent is not visible here, and
// the similarly named connected_brokers_mutex presumably guards it -- confirm.
463 std::map<std::string, std::string>
464 connected_brokers_map;
468 std::map<std::string, DdtClient*>
// Atomic flag and atomic counter, so both can be read and written from several
// threads without an additional lock.
// NOTE(review): presumably stop_threads asks worker threads to terminate and
// thread_counter tracks how many are running -- confirm.
471 std::atomic<bool> stop_threads;
472 std::atomic<int> thread_counter;
// One std::mutex per group of shared state.
// NOTE(review): each name suggests which data it protects (producers, consumers,
// clients, statistics, DDT clients, registered publishers, connected brokers);
// the actual association and any required lock ordering are not visible here --
// confirm against the implementation before taking more than one at a time.
474 std::mutex producer_mutex;
475 std::mutex consumer_mutex;
476 std::mutex client_mutex;
477 std::mutex statistics_mutex;
478 std::mutex ddt_clients_mutex;
479 std::mutex registered_publishers_mutex;
480 std::mutex connected_brokers_mutex;
// Promise/future pair carrying no value.
// NOTE(review): presumably future_object is obtained from exit_signal and is
// used to tell the heartbeat thread to exit -- confirm.
487 std::promise<void> exit_signal;
488 std::future<void> future_object;
// Atomic flag; presumably true while the heartbeat is running -- confirm.
489 std::atomic<bool> heartbeat_active;
// Strings that presumably store the constructor's uri_string and config
// arguments -- confirm.
490 std::string broker_uri;
491 std::string config_file;
// Handle to a Boost.Signals2 signal/slot connection; which signal it belongs to
// is not visible here.
493 boost::signals2::connection connection;
// Numeric constants with default initializers.
// NOTE(review): the *_MIN names suggest lower bounds applied to the matching
// configuration values, and NUM_RETRIES a retry limit; units (seconds,
// milliseconds, ...) are not visible here -- confirm against the code that
// reads them.
// NOTE(review): these appear to be non-static const data members, so every
// instance carries its own copy; `static constexpr` would avoid that, but
// should only be introduced after checking how the implementation uses them.
495 const int32_t SHM_TIMEOUT_MIN = 2;
496 const int32_t WAITING_TIME_MIN = 1000;
497 const int32_t REPLY_TIME_MIN = 2;
498 const int32_t HEARTBEAT_INTERVAL_MIN = 0;
499 const int32_t HEARTBEAT_TIMEOUT_MIN = 3;
500 const int32_t NUM_RETRIES = 30;