ddt  0.1
Public Member Functions | Protected Member Functions | Protected Attributes | List of all members
ddt::DdtConnectionManager Class Reference

#include <ddtConnectionManager.hpp>

Inheritance diagram for ddt::DdtConnectionManager:
DdtConnectionManagerFake

Public Member Functions

 DdtConnectionManager (DdtLogger *ddt_logger, const std::string uri_string, const std::string config)
 
 DdtConnectionManager (DdtMemoryManager *const mmgr, DdtLogger *ddt_logger, const std::string uri_string, const std::string config)
 
 ~DdtConnectionManager () override
 
int32_t RegisterPublisher (const std::string &data_stream_identifier, const int32_t latency, int32_t deadline, const int32_t max_data_sample_size, const int32_t number_of_samples, const bool compute_checksum, const std::string &publishing_uri) override
 
int32_t UnregisterPublisher (const std::string &data_stream_identifier) override
 
int32_t UnregisterPublishers ()
 
void PublishData (const std::string &data_stream_identifier) override
 
int32_t RegisterSubscriber (const std::string &subscriber_uuid, const std::string &data_stream_identifier, const std::string &remote_broker_uri, const int32_t latency, const int32_t deadline) override
 
int32_t RegisterRemoteSubscriber (const std::string &remote_broker, const std::string &subscriber_uuid, const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline) override
 
int32_t UnregisterSubscriber (const std::string &data_stream_identifier, const std::string &subscriber_uuid) override
 
int32_t UnregisterSubscribers ()
 
int32_t get_max_data_sample_size (const std::string &data_stream_identifier) override
 
int32_t get_number_of_samples (const std::string &data_stream_identifier) override
 
std::string get_publishing_uri (const std::string &data_stream_identifier) override
 
int32_t get_notification_port (const std::string &data_stream_identifier) override
 
std::vector< std::string > get_statistics (const std::string &data_stream_identifier) override
 
int32_t get_heartbeat_interval () override
 
int32_t get_heartbeat_timeout () override
 
bool get_compute_checksum (const std::string &data_stream_identifier) override
 
std::string get_shm_id (const std::string &data_stream_identifier) override
 
void UpdateHeartbeat (const std::string &identifier) override
 
bool CheckPubRegistrationPermitted (const std::string &data_stream_identifier) override
 
bool CheckPublisherExists (const std::string &data_stream_identifier) override
 
bool CheckRemotePublisherExists (const std::string &remote_broker_uri, const std::string &data_stream_identifier) override
 
void UpdateStatistics (const std::string &data_stream_identifier, const int32_t datavec_size, const uint64_t source_timestamp) override
 
int32_t GetMaxPossibleBufferSize (int32_t max_data_sample_size) override
 
std::vector< std::string > GetRegisteredStreams () override
 
std::vector< std::string > GetConnectedBrokers () override
 

Protected Member Functions

void LoadDefaults ()
 
std::string GetConfigPath () const
 
void ReadIni ()
 
std::string CreateSubscriptionUri (const std::string publishing_uri, const std::string remote_broker_uri) const
 
bool CheckStreamIdInUse (const std::string &data_stream_identifier)
 
void CreateStatistics (const std::string &data_stream_identifier, const int32_t number_of_samples)
 
void ResetStatistics (const std::string &data_stream_identifier)
 

Protected Attributes

const int32_t SHM_TIMEOUT_DEFAULT = 10
 
const int32_t WAITING_TIME_DEFAULT = 1000
 
const int32_t REPLY_TIME_DEFAULT = 6
 
const int32_t HEARTBEAT_INTERVAL_DEFAULT = 1
 
const int32_t HEARTBEAT_TIMEOUT_DEFAULT = 10
 
int32_t shm_timeout
 
int32_t waiting_time
 
int32_t reply_time
 
int32_t heartbeat_interval
 
int32_t heartbeat_timeout
 
std::set< std::string > registered_publishers
 
std::map< std::string, DdtStatistics > statistics_map
 

Detailed Description

This class manages the connection handling between Data Brokers and Publisher / Subscriber applications.

Constructor & Destructor Documentation

◆ DdtConnectionManager() [1/2]

DdtConnectionManager::DdtConnectionManager ( DdtLogger *  ddt_logger,
const std::string  uri_string,
const std::string  config 
)
explicit

Constructor

◆ DdtConnectionManager() [2/2]

DdtConnectionManager::DdtConnectionManager ( DdtMemoryManager *const  mmgr,
DdtLogger *  ddt_logger,
const std::string  uri_string,
const std::string  config 
)
explicit

Constructor

◆ ~DdtConnectionManager()

DdtConnectionManager::~DdtConnectionManager ( )
override

Destructor

Member Function Documentation

◆ CheckPublisherExists()

bool DdtConnectionManager::CheckPublisherExists ( const std::string &  data_stream_identifier)
override

Checks if a publisher with the specified data stream identifier exists.

◆ CheckPubRegistrationPermitted()

bool DdtConnectionManager::CheckPubRegistrationPermitted ( const std::string &  data_stream_identifier)
override

Checks if a publisher is permitted to register. A publisher is rejected if its stream identifier is already in use, unless heartbeats for that identifier are missing; in that case the publisher is allowed to re-register.

◆ CheckRemotePublisherExists()

bool DdtConnectionManager::CheckRemotePublisherExists ( const std::string &  remote_broker_uri,
const std::string &  data_stream_identifier 
)
override

Check if remote publisher for specified stream id exists.

◆ CheckStreamIdInUse()

bool DdtConnectionManager::CheckStreamIdInUse ( const std::string &  data_stream_identifier)
protected

This function checks if the stream identifier is already in use. This will prevent a publisher using the same stream identifier as a subscriber that has subscribed to a remote publisher.

Parameters
data_stream_identifier: The data stream identifier.
Returns
True if stream identifier is in use, false if not.

◆ CreateStatistics()

void DdtConnectionManager::CreateStatistics ( const std::string &  data_stream_identifier,
const int32_t  number_of_samples 
)
protected

Inserts a DdtStatistics object into a map. This is done for each data stream.

Parameters
data_stream_identifier: The data stream identifier.
number_of_samples: The queue capacity.

Create a map for the statistics. The statistics are bound to the lifetime of the shared memory. In case a publisher is restarted within the shm timeout while subscribers are still alive, the statistics are not reset.

◆ CreateSubscriptionUri()

std::string DdtConnectionManager::CreateSubscriptionUri ( const std::string  publishing_uri,
const std::string  remote_broker_uri 
) const
protected

Creates the subscription uri.

Creates the subscription URI. To do so, it extracts the port and the path element from the publishing URI and takes the IP address from the remote broker URI. Example:

  publishing_uri: zpb.ps://222.22.222.222:5100/ds1
  remote_broker_uri: zpb.rr://111.11.111.111:5001/broker/Broker1
  resulting subscription_uri: zpb.ps://111.11.111.111:5100/ds1

◆ get_compute_checksum()

bool DdtConnectionManager::get_compute_checksum ( const std::string &  data_stream_identifier)
override

Returns compute_checksum from the memory accessor.

◆ get_heartbeat_interval()

int32_t DdtConnectionManager::get_heartbeat_interval ( )
override

Returns heartbeat_interval.

◆ get_heartbeat_timeout()

int32_t DdtConnectionManager::get_heartbeat_timeout ( )
override

Returns heartbeat_timeout.

◆ get_max_data_sample_size()

int32_t DdtConnectionManager::get_max_data_sample_size ( const std::string &  data_stream_identifier)
override

Returns max_data_sample_size.

◆ get_notification_port()

int32_t DdtConnectionManager::get_notification_port ( const std::string &  data_stream_identifier)
override

Returns notification_port.

◆ get_number_of_samples()

int32_t DdtConnectionManager::get_number_of_samples ( const std::string &  data_stream_identifier)
override

Returns number_of_samples.

◆ get_publishing_uri()

std::string DdtConnectionManager::get_publishing_uri ( const std::string &  data_stream_identifier)
override

Returns publishing_uri.

◆ get_shm_id()

std::string DdtConnectionManager::get_shm_id ( const std::string &  data_stream_identifier)
override

Returns the shared memory identifier.

◆ get_statistics()

std::vector< std::string > DdtConnectionManager::get_statistics ( const std::string &  data_stream_identifier)
override

Returns the statistics.

◆ GetConfigPath()

std::string DdtConnectionManager::GetConfigPath ( ) const
protected

This function reads the environment variable DDT_TRANSFERCONFIG_PATH and returns the path of the configuration file.

Returns
A string containing the path to the configuration file.

◆ GetConnectedBrokers()

std::vector< std::string > DdtConnectionManager::GetConnectedBrokers ( )
override

Returns the URIs of all connected remote brokers.

Returns
A vector containing the URIs of all connected remote brokers.

◆ GetMaxPossibleBufferSize()

int32_t DdtConnectionManager::GetMaxPossibleBufferSize ( int32_t  max_data_sample_size)
override

Requests the max possible buffer size (computed by the memory manager).

Parameters
max_data_sample_size: The maximum data sample size.
Returns
The max possible buffer size.

◆ GetRegisteredStreams()

std::vector< std::string > DdtConnectionManager::GetRegisteredStreams ( )
override

Returns all registered data streams.

Returns
A vector containing all data stream identifiers.

◆ LoadDefaults()

void DdtConnectionManager::LoadDefaults ( )
protected

Loads default values for configuration parameters.

◆ PublishData()

void DdtConnectionManager::PublishData ( const std::string &  data_stream_identifier)
override

This function

  1. notifies the memory reader that new data was written into shared memory by a publisher
  2. fetches the latest data packet using the memory accessor
  3. publishes the data packet over network using the DdtDataProducer object
  4. notifies all local subscribers that new data is available

If there is at least one remote subscriber, read the packet from shared memory and publish it over the network.

◆ ReadIni()

void DdtConnectionManager::ReadIni ( )
protected

Reads the databroker configuration file.

An exception is thrown if the config file does not exist. In that case the default values are used instead.

Make sure shm_timeout is at least SHM_TIMEOUT_MIN; otherwise it is set to the default value.

Make sure waiting_time is at least WAITING_TIME_MIN; otherwise it is set to the default value.

Make sure reply_time is at least REPLY_TIME_MIN; otherwise it is set to the default value.

Make sure heartbeat_interval is at least HEARTBEAT_INTERVAL_MIN; otherwise it is set to the default value.

Make sure heartbeat_timeout is at least HEARTBEAT_TIMEOUT_MIN; otherwise it is set to the default value.

Make sure heartbeat_timeout is greater than reply_time; otherwise it is set to the default value.

◆ RegisterPublisher()

int32_t DdtConnectionManager::RegisterPublisher ( const std::string &  data_stream_identifier,
const int32_t  latency,
int32_t  deadline,
const int32_t  max_data_sample_size,
const int32_t  number_of_samples,
const bool  compute_checksum,
const std::string &  publishing_uri 
)
override

This function is called from Publisher applications by the DdtDataPublishers. For each Publisher application a DdtDataProducer object is created and stored in a producer map. This also triggers the registration process in the DdtMemoryManager.

Reuse the publishing URI if the publisher was restarted.

It takes some time to establish the communication mechanism on startup of the publishers and subscribers (see MAL API description). Wait after the publisher has been instantiated. This does NOT guarantee that the first n messages will not be lost.

◆ RegisterRemoteSubscriber()

int32_t DdtConnectionManager::RegisterRemoteSubscriber ( const std::string &  remote_broker,
const std::string &  subscriber_uuid,
const std::string &  data_stream_identifier,
const int32_t  latency,
const int32_t  deadline 
)
override

This function is called from remote brokers to register a remote subscriber. This triggers the registration process in the DdtMemoryManager and creates a new DdtDataConsumer object.

◆ RegisterSubscriber()

int32_t DdtConnectionManager::RegisterSubscriber ( const std::string &  subscriber_uuid,
const std::string &  data_stream_identifier,
const std::string &  remote_broker_uri,
const int32_t  latency,
const int32_t  deadline 
)
override

Registers Subscriber applications. Distinguishes between local and remote subscribers.

If local:

  • register the subscriber at the DdtMemoryManager
  • create a DdtDataConsumer object and store it in a consumer map

If remote:

  • create a MAL client and ask remote broker for the required buffer size
  • register the subscriber at the DdtMemoryManager
  • register the subscriber at the remote broker
  • create a DdtDataConsumer object and store it in a consumer map
  • start MAL subscription on the DdtDataConsumer

Check if the remote broker URI was specified; if this is not the case, register as a local subscriber.

◆ ResetStatistics()

void DdtConnectionManager::ResetStatistics ( const std::string &  data_stream_identifier)
protected

Resets / deletes the statistics in case the shared memory is deleted.

Parameters
data_stream_identifier: The data stream identifier.

◆ UnregisterPublisher()

int32_t DdtConnectionManager::UnregisterPublisher ( const std::string &  data_stream_identifier)
override

This function stops the memory reader, unregisters the Publisher application from the DdtMemoryManager and deletes the DdtDataProducer.

◆ UnregisterPublishers()

int32_t DdtConnectionManager::UnregisterPublishers ( )

Unregisters all publishers.

Go through the list of all publishers and unregister each of them.

◆ UnregisterSubscriber()

int32_t DdtConnectionManager::UnregisterSubscriber ( const std::string &  data_stream_identifier,
const std::string &  subscriber_uuid 
)
override

Unregisters subscribers from their brokers. To unregister from remote brokers a MAL client is created. If it is the last subscriber of a data stream:

Set publishing uri to "" if there are no remote subscribers.

◆ UnregisterSubscribers()

int32_t DdtConnectionManager::UnregisterSubscribers ( )

Unregisters all subscribers.

Go through the list of all subscribers and unregister each of them.

◆ UpdateHeartbeat()

void DdtConnectionManager::UpdateHeartbeat ( const std::string &  identifier)
override

Updates the timestamp in the client map.

◆ UpdateStatistics()

void DdtConnectionManager::UpdateStatistics ( const std::string &  data_stream_identifier,
const int32_t  datavec_size,
const uint64_t  source_timestamp 
)
override

Updates the statistic counters.

Parameters
data_stream_identifier: The data stream identifier.
datavec_size: The size of the image data.
source_timestamp: The source timestamp.

Member Data Documentation

◆ heartbeat_interval

int32_t ddt::DdtConnectionManager::heartbeat_interval
protected

◆ HEARTBEAT_INTERVAL_DEFAULT

const int32_t ddt::DdtConnectionManager::HEARTBEAT_INTERVAL_DEFAULT = 1
protected

◆ heartbeat_timeout

int32_t ddt::DdtConnectionManager::heartbeat_timeout
protected

◆ HEARTBEAT_TIMEOUT_DEFAULT

const int32_t ddt::DdtConnectionManager::HEARTBEAT_TIMEOUT_DEFAULT = 10
protected

◆ registered_publishers

std::set<std::string> ddt::DdtConnectionManager::registered_publishers
protected

◆ reply_time

int32_t ddt::DdtConnectionManager::reply_time
protected

◆ REPLY_TIME_DEFAULT

const int32_t ddt::DdtConnectionManager::REPLY_TIME_DEFAULT = 6
protected

◆ shm_timeout

int32_t ddt::DdtConnectionManager::shm_timeout
protected

◆ SHM_TIMEOUT_DEFAULT

const int32_t ddt::DdtConnectionManager::SHM_TIMEOUT_DEFAULT = 10
protected

◆ statistics_map

std::map<std::string, DdtStatistics> ddt::DdtConnectionManager::statistics_map
protected

data_stream_identifier serves as key

◆ waiting_time

int32_t ddt::DdtConnectionManager::waiting_time
protected

◆ WAITING_TIME_DEFAULT

const int32_t ddt::DdtConnectionManager::WAITING_TIME_DEFAULT = 1000
protected

The documentation for this class was generated from the following files: