ddt
0.1
|
#include <ddtConnectionManager.hpp>
Public Member Functions | |
DdtConnectionManager (DdtLogger *ddt_logger, const std::string uri_string, const std::string config) | |
DdtConnectionManager (DdtMemoryManager *const mmgr, DdtLogger *ddt_logger, const std::string uri_string, const std::string config) | |
~DdtConnectionManager () override | |
int32_t | RegisterPublisher (const std::string &data_stream_identifier, const int32_t latency, int32_t deadline, const int32_t max_data_sample_size, const int32_t number_of_samples, const bool compute_checksum, const std::string &publishing_uri) override |
int32_t | UnregisterPublisher (const std::string &data_stream_identifier) override |
int32_t | UnregisterPublishers () |
void | PublishData (const std::string &data_stream_identifier) override |
int32_t | RegisterSubscriber (const std::string &subscriber_uuid, const std::string &data_stream_identifier, const std::string &remote_broker_uri, const int32_t latency, const int32_t deadline) override |
int32_t | RegisterRemoteSubscriber (const std::string &remote_broker, const std::string &subscriber_uuid, const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline) override |
int32_t | UnregisterSubscriber (const std::string &data_stream_identifier, const std::string &subscriber_uuid) override |
int32_t | UnregisterSubscribers () |
int32_t | get_max_data_sample_size (const std::string &data_stream_identifier) override |
int32_t | get_number_of_samples (const std::string &data_stream_identifier) override |
std::string | get_publishing_uri (const std::string &data_stream_identifier) override |
int32_t | get_notification_port (const std::string &data_stream_identifier) override |
std::vector< std::string > | get_statistics (const std::string &data_stream_identifier) override |
int32_t | get_heartbeat_interval () override |
int32_t | get_heartbeat_timeout () override |
bool | get_compute_checksum (const std::string &data_stream_identifier) override |
std::string | get_shm_id (const std::string &data_stream_identifier) override |
void | UpdateHeartbeat (const std::string &identifier) override |
bool | CheckPubRegistrationPermitted (const std::string &data_stream_identifier) override |
bool | CheckPublisherExists (const std::string &data_stream_identifier) override |
bool | CheckRemotePublisherExists (const std::string &remote_broker_uri, const std::string &data_stream_identifier) override |
void | UpdateStatistics (const std::string &data_stream_identifier, const int32_t datavec_size, const uint64_t source_timestamp) override |
int32_t | GetMaxPossibleBufferSize (int32_t max_data_sample_size) override |
std::vector< std::string > | GetRegisteredStreams () override |
std::vector< std::string > | GetConnectedBrokers () override |
Protected Member Functions | |
void | LoadDefaults () |
std::string | GetConfigPath () const |
void | ReadIni () |
std::string | CreateSubscriptionUri (const std::string publishing_uri, const std::string remote_broker_uri) const |
bool | CheckStreamIdInUse (const std::string &data_stream_identifier) |
void | CreateStatistics (const std::string &data_stream_identifier, const int32_t number_of_samples) |
void | ResetStatistics (const std::string &data_stream_identifier) |
Protected Attributes | |
const int32_t | SHM_TIMEOUT_DEFAULT = 10 |
const int32_t | WAITING_TIME_DEFAULT = 1000 |
const int32_t | REPLY_TIME_DEFAULT = 6 |
const int32_t | HEARTBEAT_INTERVAL_DEFAULT = 1 |
const int32_t | HEARTBEAT_TIMEOUT_DEFAULT = 10 |
int32_t | shm_timeout |
int32_t | waiting_time |
int32_t | reply_time |
int32_t | heartbeat_interval |
int32_t | heartbeat_timeout |
std::set< std::string > | registered_publishers |
std::map< std::string, DdtStatistics > | statistics_map |
This class manages the connection handling between Data Brokers and Publisher / Subscriber applications.
|
explicit |
Constructor
|
explicit |
Constructor
|
override |
Destructor
|
override |
Checks if a publisher with the specified data stream identifier exists.
|
override |
Checks if a publisher is permitted for registration. A publisher is rejected if its stream identifier is already in use, unless its heartbeats are missing; in that case the publisher is allowed to re-register.
|
override |
Checks if a remote publisher for the specified stream identifier exists.
|
protected |
This function checks if the stream identifier is already in use. This prevents a publisher from using the same stream identifier as a subscriber that has subscribed to a remote publisher.
data_stream_identifier | The data stream identifier. |
|
protected |
Inserts a DdtStatistics object into a map. This is done for each data stream.
data_stream_identifier | The data stream identifier. |
number_of_samples | The queue capacity. |
Create a map for the statistics. The statistics are bound to the lifetime of the shared memory. In case a publisher is restarted within the shm timeout while subscribers are still alive, the statistics are not reset.
|
protected |
Creates the subscription uri.
Creates the subscription URI. To do so, it extracts the port and the path element from the publishing URI and takes the IP address from the remote broker URI. Example: publishing_uri: zpb.ps://222.22.222.222:5100/ds1; remote_broker_uri: zpb.rr://111.11.111.111:5001/broker/Broker1; resulting subscription_uri: zpb.ps://111.11.111.111:5100/ds1
|
override |
Returns compute_checksum from the memory accessor.
|
override |
Returns heartbeat_interval.
|
override |
Returns heartbeat_timeout.
|
override |
Returns max_data_sample_size.
|
override |
Returns notification_port.
|
override |
Returns number_of_samples.
|
override |
Returns publishing_uri.
|
override |
Returns the shared memory identifier.
|
override |
Returns the statistics.
|
protected |
This function reads the environment variable DDT_TRANSFERCONFIG_PATH and returns the path of the configuration file.
|
override |
Returns the URIs of all connected remote brokers.
|
override |
Requests the max possible buffer size (computed by the memory manager).
max_data_sample_size | The maximum data sample size. |
|
override |
Returns all registered data streams.
|
protected |
Loads default values for configuration parameters.
|
override |
If there is at least one remote subscriber, this function reads the packet from shared memory and publishes it over the network.
|
protected |
Reads the databroker configuration file.
An exception is thrown if the config file does not exist. In that case the default values are used instead.
Make sure shm_timeout is at least SHM_TIMEOUT_MIN; set it to the default value otherwise.
Make sure waiting_time is at least WAITING_TIME_MIN; set it to the default value otherwise.
Make sure reply_time is at least REPLY_TIME_MIN; set it to the default value otherwise.
Make sure heartbeat_interval is at least HEARTBEAT_INTERVAL_MIN; set it to the default value otherwise.
Make sure heartbeat_timeout is at least HEARTBEAT_TIMEOUT_MIN; set it to the default value otherwise.
Make sure heartbeat_timeout is greater than reply_time; set it to the default value otherwise.
|
override |
This function is called from Publisher applications by the DdtDataPublishers. For each Publisher application a DdtDataProducer object is created and stored in a producer map. This also triggers the registration process in the DdtMemoryManager.
Reuse the publishing URI if the publisher was restarted.
It takes some time to establish the communication mechanism on startup of the publishers and subscribers (see MAL API description). Wait after the publisher has been instantiated. This does NOT guarantee that the first n messages will not be lost.
|
override |
This function is called from remote brokers to register a remote subscriber. This triggers the registration process in the DdtMemoryManager and creates a new DdtDataConsumer object.
|
override |
Registers Subscriber applications. Distinguishes between local and remote subscribers. if local:
Check if the remote broker URI was specified; register as a local subscriber if this is not the case.
|
protected |
Resets / deletes the statistics in case the shared memory is deleted.
data_stream_identifier | The data stream identifier. |
|
override |
This function stops the memory reader, unregisters the Publisher application from the DdtMemoryManager and deletes the DdtDataProducer.
int32_t DdtConnectionManager::UnregisterPublishers | ( | ) |
Unregisters all publishers.
Go through the list of all publishers and unregister each of them.
|
override |
Unregisters subscribers from their brokers. To unregister from remote brokers a MAL client is created. If it is the last subscriber of a data stream:
Set publishing uri to "" if there are no remote subscribers.
int32_t DdtConnectionManager::UnregisterSubscribers | ( | ) |
Unregisters all subscribers.
Go through the list of all subscribers and unregister each of them.
|
override |
Updates the timestamp in the client map.
|
override |
Updates the statistics counters.
data_stream_identifier | The data stream identifier. |
datavec_size | The size of the image data. |
source_timestamp | The source timestamp. |
|
protected |
|
protected |
|
protected |
|
protected |
|
protected |
|
protected |
|
protected |
|
protected |
|
protected |
|
protected |
data_stream_identifier serves as key
|
protected |
|
protected |