Telemetry Subscriber
Overview
The purpose of the Telemetry Subscriber is to receive samples published by one or more Telemetry Republishers for various DDS topics, correlate these samples, and then write the correlated samples into a shared memory queue for subsequent consumption by Data Tasks on the same node. Samples from the various DDS topics are correlated based on their sample ID, i.e. DDS samples with the same sample ID are matched together. The payloads of the matched samples are extracted and combined into a single user-defined data structure using a user-supplied function, called the blender function. Each entry in the shared memory queue therefore contains data sampled in the same loop cycle.
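The correlation step described above can be pictured with the following minimal C++ sketch. It is not the Toolkit's implementation: the `Sample` struct, the `Correlator` class and its `Push` method are hypothetical names, used only to illustrate how samples from several topics that carry the same sample ID are matched into one topic-ordered set.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a received DDS sample (not a Toolkit type).
struct Sample {
    std::uint64_t sample_id;            // identifies the loop cycle
    std::vector<std::uint8_t> payload;  // raw topic data
};

// Collects samples per sample ID until one has arrived from every topic.
class Correlator {
public:
    explicit Correlator(std::size_t num_topics) : m_num_topics(num_topics) {}

    // Registers a sample received on the topic at topic_index. Returns the
    // complete, topic-ordered set once all topics have delivered this ID.
    std::optional<std::vector<Sample>> Push(std::size_t topic_index, Sample sample) {
        if (topic_index >= m_num_topics) {
            return std::nullopt;  // unknown topic, ignored in this sketch
        }
        const std::uint64_t id = sample.sample_id;
        auto& slots = m_pending[id];
        slots.resize(m_num_topics);
        slots[topic_index] = std::move(sample);
        for (const auto& slot : slots) {
            if (!slot.has_value()) {
                return std::nullopt;  // still waiting for other topics
            }
        }
        std::vector<Sample> matched;
        matched.reserve(m_num_topics);
        for (auto& slot : slots) {
            matched.push_back(std::move(*slot));
        }
        m_pending.erase(id);
        return matched;
    }

private:
    std::size_t m_num_topics;
    std::map<std::uint64_t, std::vector<std::optional<Sample>>> m_pending;
};
```

A real correlator would also need to discard stale, incomplete sets and apply a timeout; both are left out here to keep the matching logic visible.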
Since the topic data structure for the shared memory queue must be provided by the RTC Toolkit user,
it is not possible to have a fully working pre-compiled and linked executable for the Telemetry
Subscriber.
Instead, the RTC Toolkit provides a template version of the Telemetry Subscriber in a library.
A working Telemetry Subscriber must be instantiated using the user-provided topic structure, and
compiled and linked against the librtctkTelemetrySubscriber.so library.
The idea is to provide an almost ready application that is an SRTC component and delivers all the boilerplate code for reading from DDS and correlating samples. The user then only needs to concentrate on defining the shared memory topic structure and a blender function that constructs the topic from correlated DDS samples.
The following sections describe how such a Telemetry Subscriber needs to be instantiated into an actual working application, how the application can be configured, and how it is expected to behave.
Prerequisites
The Telemetry Subscriber library has the following external dependencies:
FastDDS - Provides the communication services (DDS implementation) to receive data from a Telemetry Republisher.
ipcq - Provides the shared memory queue into which correlated DDS samples are written for the downstream Data Tasks.
NUMA++ - This utility library is used to optionally control the NUMA node where shared memory is allocated, thread scheduling and pinning policies. This allows runtime performance optimisations.
If the RTC Toolkit has been successfully built with the Waf build system, then the above dependencies would have already been fulfilled.
It is assumed that the RTC Toolkit is installed into the INTROOT directory
and that the environment variable INTROOT is defined (see section Installation).
If this is not the case, any references to INTROOT in the following sections need to be
adjusted appropriately.
Although a Telemetry Subscriber is able to start with default settings for FastDDS,
to properly use any Telemetry Subscriber instance together with a Telemetry Republisher,
one should use the dedicated FastDDS XML QoS file supplied by the RTC Toolkit.
This involves pointing FastDDS to the correct XML file by exporting the FASTRTPS_DEFAULT_PROFILES_FILE
environment variable with a file path to the file provided by the RTC Toolkit,
as follows:
export FASTRTPS_DEFAULT_PROFILES_FILE=$INTROOT/resource/config/rtctk/RTCTK_DEFAULT_FASTDDS_QOS_PROFILES.xml
In addition, the Telemetry Subscriber configuration must be prepared so that it uses the correct
profile that is defined in the XML file.
The configuration settings for dds_qos_profile are indicated in the Configuration section below.
Customisation
The only customisation currently available for a Telemetry Subscriber instance is to specify the shared memory topic type and to provide a blender function that constructs the topic from correlated input DDS samples. Both are in fact mandatory: a working Telemetry Subscriber only exists once it has been instantiated with the user-supplied topic and blender. It is also the user’s responsibility to ensure that the topic is the one expected by downstream Data Tasks, and that the blender function correctly assembles the topic in shared memory from the individual DDS samples.
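As an illustration of what a user-supplied topic type might look like, the following sketch declares a small fixed-size structure. The field names and array sizes are hypothetical and chosen only for this example. The compile-time checks reflect the general assumption that data placed in shared memory should be plain, fixed-size data without pointers or heap-owning members; the Toolkit's exact requirements are not stated here.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <type_traits>

// Hypothetical sizes, chosen only for illustration.
constexpr std::size_t kNumSlopes = 4;
constexpr std::size_t kNumIntensities = 2;

// Hypothetical shared memory topic: one entry per loop cycle.
struct MyTopic {
    std::uint64_t sample_id;
    std::array<float, kNumSlopes> slopes;
    std::array<float, kNumIntensities> intensities;
};

// Assumption: a structure copied into shared memory should be trivially
// copyable and have a standard layout, so that a byte-wise copy is valid.
static_assert(std::is_trivially_copyable_v<MyTopic>);
static_assert(std::is_standard_layout_v<MyTopic>);
```

Keeping the structure free of dynamic containers also means that its size is known at compile time, which makes size checks in the blender function straightforward.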
The Blender Function
A function used to construct the user defined topic in shared memory must be provided with the following signature:
// Signature as a normal function:
std::error_code Blender(const rtctk::telSub::CorrelatedDataSamplesRef& dds_samples,
                        MyTopic& shm_sample) noexcept {
    // ... user code goes here ...
    return {};  // empty error code indicates success
}

// Signature as a lambda function:
auto blender = [](const rtctk::telSub::CorrelatedDataSamplesRef& dds_samples,
                  MyTopic& shm_sample) noexcept -> std::error_code {
    // ... user code goes here ...
    return {};  // empty error code indicates success
};
Two alternative declarations are shown above, the first using the typical function declaration syntax and the second showing the blender function declared as a lambda. Choosing one style over the other is at the user’s discretion.
The first argument, the input, must be the list of correlated DDS samples.
Refer to the API reference documentation for details about the structure of CorrelatedDataSamplesRef.
The second argument, the output, must be the topic structure previously declared by the user. This is a reference to the region in shared memory that the data should be written to.
The return type of the blender function must be an error code.
When the user topic structure has been successfully constructed, it must return a zero or empty error
code to indicate success, e.g. return {};
If the topic could not be constructed, an appropriate error code should be returned.
The suggested error code to use at the moment is std::errc::bad_message,
to indicate there is something wrong with the format of the DDS samples.
Note
The blender function must not throw any exceptions, because a thrown exception will cause the processing thread to terminate and send the Telemetry Subscriber component to the error state. All error conditions must be indicated by returning an appropriate error code. If any code is executed that could throw an exception, the exception must be caught in a try...catch block and converted to an error code instead.
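The conversion from exceptions to error codes can be sketched as follows. `ParseCount` and `SafeParse` are hypothetical helpers, not Toolkit functions; they only show the pattern of wrapping code that may throw so that nothing but an error code leaves a noexcept function. The choice of error codes in the catch clauses is likewise an assumption made for the example.

```cpp
#include <new>
#include <string>
#include <system_error>

// Hypothetical helper that may throw: std::stoi throws std::invalid_argument
// or std::out_of_range on malformed input.
inline int ParseCount(const std::string& text) {
    return std::stoi(text);
}

// Wraps the throwing call so that only an error code leaves the function.
inline std::error_code SafeParse(const std::string& text, int& out) noexcept {
    try {
        out = ParseCount(text);
        return {};  // success
    } catch (const std::bad_alloc&) {
        return std::make_error_code(std::errc::not_enough_memory);
    } catch (...) {
        // Any other failure is reported as a malformed input.
        return std::make_error_code(std::errc::bad_message);
    }
}
```

The same try and catch structure can enclose the body of a blender function when it calls code that is not itself noexcept.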
The code for the body of the blender function must be provided by the user,
since it will depend on the input DDS sample topics and the user-provided shared memory topic
structure.
Typically this only involves copying the data from the DDS sample buffers to the correct location
within the shared memory.
In the most trivial case this can simply be a call to std::memcpy.
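The trivial case can be sketched as below. Because the real CorrelatedDataSamplesRef is only described in the API reference, this example uses hypothetical stand-in types (`FakeSample`, `FakeCorrelatedSamples`, `SlopesOnlyTopic`) whose layout is an assumption; only the overall shape, a basic size check followed by std::memcpy and an error code return, is meant to carry over.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <system_error>
#include <vector>

// Hypothetical stand-ins; the real CorrelatedDataSamplesRef may expose its
// buffers differently (see the API reference).
struct FakeSample {
    const std::uint8_t* data;
    std::size_t size;
};
struct FakeCorrelatedSamples {
    std::vector<FakeSample> samples;
};

// Hypothetical shared memory topic.
struct SlopesOnlyTopic {
    float slopes[4];
};

std::error_code TrivialBlender(const FakeCorrelatedSamples& dds_samples,
                               SlopesOnlyTopic& shm_sample) noexcept {
    // Basic sanity check: exactly one input sample of the expected size.
    if (dds_samples.samples.size() != 1 ||
        dds_samples.samples[0].data == nullptr ||
        dds_samples.samples[0].size != sizeof(shm_sample.slopes)) {
        return std::make_error_code(std::errc::bad_message);
    }
    // Copy the payload straight into the shared memory topic.
    std::memcpy(shm_sample.slopes, dds_samples.samples[0].data,
                sizeof(shm_sample.slopes));
    return {};  // success
}
```

Checking the size before copying keeps a malformed sample from overrunning the topic, and costs very little in the time-critical thread.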
Note
Since the blender function is executed in the DDS reading thread, which is time critical, it should perform a minimal amount of work. Only basic sanity checks should be applied to the input data, and only the minimal code needed to copy the data into the shared memory topic should be provided. Any complex computation should be done in a Data Task.
Instantiation
To actually instantiate a Telemetry Subscriber with a specific user-defined topic, one needs to
prepare a new application.
We assume that the Waf/wtools build system is being used.
Therefore, the minimal wscript build configuration file to instantiate an application called
myTelSub would be similar to the following:
from wtools.module import declare_cprogram

declare_cprogram(
    target="myTelSub",
    use="rtctk.reusableComponents.telSub.lib"
)
As can be seen, it only needs to depend on rtctk.reusableComponents.telSub.lib.
If the user-defined shared memory topic is defined in a separate module,
then this additional module would also have to be added to the use argument.
The entry-point declaration for this example myTelSub application,
i.e. the contents of the main.cpp file,
should look similar to the following:
#include <system_error>
#include <utility>

#include <rtctk/telSub/main.hpp>

#include "myTopic.hpp"

void RtcComponentMain(rtctk::componentFramework::Args const& args) {
    auto blender = [](const rtctk::telSub::CorrelatedDataSamplesRef& dds_samples,
                      MyTopic& shm_sample) noexcept -> std::error_code {
        // ... user code for the blender goes here ...
        return {};
    };
    rtctk::telSub::Main<MyTopic>(args, std::move(blender));
}
Since any Telemetry Subscriber is an SRTC component, we must declare the entry point with
RtcComponentMain, rather than with just int main(int argc, const char* argv[]).
We also rely on the template function rtctk::telSub::Main to perform the setup specific to a
Telemetry Subscriber and enter the processing loop.
Refer to the Customise a Telemetry Subscriber tutorial to see a complete working example of a Telemetry Subscriber instantiation.
Configuration
All Telemetry Subscriber components accept the configuration parameters described in this
section.
These are all read from the Runtime Configuration Repository during initialisation when the
Init command is received.
The configuration parameters are divided into groups as indicated in the sections below. If a parameter’s configuration path is indicated with bold font in the tables below it is a mandatory parameter. Otherwise it should be treated as an optional parameter.
DDS Parameters
Configuration Path | Type | Description |
---|---|---|
static/dds_domain_id | RtcInt32 | DDS domain identifier to subscribe to. |
static/dds_qos_profile | RtcString | (optional) The name of the profile to use from the QoS configuration XML file. This should normally be set to |
static/dds_interface_white_list | RtcVectorString | (optional) List of network interfaces to be used by DDS. |
static/dds_topics | RtcVectorString | List of strings indicating the names of the DDS topics to subscribe to. |
static/dds_multicast_addresses | RtcVectorString | (optional) List of multicast addresses for DDS corresponding to the DDS topic list. |
Note
The order of the topic names declared in dds_topics matters.
The Telemetry Subscriber will present the correlated DDS samples to the blender function in the
same order as is encoded in dds_topics.
Specifically, the dds_samples argument to the blender function will be filled such that
dds_samples.samples[0] will correspond to data from the first DDS topic in the dds_topics list,
dds_samples.samples[1] will correspond to data from the second DDS topic in the dds_topics list,
and so on.
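One way to keep a blender function in step with the configured topic order is to name the indices once, as in the sketch below. The stand-in types `StubSample` and `StubCorrelatedSamples`, the `WfsTopic` structure and the `TopicIndex` enumeration are all hypothetical; only the indexing through a `samples` member in dds_topics order is taken from the note above.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <system_error>
#include <vector>

// Hypothetical stand-ins for the Toolkit types (see the API reference).
struct StubSample {
    std::vector<std::uint8_t> bytes;
};
struct StubCorrelatedSamples {
    std::vector<StubSample> samples;
};

// Must mirror the order of the dds_topics list in the configuration.
enum TopicIndex : std::size_t { kSlopes = 0, kIntensities = 1, kNumTopics = 2 };

// Hypothetical shared memory topic.
struct WfsTopic {
    float slopes[2];
    float intensities[1];
};

std::error_code OrderedBlender(const StubCorrelatedSamples& dds_samples,
                               WfsTopic& shm_sample) noexcept {
    if (dds_samples.samples.size() != kNumTopics) {
        return std::make_error_code(std::errc::bad_message);
    }
    const auto& slopes = dds_samples.samples[kSlopes].bytes;
    const auto& intensities = dds_samples.samples[kIntensities].bytes;
    if (slopes.size() != sizeof(shm_sample.slopes) ||
        intensities.size() != sizeof(shm_sample.intensities)) {
        return std::make_error_code(std::errc::bad_message);
    }
    std::memcpy(shm_sample.slopes, slopes.data(), slopes.size());
    std::memcpy(shm_sample.intensities, intensities.data(), intensities.size());
    return {};  // success
}
```

If the dds_topics list is reordered in the configuration, only the enumeration needs to change, not every index in the blender body.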
The following is an example of the above configuration parameters in a YAML file:
static:
    dds_domain_id:
        type: RtcInt32
        value: 123
    dds_qos_profile:
        type: RtcString
        value: RtcTk_Default_Profile
    dds_topics:
        type: RtcVectorString
        value:
            - SlopesTopic
            - IntensitiesTopic
            - CommandsTopic
Operational Logic Parameters
Configuration Path | Type | Description |
---|---|---|
static/close_detach_delay | RtcInt32 | Delay in milliseconds between receiving the signal to stop writing to the shared memory queue and closing, i.e. destroying, the shared memory writer. This provides an opportunity for the shared memory queue readers to detach gracefully. |
static/correlator_poll_timeout | RtcInt32 | The time in milliseconds to wait for receiving and correlating DDS input samples, before considering the operation to have timed out. If a timeout occurs, the error will be indicated, but the Telemetry Subscriber component will not enter the error state. |
static/monitor_report_interval | RtcInt32 | Interval in milliseconds used by the monitoring thread, which determines how often the thread wakes up to report errors from the processing thread. |
static/processing_thread_policies | | Defines optional NUMA policies for the processing thread. |
static/monitoring_thread_policies | | Defines optional NUMA policies for the monitoring thread. |
The following is an example of the above configuration parameters in a YAML file:
static:
    close_detach_delay:
        type: RtcInt32
        value: 1000
    correlator_poll_timeout:
        type: RtcInt32
        value: 200
    monitor_report_interval:
        type: RtcInt32
        value: 1000
    processing_thread_policies:
        cpu_affinity:
            type: RtcString
            value: "1-4"
        scheduler_policy:
            type: RtcString
            value: Fifo
        scheduler_priority:
            type: RtcInt32
            value: 10
        memory_policy_mode:
            type: RtcString
            value: Bind
        memory_policy_nodes:
            type: RtcString
            value: "1-4"
    monitoring_thread_policies:
        # Monitoring thread is executed with lower priority
        scheduler_policy:
            type: RtcString
            value: Other
        scheduler_priority:
            type: RtcInt32
            value: -1
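The division of labour between the processing thread, which records errors such as timeouts without stopping, and the monitoring thread, which reports them once per monitor_report_interval, can be sketched as follows. The `ErrorStats` class is hypothetical and is not the Toolkit's implementation. The message layout imitates the log lines shown in the Errors section, and the choice of a cumulative error count is an assumption.

```cpp
#include <atomic>
#include <cstdint>
#include <sstream>
#include <string>
#include <system_error>

// Hypothetical error record shared by the processing and monitoring threads.
class ErrorStats {
public:
    // Called by the processing thread for every failed operation; never blocks.
    void Record(std::error_code ec) noexcept {
        m_last_value.store(ec.value());
        m_count.fetch_add(1);
    }

    // Called by the monitoring thread once per report interval. Returns an
    // empty string when no new errors were recorded since the previous call.
    std::string Report() {
        const std::uint64_t total = m_count.load();
        if (total == m_reported) {
            return {};
        }
        m_reported = total;
        // Assumption: recorded codes are errno-style (generic category).
        const std::error_code last(m_last_value.load(), std::generic_category());
        std::ostringstream msg;
        msg << "Last error code = " << last.value() << ": " << last.message()
            << ". Total number of errors = " << total;
        return msg.str();
    }

private:
    std::atomic<int> m_last_value{0};
    std::atomic<std::uint64_t> m_count{0};
    std::uint64_t m_reported{0};  // only touched by the monitoring thread
};
```

Because the processing thread only updates two atomics, a shorter report interval adds work to the low priority monitoring thread rather than to the time-critical path.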
Commands
All Telemetry Subscriber instances can be steered by sending commands to them via their MAL
interface.
One can either rely on the RTC Supervisor to send the appropriate commands or use the
rtctkClient command line application to send individual commands manually.
The set of commands that is currently available for a Telemetry Subscriber is indicated in the following table, together with the expected behaviour:
Command | Behaviour |
---|---|
Init | Constructs the necessary DDS objects to read DDS input samples, constructs the shared memory writer, and spawns the low level processing and monitoring threads. |
Stop | Stops the initialisation procedure started by the Init command. |
Reset | Resets the component to the state it was in before the Init command was received. This means that all DDS objects are destroyed, shared memory is released, and the processing and monitoring threads are also destroyed. |
Enable | Enables reading of the DDS samples. Any read errors and timeouts are ignored. |
Disable | Disables reading of the DDS samples. Any read errors and timeouts are ignored. |
Run | Requests transition to On.Operational.Run which: Errors are counted and reported by the monitoring thread, but attempts to correlate will continue. Note: If correlation fails because of bad data samples (wrong sample ID), this must be recovered by transitioning back to Idle and then to Run again, to restart correlation from new data samples. |
Idle | Requests transition to On.Operational.Idle which: |
Errors
Not Operational
The main errors that can currently occur during component initialisation, when it receives the
Init command, are due to missing mandatory configuration parameters.
The following is an example of the log message for such an error when dds_topics is missing:
[13:52:21:730][ERROR][tel_sub_1] Failed to load configuration from the Runtime Repository: Path '/tel_sub_1/static/dds_topics' does not exist.
[13:52:21:730][ERROR][tel_sub_1] Activity.Initialising: failed, exception: Path '/tel_sub_1/static/dds_topics' does not exist.
For cases where the wrong data type was used in a type field in the YAML file,
the following example error message would be seen in the logs:
[00:02:49:597][ERROR][tel_sub_1] Activity.Initialising: failed, exception: Wrong type used to read data point.
If the YAML file format is not correct, e.g. a value field has the wrong format,
the following example error message would be seen in the logs:
[00:03:54:061][ERROR][tel_sub_1] Activity.Initialising: failed, exception: File '/home/eltdev/test_install/run/exampleTelSub/runtime_repo/tel_sub_1.yaml' has an invalid data format.
Operational
While the Telemetry Subscriber is running the main sources of errors are:
DDS subscription
Problem with received data samples.
If timeout errors occur, it may indicate that the DDS publisher is no longer working, or that the timeout threshold is too low for the sample rate and needs to be adjusted (see Operational Logic Parameters). In such cases, the following error message will be seen in the logs:
[13:49:59:480][ERROR][tel_sub_1] Detected errors in operational logic. [Last error code = 110: Connection timed out. Total number of errors = 1]
If data is received but cannot be correlated because the sample ID is wrong, this is indicated as a protocol error. If the component is in state Idle, errors will cause correlation to be restarted automatically. If the component is in state Run, the correlation must be recovered by transitioning from Run to Idle and then to Run again. Protocol errors show up in the logs as follows:
[11:37:27:388][ERROR][tel_sub_1] Detected errors in operational logic. [Last error code = 71: Protocol error. Total number of errors = 43]
For any other failures during reading of the DDS samples or when writing to the shared memory queue, messages about “Detected errors in operational logic” will be logged with the error code received from the sub-system, similar to the following:
[13:48:44:448][ERROR][tel_sub_1] Detected errors in operational logic. [Last error code = 74: Bad message. Total number of errors = 1]
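The numeric codes in the messages above look like errno values: on typical Linux platforms such as x86-64, ETIMEDOUT is 110, EPROTO is 71 and EBADMSG is 74, which correspond to std::errc::timed_out, std::errc::protocol_error and std::errc::bad_message. Assuming the logged codes are indeed errno-style values, a tool that inspects them could classify the likely cause as sketched below. The `LikelyCause` enumeration and the `Classify` function are hypothetical and not part of the Toolkit.

```cpp
#include <cerrno>
#include <system_error>

// Hypothetical classification of the error sources described in this section.
enum class LikelyCause { kTimeout, kCorrelation, kOther };

inline LikelyCause Classify(const std::error_code& ec) noexcept {
    if (ec == std::errc::timed_out) {
        return LikelyCause::kTimeout;  // publisher stopped or timeout too low
    }
    if (ec == std::errc::protocol_error) {
        return LikelyCause::kCorrelation;  // wrong sample ID
    }
    return LikelyCause::kOther;  // e.g. bad_message returned by a blender
}

// std::errc enumerators are defined to equal the POSIX errno constants.
static_assert(static_cast<int>(std::errc::timed_out) == ETIMEDOUT);
static_assert(static_cast<int>(std::errc::protocol_error) == EPROTO);
static_assert(static_cast<int>(std::errc::bad_message) == EBADMSG);
```

A code of 74 would then point to a failure such as a blender function returning std::errc::bad_message.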
Limitations and Known Issues
Very limited monitoring is currently available for the Telemetry Subscriber. It does not publish any runtime statistics to the OLDB.