Telemetry Subscriber

Overview

The purpose of the Telemetry Subscriber is to receive samples for various DDS topics published by one or more Telemetry Republishers, correlate these samples, and then write the correlated samples into a shared memory queue for subsequent consumption by Data Tasks on the same node. Samples from the various DDS topics are correlated based on their sample ID, i.e. DDS samples with the same sample ID are matched together. The payloads of the matched samples are extracted and combined into a single user-defined data structure by a user-supplied function, called the blender function. Each entry in the shared memory queue therefore contains data sampled in the same loop cycle.
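To illustrate the correlation step, the following sketch groups samples by sample ID across a fixed number of topics and releases a complete group once every topic has delivered a sample with that ID. It is a hypothetical, simplified model (the Sample and SampleIdCorrelator names are invented here), not the Telemetry Subscriber's actual implementation. In particular, it never discards incomplete groups, which a real correlator would have to do.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical DDS sample: a sample ID plus an opaque payload.
struct Sample {
    std::uint64_t sample_id;
    std::vector<float> payload;
};

// Groups samples from `num_topics` topics by sample ID and releases a
// group once every topic has delivered a sample with that ID.
class SampleIdCorrelator {
public:
    explicit SampleIdCorrelator(std::size_t num_topics) : num_topics_(num_topics) {}

    // Stores a sample received on `topic_index` (0-based). Returns the
    // matched samples, ordered by topic index, when the group is complete.
    std::optional<std::vector<Sample>> Add(std::size_t topic_index, Sample sample) {
        const std::uint64_t id = sample.sample_id;
        auto& slots = pending_[id];
        if (slots.empty()) {
            slots.resize(num_topics_);
        }
        slots.at(topic_index) = std::move(sample);
        for (const auto& slot : slots) {
            if (!slot) {
                return std::nullopt;  // still waiting for another topic
            }
        }
        std::vector<Sample> matched;
        matched.reserve(num_topics_);
        for (auto& slot : slots) {
            matched.push_back(std::move(*slot));
        }
        pending_.erase(id);  // `slots` must not be used after this point
        return matched;
    }

private:
    std::size_t num_topics_;
    // Incomplete groups keyed by sample ID; never pruned in this sketch.
    std::map<std::uint64_t, std::vector<std::optional<Sample>>> pending_;
};
```

Returning the group ordered by topic index mirrors how correlated samples are handed to the blender function, i.e. in the order of the configured topic list.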

Since the topic data structure for the shared memory queue must be provided by the RTC Toolkit user, it is not possible to provide a fully working, pre-compiled and linked executable for the Telemetry Subscriber. Instead, the RTC Toolkit provides a template version of the Telemetry Subscriber in a library. A working Telemetry Subscriber must be instantiated with the user-provided topic structure, and compiled and linked against the librtctkTelemetrySubscriber.so library.

The idea is to provide an almost ready-to-use application that is an SRTC component and delivers all the boilerplate code for reading from DDS and correlating samples. The user then only needs to concentrate on defining the shared memory topic structure and a blender function that constructs the topic from correlated DDS samples.

The following sections describe how such a Telemetry Subscriber is instantiated as an actual working application, how the application can be configured, and how it is expected to behave.

Prerequisites

The Telemetry Subscriber library has the following external dependencies:

  • FastDDS - Provides the communication services (DDS implementation) to receive data from a Telemetry Republisher.

  • ipcq - Provides the shared memory queue into which correlated DDS samples are written for the downstream Data Tasks.

  • NUMA++ - This utility library is used to optionally control the NUMA node on which shared memory is allocated, as well as thread scheduling and pinning policies. This allows runtime performance optimisations.

If the RTC Toolkit has been successfully built with the Waf build system, then the above dependencies would have already been fulfilled.

It is assumed that the RTC Toolkit is installed into the INTROOT directory and that the environment variable INTROOT is defined (see section Installation). If this is not the case, any references to INTROOT in the following sections need to be adjusted appropriately.

Although a Telemetry Subscriber is able to start with default settings for FastDDS, to properly use any Telemetry Subscriber instance together with a Telemetry Republisher, one should use the dedicated FastDDS XML QoS file supplied by the RTC Toolkit. This involves pointing FastDDS to the correct XML file by exporting the FASTRTPS_DEFAULT_PROFILES_FILE environment variable with a file path to the file provided by the RTC Toolkit, as follows:

export FASTRTPS_DEFAULT_PROFILES_FILE=$INTROOT/resource/config/rtctk/RTCTK_DEFAULT_FASTDDS_QOS_PROFILES.xml

In addition, the Telemetry Subscriber configuration must select the correct profile defined in the XML file. The corresponding dds_qos_profile setting is described in the Configuration section below.
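A forgotten export silently falls back to the FastDDS defaults. As a hedged illustration only, an application could perform a start-up check such as the following. CheckQosProfilesFile is a hypothetical helper and is not part of the RTC Toolkit.

```cpp
#include <cstdlib>
#include <filesystem>
#include <string>
#include <system_error>

// Hypothetical start-up check, not part of the RTC Toolkit. Returns an empty
// string when FASTRTPS_DEFAULT_PROFILES_FILE names an existing regular file,
// otherwise a warning message suitable for logging.
std::string CheckQosProfilesFile() {
    const char* path = std::getenv("FASTRTPS_DEFAULT_PROFILES_FILE");
    if (path == nullptr || *path == '\0') {
        return "FASTRTPS_DEFAULT_PROFILES_FILE is not set: FastDDS defaults will be used";
    }
    std::error_code ec;  // non-throwing overload; errors count as "not found"
    if (!std::filesystem::is_regular_file(path, ec)) {
        return std::string("QoS profiles file not found: ") + path;
    }
    return {};
}
```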

Customisation

The only customisation currently available for a Telemetry Subscriber instance is to specify the shared memory topic type and to provide a blender function that constructs the topic from the correlated input DDS samples. Both are in fact mandatory: a working Telemetry Subscriber can only be obtained by instantiating it with a user-supplied topic and blender. It is also the user’s responsibility to ensure that the topic is the one expected by the downstream Data Tasks, and that the blender function correctly assembles the topic in shared memory from the individual DDS samples.

Shared Memory Queue Topic

A user has almost full freedom to define the topic structure, which is primarily driven by the input data format requirements of the Data Tasks. It is, however, suggested to include at least the sample ID, so that this information is propagated downstream.

A simple example for a topic structure could be as follows:

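#include <cstdint>

struct MyTopic {
    std::uint64_t sample_id;   // propagated downstream to the Data Tasks
    float vector_data[1024];   // fixed-size payload stored inline
};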

Note

The topic structure must be a contiguous, flat structure in memory. It cannot contain pointers to other memory blocks: these would not be readable in the Data Tasks and would most likely cause segmentation faults or data corruption. Any complex structure with pointers to other memory regions has to be serialised into a flat topic structure.

Ideally this should be declared in a header file that is shared with the corresponding Data Task that will process the topic’s data.
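A shared header could also enforce part of the flat-structure rule at compile time. The sketch below is an assumption, not a toolkit requirement: the static_assert checks reject members such as std::vector or std::string (which are not trivially copyable) and detect unexpected padding. They cannot detect raw pointer members, because those are still trivially copyable.

```cpp
// myTopic.hpp: hypothetical header shared by the Telemetry Subscriber
// instance and the Data Task that reads the shared memory queue.
#ifndef MY_TOPIC_HPP
#define MY_TOPIC_HPP

#include <cstdint>
#include <type_traits>

struct MyTopic {
    std::uint64_t sample_id;
    float vector_data[1024];
};

// Rejects members that own heap memory (std::vector, std::string, ...).
static_assert(std::is_trivially_copyable_v<MyTopic>, "topic must be trivially copyable");
// Plain C-compatible layout, identical in writer and reader.
static_assert(std::is_standard_layout_v<MyTopic>, "topic must have standard layout");
// No hidden padding between or after the members.
static_assert(sizeof(MyTopic) == sizeof(std::uint64_t) + 1024 * sizeof(float),
              "unexpected padding in MyTopic");

#endif  // MY_TOPIC_HPP
```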

The Blender Function

A function that constructs the user-defined topic in shared memory must be provided, with the following signature:

// Signature as a normal function:
std::error_code Blender(const rtctk::telSub::CorrelatedDataSamplesRef& dds_samples,
                        MyTopic& shm_sample) noexcept {
    // ... user code goes here ...
    return {};  // empty error code: success
}

// Signature as a lambda function:
auto blender = [](const rtctk::telSub::CorrelatedDataSamplesRef& dds_samples,
                  MyTopic& shm_sample) noexcept -> std::error_code {
    // ... user code goes here ...
    return {};  // empty error code: success
};

Two alternative declarations are shown above, the first using the typical function declaration syntax and the second showing the blender function declared as a lambda. Choosing one style over the other is at the user’s discretion.
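Both styles can be checked against the same compile-time requirement. The following sketch is hypothetical: kIsBlender and the empty CorrelatedSamples stand-in (used in place of rtctk::telSub::CorrelatedDataSamplesRef) are invented for illustration. It tests whether a callable accepts the two documented arguments, returns std::error_code and is declared noexcept.

```cpp
#include <cstdint>
#include <system_error>
#include <type_traits>

// Hypothetical stand-ins for illustration only.
struct CorrelatedSamples {};  // in place of rtctk::telSub::CorrelatedDataSamplesRef
struct MyTopic { std::uint64_t sample_id; float vector_data[1024]; };

// True if F can be called as (const CorrelatedSamples&, MyTopic&),
// returns std::error_code, and the call is declared noexcept.
template <typename F>
inline constexpr bool kIsBlender =
    std::is_nothrow_invocable_r_v<std::error_code, F, const CorrelatedSamples&, MyTopic&>;

std::error_code BlenderFn(const CorrelatedSamples&, MyTopic& shm_sample) noexcept {
    shm_sample.sample_id = 0;
    return {};
}

auto blender_lambda = [](const CorrelatedSamples&, MyTopic&) noexcept -> std::error_code {
    return {};
};

// Both declaration styles satisfy the same requirement.
static_assert(kIsBlender<decltype(&BlenderFn)>);
static_assert(kIsBlender<decltype(blender_lambda)>);
```

Dropping noexcept from either declaration makes the corresponding static_assert fail, since noexcept is part of the function type in C++17.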

The first input argument must be the list of correlated DDS samples. Refer to the API reference documentation for details about the structure of CorrelatedDataSamplesRef.

The second argument is an output argument and must be the topic structure previously declared by the user. It is a reference to the region in shared memory that the data should be written to.

The return type of the blender function must be std::error_code. When the user topic structure has been successfully constructed, it must return a zero or empty error code to indicate success, e.g. return {};. If the topic could not be constructed, an appropriate error code should be returned. The currently suggested error code is std::errc::bad_message, indicating that something is wrong with the format of the DDS samples.
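The success and failure conventions can be seen in a small validation helper of the kind a blender might call before copying. CheckPayloadSize is a hypothetical name used only for this sketch.

```cpp
#include <cstddef>
#include <system_error>

// Hypothetical helper: checks that a DDS payload of `payload_bytes`
// exactly fills a destination field of `field_bytes` in the topic.
std::error_code CheckPayloadSize(std::size_t payload_bytes, std::size_t field_bytes) noexcept {
    if (payload_bytes != field_bytes) {
        // Suggested code for malformed DDS samples.
        return std::make_error_code(std::errc::bad_message);
    }
    return {};  // empty error_code: value() == 0 and it converts to false
}
```

Inside a blender, a failed check can be propagated directly, e.g. if (auto ec = CheckPayloadSize(n, sizeof(shm_sample.vector_data)); ec) return ec;.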

Note

The blender function must not throw any exceptions; an exception would cause the processing thread to terminate and send the Telemetry Subscriber component to error. All error conditions must be indicated by returning an appropriate error code. Any code that could throw an exception must be wrapped in a try/catch block, and the exception converted to an error code instead.
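A minimal sketch of this conversion follows. It assumes a hypothetical CorrelatedSamples stand-in whose samples are plain vectors. The bounds-checked .at() calls may throw std::out_of_range, and every exception is turned into an error code before it can leave the noexcept function.

```cpp
#include <stdexcept>
#include <system_error>
#include <vector>

// Hypothetical stand-in for the correlated samples list.
struct CorrelatedSamples {
    std::vector<std::vector<float>> samples;
};

// Copies the first value of the first sample; never lets an exception escape.
std::error_code CopyFirstValue(const CorrelatedSamples& in, float& out) noexcept {
    try {
        out = in.samples.at(0).at(0);  // .at() throws std::out_of_range
        return {};
    } catch (const std::out_of_range&) {
        return std::make_error_code(std::errc::bad_message);  // malformed input
    } catch (...) {
        // Catch everything else too, so no exception can leave the function.
        return std::make_error_code(std::errc::bad_message);
    }
}
```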

The code for the body of the blender function must be provided by the user, since it will depend on the input DDS sample topics and the user provided shared memory topic structure. Typically this only involves copying the data from the DDS sample buffers to the correct location within the shared memory. In the most trivial case this can simply be a call to std::memcpy.
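A minimal memcpy-based blender might look like the following. To keep it self-contained, it uses hypothetical stand-ins (SampleView, CorrelatedSamples) instead of rtctk::telSub::CorrelatedDataSamplesRef, whose actual structure is described in the API reference, and a hypothetical two-field variant of MyTopic. It assumes that the first configured topic carries slopes and the second carries intensities.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <system_error>
#include <vector>

// Hypothetical stand-ins; the real CorrelatedDataSamplesRef layout differs.
struct SampleView {
    std::uint64_t sample_id;
    const void* data;   // payload bytes
    std::size_t size;   // payload size in bytes
};
struct CorrelatedSamples { std::vector<SampleView> samples; };

// Hypothetical topic with one field per input DDS topic.
struct MyTopic {
    std::uint64_t sample_id;
    float slopes[8];
    float intensities[4];
};

// Basic sanity checks, then one std::memcpy per input topic.
std::error_code Blender(const CorrelatedSamples& dds_samples, MyTopic& shm_sample) noexcept {
    if (dds_samples.samples.size() != 2) {
        return std::make_error_code(std::errc::bad_message);
    }
    const SampleView& slopes = dds_samples.samples[0];
    const SampleView& intensities = dds_samples.samples[1];
    if (slopes.size != sizeof(shm_sample.slopes) ||
        intensities.size != sizeof(shm_sample.intensities)) {
        return std::make_error_code(std::errc::bad_message);
    }
    shm_sample.sample_id = slopes.sample_id;  // identical for all correlated samples
    std::memcpy(shm_sample.slopes, slopes.data, slopes.size);
    std::memcpy(shm_sample.intensities, intensities.data, intensities.size);
    return {};
}
```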

Note

Since the blender function is executed in the time-critical DDS reading thread, only a minimal amount of work should be performed in it. Only basic sanity checks should be applied to the input data, together with the minimal code needed to copy the data into the shared memory topic. Any complex computation should be done in a Data Task.

Instantiation

To instantiate a Telemetry Subscriber with a specific user-defined topic, one needs to prepare a new application. We assume that the Waf/wtools build system is being used. The minimal wscript build configuration file to instantiate an application called myTelSub would then be similar to the following:

from wtools.module import declare_cprogram

declare_cprogram(
    target="myTelSub",
    use="rtctk.reusableComponents.telSub.lib"
)

As can be seen, it only needs to depend on rtctk.reusableComponents.telSub.lib. If the user-defined shared memory topic is declared in a separate module, then this additional module also has to be added to the use argument.

The entry-point declaration for this example myTelSub application, i.e. the contents of the main.cpp file, should look similar to the following:

#include <rtctk/telSub/main.hpp>
#include "myTopic.hpp"

void RtcComponentMain(rtctk::componentFramework::Args const& args) {
    auto blender = [](const rtctk::telSub::CorrelatedDataSamplesRef& dds_samples,
                      MyTopic& shm_sample) noexcept -> std::error_code {
        // ... user code for the blender goes here ...
        return {};
    };
    rtctk::telSub::Main<MyTopic>(args, std::move(blender));
}

Since any Telemetry Subscriber is an SRTC component, we must declare the entry point with RtcComponentMain, rather than with just int main(int argc, const char* argv[]). We also rely on the template function rtctk::telSub::Main to perform the setup specific to a Telemetry Subscriber and enter the processing loop.

Refer to the Customise a Telemetry Subscriber tutorial to see a complete working example of a Telemetry Subscriber instantiation.

Configuration

All Telemetry Subscriber components will accept the configuration parameters described in this section. These are all read from the Runtime Configuration Repository during initialisation when the Init command is received.

The configuration parameters are divided into groups as indicated in the sections below. If a parameter’s configuration path is indicated with bold font in the tables below it is a mandatory parameter. Otherwise it should be treated as an optional parameter.

DDS Parameters

Configuration Path (Type) - Description

  • static/dds_domain_id (RtcInt32) - DDS domain identifier to subscribe to.

  • static/dds_qos_profile (RtcString) - (optional) The name of the profile to use from the QoS configuration XML file. This should normally be set to RtcTk_Default_Profile. If an empty string is given, then the default FastDDS QoS parameters are used.

  • static/dds_interface_white_list (RtcVectorString) - (optional) List of network interfaces to be used by DDS.

  • static/dds_topics (RtcVectorString) - This is a list of strings indicating the names of the DDS topics to subscribe to.

  • static/dds_multicast_addresses (RtcVectorString) - (optional) List of multicast addresses for DDS corresponding to the DDS topic list.

Note

The order of the topic names declared in dds_topics matters. The Telemetry Subscriber presents the correlated DDS samples to the blender function in the same order as they appear in dds_topics. Specifically, dds_samples.samples[0] corresponds to data from the first DDS topic in the dds_topics list, dds_samples.samples[1] corresponds to data from the second DDS topic in the list, and so on.
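One way to keep the blender and the configuration in step is to name the sample indices after the topics. The sketch below is hypothetical (TopicIndex, kExpectedTopics and TopicOrderMatches are invented names) and uses the topic names from the YAML example that follows. A blender would then access dds_samples.samples[kSlopes] rather than a bare samples[0].

```cpp
#include <array>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical indices mirroring the dds_topics list of the YAML example.
// Keep this enum and the configuration in the same order.
enum TopicIndex : std::size_t {
    kSlopes = 0,       // dds_samples.samples[kSlopes]      <- SlopesTopic
    kIntensities = 1,  // dds_samples.samples[kIntensities] <- IntensitiesTopic
    kCommands = 2,     // dds_samples.samples[kCommands]    <- CommandsTopic
    kNumTopics = 3
};

constexpr std::array<std::string_view, kNumTopics> kExpectedTopics{
    "SlopesTopic", "IntensitiesTopic", "CommandsTopic"};

// Returns true if a configured dds_topics list has the order the blender expects.
bool TopicOrderMatches(const std::vector<std::string>& dds_topics) {
    if (dds_topics.size() != kNumTopics) {
        return false;
    }
    for (std::size_t i = 0; i < kNumTopics; ++i) {
        if (dds_topics[i] != kExpectedTopics[i]) {
            return false;
        }
    }
    return true;
}
```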

The following is an example of the above configuration parameters in a YAML file:

static:
    dds_domain_id:
        type: RtcInt32
        value: 123
    dds_qos_profile:
        type: RtcString
        value: RtcTk_Default_Profile
    dds_topics:
        type: RtcVectorString
        value:
            - SlopesTopic
            - IntensitiesTopic
            - CommandsTopic

Shared Memory Queue Parameters

Configuration Path (Type) - Description

  • static/shm_topic_name (RtcString) - The name of the shared memory queue, i.e. this corresponds to the file name /dev/shm/ipcq-<topic-name>.

  • static/shm_capacity (RtcInt64) - Maximum number of samples that can be stored in the shared memory queue.

  • static/shm_memory_policy_mode (RtcString) - This is the memory allocation policy to apply to the shared memory. It can be one of the values indicated in Table 2.

  • static/shm_memory_policy_nodes (RtcString) - Indicates a mask of NUMA nodes to which the memory policy is applied. See the numa_parse_nodestring function in the numa(3) manpage for details about the correct format of this string.
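To show how a node string such as "1,2" or "1-4" maps onto a node mask, here is a deliberately simplified, hypothetical parser. It is not numa_parse_nodestring, which accepts more syntax (see numa(3)), and it is not what the Telemetry Subscriber uses internally.

```cpp
#include <bitset>
#include <cstddef>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <string>

// Simplified, hypothetical parser for comma-separated node numbers and ranges.
std::optional<std::bitset<64>> ParseNodeList(const std::string& text) {
    std::bitset<64> mask;
    std::istringstream stream(text);
    std::string item;
    while (std::getline(stream, item, ',')) {
        const std::size_t dash = item.find('-');
        try {
            const unsigned long first = std::stoul(item.substr(0, dash));
            const unsigned long last =
                (dash == std::string::npos) ? first : std::stoul(item.substr(dash + 1));
            if (first > last || last >= mask.size()) {
                return std::nullopt;  // reversed or out-of-range node numbers
            }
            for (unsigned long node = first; node <= last; ++node) {
                mask.set(node);
            }
        } catch (const std::logic_error&) {  // std::stoul: invalid_argument/out_of_range
            return std::nullopt;
        }
    }
    if (mask.none()) {
        return std::nullopt;  // empty string selects no node
    }
    return mask;
}
```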

The following is an example of the above configuration parameters in a YAML file:

static:
    shm_topic_name:
        type: RtcString
        value: mytopic
    shm_capacity:
        type: RtcInt64
        value: 1024
    shm_memory_policy_mode:
        type: RtcString
        value: Bind
    shm_memory_policy_nodes:
        type: RtcString
        value: "1,2"
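For the values in the YAML example above, the queue file would be /dev/shm/ipcq-mytopic and the payload alone would occupy 1024 × sizeof(MyTopic) bytes. The helpers below are a hypothetical sketch based on the MyTopic example. MinQueueBytes is only a lower bound, because it ignores any bookkeeping or alignment that ipcq itself may add.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

struct MyTopic {
    std::uint64_t sample_id;
    float vector_data[1024];
};

// Path of the shared memory file for a given shm_topic_name.
std::string ShmQueuePath(const std::string& shm_topic_name) {
    return "/dev/shm/ipcq-" + shm_topic_name;
}

// Lower bound on the queue size: payload only, for shm_capacity samples.
constexpr std::size_t MinQueueBytes(std::size_t shm_capacity) {
    return shm_capacity * sizeof(MyTopic);
}
```

With sizeof(MyTopic) being 4104 bytes, shm_capacity = 1024 gives 4,202,496 bytes of payload.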

Operational Logic Parameters

Configuration Path (Type) - Description

  • static/close_detach_delay (RtcInt32) - The delay in milliseconds between receiving the signal to stop writing to the shared memory queue and closing, i.e. destroying, the shared memory writer. This gives the shared memory queue readers an opportunity to detach gracefully.

  • static/correlator_poll_timeout (RtcInt32) - The time in milliseconds to wait for receiving and correlating DDS input samples before considering the operation to have timed out. If a timeout occurs, the error is reported, but the Telemetry Subscriber component does not enter the error state.

  • static/monitor_report_interval (RtcInt32) - Interval in milliseconds that determines how often the monitoring thread wakes up to report errors from the processing thread.

  • static/processing_thread_policies (NUMA Policies) - Defines optional NUMA policies for the processing thread.

  • static/monitoring_thread_policies (NUMA Policies) - Defines optional NUMA policies for the monitoring thread.
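The poll timeout can be pictured as a bounded wait that yields an error code instead of failing the component. The following is a hypothetical sketch (SampleInbox is an invented name) built on std::condition_variable::wait_for. On timeout, it returns std::errc::timed_out, which corresponds to the error code 110 shown in the Errors section on Linux.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <system_error>

// Hypothetical inbox of sample IDs fed by a DDS listener thread.
class SampleInbox {
public:
    void Push(std::uint64_t sample_id) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ids_.push_back(sample_id);
        }
        cv_.notify_one();
    }

    // Waits at most `timeout` (cf. correlator_poll_timeout) for a sample.
    // A timeout is only reported; the caller decides how to count it.
    std::error_code Poll(std::chrono::milliseconds timeout, std::uint64_t& sample_id) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!cv_.wait_for(lock, timeout, [this] { return !ids_.empty(); })) {
            return std::make_error_code(std::errc::timed_out);  // ETIMEDOUT
        }
        sample_id = ids_.front();
        ids_.pop_front();
        return {};
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<std::uint64_t> ids_;
};
```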

The following is an example of the above configuration parameters in a YAML file:

static:
    close_detach_delay:
        type: RtcInt32
        value: 1000
    correlator_poll_timeout:
        type: RtcInt32
        value: 200
    monitor_report_interval:
        type: RtcInt32
        value: 1000
    processing_thread_policies:
        cpu_affinity:
            type: RtcString
            value: "1-4"
        scheduler_policy:
            type: RtcString
            value: Fifo
        scheduler_priority:
            type: RtcInt32
            value: 10
        memory_policy_mode:
            type: RtcString
            value: Bind
        memory_policy_nodes:
            type: RtcString
            value: "1-4"
    monitoring_thread_policies:
        # Monitoring thread is executed with lower priority
        scheduler_policy:
            type: RtcString
            value: Other
        scheduler_priority:
            type: RtcInt32
            value: -1
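The split between the processing thread and the monitoring thread can be sketched as follows. This is an assumption about the general pattern, not the component's code: the processing thread records errors with lock-free atomics, and the monitoring thread, woken every monitor_report_interval milliseconds, builds a report in the format of the log lines shown in the Errors section. ErrorStats is hypothetical, and treating every code as a POSIX errno value (generic category) is a simplification.

```cpp
#include <atomic>
#include <cstdint>
#include <string>
#include <system_error>

// Hypothetical error statistics shared by the processing and monitoring threads.
class ErrorStats {
public:
    // Processing thread: record a failure without blocking.
    void Record(std::error_code ec) noexcept {
        last_error_.store(ec.value(), std::memory_order_relaxed);
        count_.fetch_add(1, std::memory_order_relaxed);
    }

    // E.g. on transition to Run: clear the counter and the last error.
    void Clear() noexcept {
        last_error_.store(0, std::memory_order_relaxed);
        count_.store(0, std::memory_order_relaxed);
    }

    // Monitoring thread: report text, or an empty string if there were no errors.
    // Relaxed loads may briefly pair a count with a slightly older last error.
    std::string Report() const {
        const std::uint64_t count = count_.load(std::memory_order_relaxed);
        if (count == 0) {
            return {};
        }
        const std::error_code last(last_error_.load(std::memory_order_relaxed),
                                   std::generic_category());
        return "Detected errors in operational logic. [Last error code = " +
               std::to_string(last.value()) + ": " + last.message() +
               ". Total number of errors = " + std::to_string(count) + "]";
    }

private:
    std::atomic<int> last_error_{0};
    std::atomic<std::uint64_t> count_{0};
};
```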

Commands

All Telemetry Subscriber instances can be steered by sending commands to them via their MAL interface. One can either rely on the RTC Supervisor to send the appropriate commands or use the rtctkClient command line application to send individual commands manually.

The set of commands that is currently available for a Telemetry Subscriber is indicated in the following table, together with the expected behaviour:

Command - Behaviour

  • Init - Constructs the necessary DDS objects to read DDS input samples, constructs the shared memory writer, and spawns the low-level processing and monitoring threads.

  • Stop - Stops the initialisation procedure started by the Init command.

  • Reset - Resets the component to the state it was in before the Init command was received. This means that all DDS objects are destroyed, the shared memory is released, and the processing and monitoring threads are destroyed.

  • Enable - Enables reading of the DDS samples. Any read errors and timeouts are ignored.

  • Disable - Disables reading of the DDS samples. Any read errors and timeouts are ignored.

  • Run - Requests the transition to On.Operational.Run, which:

      • Turns on writing to the shared memory queue.

      • Clears the error counters and the last error on entering Run.

    Errors are counted and reported by the monitoring thread, but attempts to correlate continue.

    Note: If correlation fails because of bad data samples (wrong sample ID), it must be recovered by transitioning back to Idle and then to Run again, which restarts correlation from new data samples.

  • Idle - Requests the transition to On.Operational.Idle, which:

      • Turns off writing to the shared memory queue.

      • Ignores errors; the monitoring thread stops logging them.

      • Automatically restarts correlation with new data samples if bad data samples are encountered.
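The Run and Idle behaviour can be condensed into a small decision function. This hypothetical sketch (Mode, Action and Decide are invented names) encodes the documented rules. In Run, the component writes on success and counts every error, but does not recover from a protocol error automatically. In Idle, it never writes, ignores errors, and restarts correlation when bad samples produce a protocol error.

```cpp
#include <system_error>

enum class Mode { Idle, Run };

// What to do after one correlation attempt.
struct Action {
    bool write_to_shm;         // publish the blended sample
    bool count_error;          // report via the monitoring thread
    bool restart_correlation;  // resynchronise on fresh samples
};

Action Decide(Mode mode, std::error_code ec) noexcept {
    const bool protocol_error = (ec == std::errc::protocol_error);
    if (mode == Mode::Run) {
        // Run: no automatic recovery; an Idle -> Run transition is required.
        return {!ec, static_cast<bool>(ec), false};
    }
    // Idle: never write, ignore errors, restart correlation on bad samples.
    return {false, false, protocol_error};
}
```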

Errors

Not Operational

The main errors that can currently occur during component initialisation when it receives the Init command are due to missing mandatory configuration parameters. The following is an example of the log message for such an error when dds_topics is missing:

[13:52:21:730][ERROR][tel_sub_1] Failed to load configuration from the Runtime Repository: Path '/tel_sub_1/static/dds_topics' does not exist.
[13:52:21:730][ERROR][tel_sub_1] Activity.Initialising: failed, exception: Path '/tel_sub_1/static/dds_topics' does not exist.

For cases where the wrong data type was used in a type field in the YAML file, the following example error message would be seen in the logs:

[00:02:49:597][ERROR][tel_sub_1] Activity.Initialising: failed, exception: Wrong type used to read data point.

If the YAML file format is not correct, e.g. a value field has the wrong format, the following example error message would be seen in the logs:

[00:03:54:061][ERROR][tel_sub_1] Activity.Initialising: failed, exception: File '/home/eltdev/test_install/run/exampleTelSub/runtime_repo/tel_sub_1.yaml' has an invalid data format.

Operational

While the Telemetry Subscriber is running, the main sources of errors are:

  • Problems with the DDS subscription.

  • Problems with the received data samples.

Timeout errors may indicate that the DDS publisher is no longer working, or that the timeout threshold is too low for the sample rate and needs to be adjusted (see Operational Logic Parameters). In such cases, error messages such as the following will be seen in the logs:

[13:49:59:480][ERROR][tel_sub_1] Detected errors in operational logic. [Last error code = 110: Connection timed out. Total number of errors = 1]

If data is received but cannot be correlated because the sample ID is wrong, this is reported as a protocol error. If the component is in the Idle state, such errors cause correlation to be restarted automatically. If the component is in the Run state, correlation must be recovered by transitioning from Run to Idle and then to Run again. Protocol errors show up in the logs as follows:

[11:37:27:388][ERROR][tel_sub_1] Detected errors in operational logic. [Last error code = 71: Protocol error. Total number of errors = 43]

For any other failures during reading of the DDS samples or when writing to the shared memory queue, messages about “Detected errors in operational logic” will be logged with the error code received from the sub-system, similar to the following:

[13:48:44:448][ERROR][tel_sub_1] Detected errors in operational logic. [Last error code = 74: Bad message. Total number of errors = 1]
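The numeric codes in these log lines correspond to the Linux errno values ETIMEDOUT (110), EPROTO (71) and EBADMSG (74), which std::errc names portably. The hypothetical Hint helper below maps a logged code to the corrective action suggested in this section. It assumes that the code is a POSIX errno value.

```cpp
#include <cerrno>
#include <string>
#include <system_error>

// Portable names for the codes seen in the log lines (Linux values in comments).
constexpr std::errc kTimeout = std::errc::timed_out;             // ETIMEDOUT, 110
constexpr std::errc kBadCorrelation = std::errc::protocol_error; // EPROTO, 71
constexpr std::errc kBadSample = std::errc::bad_message;         // EBADMSG, 74

// Hypothetical helper: suggests an action for a logged error code.
std::string Hint(int logged_code) {
    const std::error_code ec(logged_code, std::generic_category());
    if (ec == kTimeout) {
        return "check the publisher or increase correlator_poll_timeout";
    }
    if (ec == kBadCorrelation) {
        return "in Run: transition to Idle and back to Run";
    }
    if (ec == kBadSample) {
        return "check the DDS sample format and the blender";
    }
    return "see the sub-system error message";
}
```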

Limitations and Known Issues

Very limited monitoring is currently available for the Telemetry Subscriber. It does not publish any runtime statistics to the OLDB.