Customise a Telemetry Subscriber

This tutorial section takes a user through an example of running a Telemetry Subscriber standalone. See Putting Everything Together for a Minimalistic SRTC System for an example of how it fits together with other SRTC components. Afterwards, we show how to instantiate a customised version of Telemetry Subscriber with an end user blender function.

Prerequisites

It is assumed that the RTC Toolkit is built and installed. Refer to the Installation section for details. The PATH environment variable should also be setup to include the path to the bin/ directory where the RTC Toolkit executables and scripts are installed.

Running the Example

A working example of a Telemetry Subscriber instance is installed with the RTC Toolkit called rtctkExampleTelSub. The corresponding source code is found in the RTC Toolkit repository under _examples/exampleTelSub/app. To simplify running the example, a launch script has been prepared called rtctkExampleTelSub.sh and should also be available in the installation bin/ directory.

Initial Run

To get going quickly with the minimum number of steps, open a new terminal window and execute the following command:

rtctkExampleTelSub.sh run

This will perform all necessary steps to start the rtctkExampleTelSub component and bring it to the running state. This may take a few seconds or a minute to complete. You will see log messages from the component and various tools such as rtctkGenDdsPub, used to generate sample DDS data, and rtctkClient, used to send commands to rtctkExampleTelSub.

Note

The rtctkClient application has a bug in the shutdown phase that causes it to abort. Unfortunately this has not been resolved yet, and you may see such messages in the terminal output. These can be safely ignored for the time being and will be fixed in future releases of the RTC Toolkit.

Note

When executing the rtctkExampleTelSub.sh run command the logging output from the background processes is sent to the same terminal that the command was invoked in. It is suggested to execute any further commands such as rtctkExampleTelSub.sh send ... in a new separate terminal. Otherwise the output will be interspersed and difficult to interact with the example.

We now want to confirm that rtctkExampleTelSub is actually writing something to the shared memory queue. This can be done by opening up a second terminal window and executing the following command:

rtctkExampleShmSub --queue-name scao_loop

This runs a simple shared memory spy utility that reads from the shared memory queue in a loop and prints the rate. An example of the output from this command is indicated below:

Read 10 samples.    Rate = 10 Hz
Read 19 samples.    Rate = 9 Hz
Read 29 samples.    Rate = 10 Hz
Read 39 samples.    Rate = 10 Hz
...

To stop the rtctkExampleShmSub spy utility press Ctrl-C or send the process the SIGTERM signal.

Note

The rtctkExampleShmSub utility is not fully generic, i.e. it can only be used to read shared memory topics it was compiled with, which is currently rtctk::exampleTopic::ScaoLoopTopic found in _examples/exampleTopics/src/include/rtctk/exampleTopics/topics.hpp.

At this point, stop the current execution and cleanup by running the following commands:

rtctkExampleTelSub.sh stop
rtctkExampleTelSub.sh undeploy

We will now go through a step-by-step procedure for getting rtctkExampleTelSub up and running in the following sections. This should give a better understanding of starting, steering, checking and stopping the rtctkExampleTelSub component.

Deployment

The rtctkExampleTelSub application is a SRTC component. Therefore it reads its configuration from the Runtime Configuration Repository. We currently simulate the Runtime Configuration Repository with a directory of YAML files. Normally the Runtime Configuration Repository would be populated during the deployment/initialisation phase of the RTC system. However, for now this is simulated by copying YAML files specially prepared for this example and substituting any environment variables that they may contain.

These steps are encoded in the launch script and can be executed with the following command:

rtctkExampleTelSub.sh deploy

The configuration is prepared and deployed under the run/ directory. Specifically, into the $INTROOT/run/exampleTelSub/ directory, if INTROOT was used as the build prefix. You may need to adjust the commands indicated in the subsequent sections where you see $INTROOT if this was not used as the installation destination for the RTC Toolkit.

Start the Component

After the configuration for the component is deployed, one can start it with the following command:

rtctkExampleTelSub.sh start

This will actually start the following two processes:

  • rtctkExampleTelSub - The Telemetry Subscriber component itself.

  • rtctkGenDdsPub - A standalone tool for publishing test data to DDS.

The DDS publisher utility begins to publish data immediately. The rtctkExampleTelSub.sh script is written to make rtctkGenDdsPub publish at a rate of 10Hz. The rtctkExampleTelSub component will however be left in the non-operational and not-ready state.

Launching of the rtctkExampleTelSub component’s process is effectively equivalent to executing the following command:

rtctkExampleTelSub tel_sub_1 "file:$INTROOT/run/exampleTelSub/service_disc.yaml" &

The first mandatory positional argument is the name of the component instance, which must be unique in the system. It is used to identify the configuration parameters to use for the component within the Runtime Configuration Repository. The second mandatory positional argument is the URI to the service discovery, which the component communicates with to identify where the Runtime Configuration Repository actually is. One can think of this as a boot strapping mechanism to identify and setup connections to the needed services, such as the Runtime Configuration Repository and OLDB. For now, the service discovery is being simulated by a YAML file on disk.

Send State Steering Commands

Once the component process is executing we need to steer it to the running state for it to actually publish samples to the shared memory queue. Normally this is done by the RTC Supervisor. But in this example we are running rtctkExampleTelSub standalone. Therefore we need to send the appropriate steering commands manually.

The following commands need to be sent to rtctkExampleTelSub in the given order:

  1. Init - This triggers loading of the configuration from the Runtime Configuration Repository and construction of the various internal objects, such as the DDS readers, shared memory writer, and dedicated processing and monitoring threads.

  2. Enable - Sets the Telemetry Subscriber to start reading samples from DDS, but it will not write these to the shared memory queue. Any DDS read errors are ignored.

  3. Run - Sets the Telemetry Subscriber to write any received and correlated samples from DDS to the shared memory queue. Errors are not ignored at this point.

The steering commands can be sent to the component as follows:

rtctkExampleTelSub.sh send Init
rtctkExampleTelSub.sh send Enable
rtctkExampleTelSub.sh send Run

Under the hood rtctkExampleTelSub.sh is using the rtctkClient tool to communicate with the component’s MAL interface. Therefore the same effect can be achieved by calling rtctkClient directly. For example, for the Init command:

rtctkClient tel_sub_1 Init --sde "file:$INTROOT/run/exampleTelSub/service_disc.yaml"

The first positional argument is the name of the specific component instance of the command should be sent to. This must correspond to the name used when starting the rtctkExampleTelSub process.

Checking the State

It is useful to check the current state of the component. State information is automatically published to the OLDB by every Telemetry Subscriber component and can be queried from there. At the moment the OLDB is being simulated by a simple YAML file on local disk and no generic tool yet exists to conveniently read the datapoints that it contains. However, since the human readable YAML file format is being used, any text editor can be used to easily inspect the contents. Alternatively one can just cat the contents to terminal output.

Nevertheless, to make it as easy as possible in this example, the rtctkExampleTelSub.sh script has an option to parse the OLDB YAML file and print the state. Run the command as follows to query the state of the tel_sub_1 Telemetry Subscriber component:

rtctkExampleTelSub.sh state

This will print one of the following strings, depending on the current state the component is in:

  • Off - Indicates that the rtctkExampleTelSub process is not running.

  • On.NotOperational.NotReady - The component is running but not yet initialised.

  • On.NotOperational.Ready - The component is initialised but not enabled.

  • On.Operational.Idle On.Operational.Update.Idle - The component is enabled but not running.

  • On.Operational.Running On.Operational.Update.Idle - The component is running and writing any samples received and correlated from DDS to the shared memory queue.

Other strings are also possible for various other sub-states. However, the above are the primary states the component should end up in after sending one of the steering commands.

Note

It may be convenient to execute the following command in a separate terminal to monitor the state changes as the steering commands are sent to the component: watch -n 1 rtctkExampleTelSub.sh state

Checking the Output

When rtctkExampleTelSub is in the running state and no errors are occurring, there will not be much logging output in the terminal. To allow us to confirm that data samples are being written to the shared memory queue, one should attach a subscriber directly to the queue and read the contents. The command for this is as follows:

rtctkExampleShmSub -q scao_loop

This will print the rate of samples being read, in an endless loop by default. To stop rtctkExampleShmSub in this case, Ctrl-C can be used or the SIGTERM signal must be sent to it.

Note

You may from time to time notice the message Note: SHM reader state reset. when using rtctkExampleShmSub. This is normal if attaching to the shared memory queue late, i.e. after the Telemetry Subscriber already started writing to the queue. The message can be safely ignored in this case.

It is also possible to have rtctkExampleShmSub print a dump of each sample read to the terminal by adding the -p | --print-samples option as follows:

rtctkExampleShmSub -q scao_loop -p

The -p | --print-samples option prints a short summary version of the data. To print a full listing, one can use the -l | --print-long option instead.

It is also possible to write the raw buffer of the sample data to file. Use the -f | --file option for this case and provide a file name to write to. The file name will actually be appended with a number indicating the running count of samples read from the queue. As an example, the following command will dump the raw buffers to file:

rtctkExampleShmSub -q scao_loop -f sample.raw -m 3

This will produce the following files in the current working directory:

sample1.raw
sample2.raw
sample3.raw

The option -m | --max-samples was also used to limit the number of samples to record. Without the option, rtctkExampleShmSub would record samples in and endless loop until terminated.

Remember that the contents of the recorded sample files is a raw binary dump of the memory contents. Therefore an appropriate tool needs to be used to interpret the file. Without a more specific tool available, one can always use hexdump as follows:

hexdump sample1.raw

Stopping and Cleanup

To stop the rtctkExampleTelSub component cleanly one should first steer it to the not-ready state. Assuming we already brought the component to the running state, we need to first take it from running to idle, then down to not-ready. This is done by executing the following commands in the indicated order:

rtctkExampleTelSub.sh send Idle
rtctkExampleTelSub.sh send Disable
rtctkExampleTelSub.sh send Reset

Once the component is reset, it would have destroyed its internal DDS reader and shared memory writer objects, and completely stopped reading from DDS.

The rtctkExampleTelSub process will still be running however. To terminate the process one needs to send it the SIGTERM signal. This is all done conveniently in the rtctkExampleTelSub.sh script by running the following:

rtctkExampleTelSub.sh stop

At this stage none of the example components or related utilities for this tutorial should be running. However, the configuration files still remain in the $INTROOT/run/exampleTelSub/ directory. To clean that up also, the following command should be executed to delete the directory:

rtctkExampleTelSub.sh undeploy

Instantiating a Telemetry Subscriber

In this section we will briefly discuss and demonstrate how to customise an instance of Telemetry Subscriber.

Note

The rtctkExampleTelSub is also used in the Putting Everything Together for a Minimalistic SRTC System tutorial. The modifications that are applied in this section will not be compatible with that example setup. Therefore you should start from a clean copy when moving onto that tutorial.

Overview

The Telemetry Subscriber is not actually a pre-built standalone executable. It is a reusable template component delivered as a library. The RTC Toolkit user needs to instantiate their own instance of a Telemetry Subscriber that is compiled with a user provided shared memory topic. A user provided blender function also needs to be provided that constructs the topic from correlated DDS data samples.

The example instantiation rtctkExampleTelSub uses the rtctk::exampleTopic::ScaoLoopTopic declared in _examples/exampleTopics/src/include/rtctk/exampleTopics/topics.hpp. The code snippet is reproduced below to show how a user could declare their own topics:

constexpr unsigned N_SUBAPS = 4616u;
constexpr unsigned N_COMMANDS = 6316u;

template<unsigned int NSUBAPS>
struct WfsLoopBaseTopic {
    std::array<float, 2*NSUBAPS> slopes;
    std::array<float, NSUBAPS> intensities;
};

struct ScaoLoopTopic {
    uint64_t sample_id;
    WfsLoopBaseTopic<N_SUBAPS> wfs;
    std::array<float, N_COMMANDS> commands;
};

Normally a new Waf/wtools based project to build a new application should be prepared that depends on the rtctk.reusableComponents.telSub.lib. An example of this is the one used for rtctkExampleTelSub, which is found at _examples/exampleTelSub/app/wscript. For this tutorial, we will reuse this example and make modifications to this application in place.

The rtctkExampleTelSub as currently implemented only supports one DDS input topic for the slopes and only partially constructs the ScaoLoopTopic topic. We will make the necessary modifications to extend this Telemetry Subscriber instance to support three input DDS topics, for the slopes, intensities and commands, and fully construct the ScaoLoopTopic.

Adjusting the Blender Function

Since we will be reusing the already defined ScaoLoopTopic, the only piece of code that needs to be changed is the blender function that constructs the topic. In our case, this simply involves memory copies of the floating-point data from the input DDS sample buffers to the appropriate locations within the ScaoLoopTopic structure.

Modify the code in the _examples/exampleTelSub/app/src/main.cpp file to look as follows:

void RtcComponentMain(rtctk::componentFramework::Args const& args) {
    using UserTopicType = rtctk::exampleTopic::ScaoLoopTopic;
    auto blender = [](const rtctk::telSub::CorrelatedDataSamplesRef& dds_samples,
                    UserTopicType& shm_sample) noexcept -> std::error_code {

        auto slopes_buffer_size = shm_sample.wfs.slopes.max_size() * sizeof(float);
        auto intensities_buffer_size = shm_sample.wfs.intensities.max_size() * sizeof(float);
        auto commands_buffer_size = shm_sample.commands.max_size() * sizeof(float);

        if (dds_samples.samples.size() != 3) {
            return std::make_error_code(std::errc::bad_message);
        }
        if (dds_samples.samples[0].size != slopes_buffer_size) {
            return std::make_error_code(std::errc::bad_message);
        }
        if (dds_samples.samples[1].size != intensities_buffer_size) {
            return std::make_error_code(std::errc::bad_message);
        }
        if (dds_samples.samples[2].size != commands_buffer_size) {
            return std::make_error_code(std::errc::bad_message);
        }

        shm_sample.sample_id = dds_samples.sample_id;
        std::memcpy(shm_sample.wfs.slopes.data(), dds_samples.samples[0].data,
                    slopes_buffer_size);
        std::memcpy(shm_sample.wfs.intensities.data(), dds_samples.samples[1].data,
                    intensities_buffer_size);
        std::memcpy(shm_sample.commands.data(), dds_samples.samples[2].data,
                    commands_buffer_size);

        return {};
    };
    rtctk::telSub::Main<UserTopicType>(args, std::move(blender));
}

The important points to remember are:

  • The order of the samples is defined by the runtime configuration datapoint <tel-sub-name>/static/dds_topics, which is a list of DDS topic names. For example, the first DDS topic in the datapoint’s list will correspond to the first sample in the dds_samples argument, i.e. dds_samples.samples[0], the second topic in dds_topics will correspond to dds_samples.samples[1] and so on. We keep things simple in the above blender code example and expect the DDS topic order to be: slopes first, followed by intensities, and finally commands.

  • Since the blender function is executed inside the main DDS reading thread, which is time critical, care should be taken not to perform computation heavy tasks here. This is not the intention of the blender function. Only basic sanity checks and the minimum amount of work needed to actually construct the topic should be performed. Heavy computation tasks should be performed in a Data Task instead.

  • The blender function should not be throwing any exceptions. If an error occurs an appropriate error code should be returned instead.

You will notice that the code was written to use a lambda function for the blender. However, this is not strictly required and the following type of declaration will also work:

using UserTopicType = rtctk::exampleTopic::ScaoLoopTopic;

std::error_code Blender(const rtctk::telSub::CorrelatedDataSamplesRef& dds_samples,
                        UserTopicType& shm_sample) noexcept {
    ...
}

void RtcComponentMain(rtctk::componentFramework::Args const& args) {
    rtctk::telSub::Main<UserTopicType>(args, Blender);
}

The modified Telemetry Subscriber instance can now be built and installed as usual.

Add Additional Topics

Let us run the modified rtctkExampleTelSub application as was shown in the beginning of this tutorial, i.e. execute the following command:

rtctkExampleTelSub.sh run

This time you will see the following error messages in the terminal:

[07:37:24:756][ERROR][tel_sub_1] Detected errors in operational logic. [Last error code = 74: Bad message. Total number of errors = 6]
[07:37:24:859][ERROR][tel_sub_1] Detected errors in operational logic. [Last error code = 74: Bad message. Total number of errors = 7]
[07:37:24:961][ERROR][tel_sub_1] Detected errors in operational logic. [Last error code = 74: Bad message. Total number of errors = 8]
...

If you check the shared memory publishing rate with the rtctkExampleShmSub -q scao_loop command you will indeed see a zero rate:

Read 0 samples.     Rate = 0 Hz
Read 0 samples.     Rate = 0 Hz
Read 0 samples.     Rate = 0 Hz
...

This is because we have modified the blender function to expect three DDS input topics, but rtctkExampleTelSub is configured to only listen to the slopes topic. In addition, the rtctkGenDdsPub utility that publishes dummy test data for our example must also be restarted to publish all three DDS topics.

Let us stop rtctkExampleTelSub and rtctkGenDdsPub to reconfigure:

rtctkExampleTelSub.sh stop

Only stop the processes. Do not undeploy. We want to adjust the deployed configuration.

To change the configuration, modify the $INTROOT/run/exampleTelSub/runtime_repo/tel_sub_1.yaml file in the Runtime Configuration Repository so that the dds_topics datapoint looks as follows:

dds_topics:
    type: RtcVectorString
    value:
        - SlopesTopic
        - IntensitiesTopic
        - CommandsTopic

You can check that the modification was correct by running the following command:

rtctkConfigTool --repo "file:$INTROOT/run/exampleTelSub/runtime_repo/" --path /tel_sub_1/static/dds_topics --get

which should produce the following output:

[SlopesTopic, IntensitiesTopic, CommandsTopic]

With the configuration for rtctkExampleTelSub adjusted, we can start the processes again and steer rtctkExampleTelSub to the running state (do not use the command rtctkExampleTelSub.sh run, which would wipe the configuration change that was previously made):

rtctkExampleTelSub.sh start
rtctkExampleTelSub.sh send Init
rtctkExampleTelSub.sh send Enable
rtctkExampleTelSub.sh send Run

Note

You may need to pause a moment after the rtctkExampleTelSub.sh start command to allow the processes to start and register their MAL endpoints, so that they can accept commands.

One running, this time checking with the rtctkExampleShmSub -q scao_loop command will show that we are again receiving samples in the shared memory queue:

Read 10 samples.    Rate = 10 Hz
Read 19 samples.    Rate = 9 Hz
Read 28 samples.    Rate = 9 Hz