Creating a Custom Data Task

This tutorial shows how a Data Task component can be created, configured and built. The goal of this tutorial is to give developers a starting point for creating their own Data Task.

The RTC Tk does not provide any final Data Task applications that can be used out of the box in a real-world RTC. The toolkit only provides a reference design and a set of data task support libraries that instrument RTC developers can use to create custom data task applications. In the current data task design the RTC Tk provides a state machine and a command interface. The data task also provides an interface to the shared memory queue via the ReaderThread. The rest is up to the user to define.

The idea is to let users provide a custom Business Logic class that defines the behaviour for the different stages of the component's life-cycle (i.e. it implements the activity methods). Users also provide the data task specific algorithm; the current proposal is to confine the interface to this via a Computation class.

Note

For simplicity, the example uses the file-based implementation of the OLDB, Persistent and Runtime Configuration Repositories, as well as file-based service discovery. This means that the underlying format of configuration and data points differs from the one used when these services are backed by the standard back-ends such as the CII configuration service, CII OLDB, etc.

Provided Examples

This customisation of the data task is discussed in reference to the provided example data tasks. The main aim of these examples is not to give fully fledged Data Tasks that can be used as-is, but to show how the pieces fit together. The following example Data Tasks are provided:

  • Periodic Telemetry Data Task

  • Measure Telemetry Data Task

  • Optimise Data Task

  • GPU and Python integration.

In this document we go through the Periodic Telemetry Data Task in the most detail and cover the other Data Tasks more briefly, focusing on the important differences.

Periodic Telemetry Data Task. This example aims to show how to configure a Data Task that collects telemetry and performs a computation after N frames of data are collected. Section Periodic Telemetry Data Task shows how the BusinessLogic can deploy and configure a ReaderThread to process the telemetry data using a callback provided by a Computation class. After N frames of data are collected, a simple computation is triggered in the Computation class. This is aimed at emulating the function of a loop monitor.

Measure Telemetry Data Task. This example aims to show how the Measure interface can be used with the ReaderThread to perform on-command calculations from telemetry sources and return a result. It provides functionality similar to measuring a dark map of a wavefront camera. (see section Measure Telemetry Data Task)

Optimise Data Task. This example aims to show how the Optimise command can be used to perform an on-demand calculation from data in the runtime-repo. It also shows how the JSON payload can be parsed to handle multiple algorithms. (see section Optimise Data Task)

GPU and Python integration. See section GPU and Python integration for more information.

Periodic Telemetry Data Task

This example shows how to configure the BusinessLogic and interface with the ReaderThread to process the telemetry data and perform a very simple calculation. The Computation class provides a callback that copies the wavefront sensor slope vector to a local MatrixBuffer (see Runtime Configuration Repository for more information regarding the MatrixBuffer). The buffered slope vectors are averaged to calculate an average slope vector. This average slope vector is then projected onto an average modes vector, using a matrix-vector multiplication against a slopes2modes matrix.
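The core of this calculation can be sketched in plain C++; the function names and the use of std::vector in place of MatrixBuffer are illustrative only, not the toolkit API:

```cpp
#include <cstddef>
#include <vector>

// Average N collected slope vectors element-wise (samples must be non-empty
// and all of equal length, as guaranteed by the buffer configuration).
std::vector<float> AverageSlopes(const std::vector<std::vector<float>>& samples) {
    std::vector<float> avg(samples.front().size(), 0.0f);
    for (const auto& s : samples) {
        for (std::size_t i = 0; i < s.size(); ++i) {
            avg[i] += s[i];
        }
    }
    for (auto& v : avg) {
        v /= static_cast<float>(samples.size());
    }
    return avg;
}

// Project the average slope vector onto modes via a matrix-vector
// multiplication against a row-major slopes2modes matrix (n_modes x n_slopes).
std::vector<float> ProjectToModes(const std::vector<float>& s2m,
                                  const std::vector<float>& avg_slopes,
                                  std::size_t n_modes) {
    const std::size_t n_slopes = avg_slopes.size();
    std::vector<float> avg_modes(n_modes, 0.0f);
    for (std::size_t m = 0; m < n_modes; ++m) {
        for (std::size_t s = 0; s < n_slopes; ++s) {
            avg_modes[m] += s2m[m * n_slopes + s] * avg_slopes[s];
        }
    }
    return avg_modes;
}
```

The example component performs exactly this sequence, but reads the slopes2modes matrix from the runtime repository and writes the results back as datapoints.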

Source Code Location

The example source code can be found in the following sub-directory of the rtctk project:

_examples/exampleDataTask/telemetry

Modules

The provided example is composed of the following waf modules:

  • app - The data task application including default configuration

  • scripts - Helper scripts for deployment and control

The example also uses common libraries that are shared with other examples.

Dependencies

The provided example code depends on the following modules:

  • Component Framework - for RTC Component functionality with Services

  • Client Application - to steer the application

Running the Example Data Task

The exampleDataTask can be run in isolation or as part of an SRTC system. The only requirement for the data task to be brought to Operational is that a shared memory queue writer has been created, either as part of the Telemetry Subscriber or as a Shared Memory Publisher. In this section a shmPublisher is used to bring the Data Task to Operational.

To make the example as simple as possible a script rtctkExampleDataTaskTelemetry.sh is provided to help bring the data task and supporting infrastructure online.

rtctkExampleDataTaskTelemetry.sh

After installing the RTC Tk run the example using the following sequence of commands:

# Deploy and start the example applications
rtctkExampleDataTaskTelemetry.sh deploy
rtctkExampleDataTaskTelemetry.sh start

# Use the client to step through the life-cycle of the respective component instance
rtctkExampleDataTaskTelemetry.sh send Init
rtctkExampleDataTaskTelemetry.sh send Enable
rtctkExampleDataTaskTelemetry.sh send Run
rtctkExampleDataTaskTelemetry.sh send Idle
rtctkExampleDataTaskTelemetry.sh send Disable
rtctkExampleDataTaskTelemetry.sh send Reset

# Gracefully terminate the applications and clean up
rtctkExampleDataTaskTelemetry.sh stop
rtctkExampleDataTaskTelemetry.sh undeploy

During the deploy action a Python script is invoked to produce the required FITS input files; see section Generating Input Data.

Note

The commands indicated above, e.g. when using undeploy, may generate the following output:

rtctkExampleDataTaskTelemetry: no process found
rtctkExampleShmPub: no process found

This is expected and should not be treated as an indication of failure. The same applies for similar commands in the rest of this tutorial.

Generating Input Data

As we do not want to supply large FITS files within our git repository, a Python script is provided that can be used to generate the required data. The script that generates the data required by the exampleDataTask is found in the following location:

_examples/exampleDataTask/common/genFitsData

It provides a script that is called with three arguments as follows:

$ rtctkExampleDataTaskGenFitsData <num_slopes> <num_modes> <filepath>

This creates the identity slopes2modes matrix that is required by the data task. It is called by default by the script rtctkExampleDataTask.sh during the deploy action.
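For reference, the shape of the generated matrix can be sketched in C++. The actual generator writes a FITS file via Python; MakeIdentityS2M is purely illustrative:

```cpp
#include <cstddef>
#include <vector>

// Builds an identity-style slopes2modes matrix in row-major layout
// (n_modes x n_slopes): entry (m, s) is 1 when m == s, otherwise 0.
std::vector<float> MakeIdentityS2M(std::size_t n_modes, std::size_t n_slopes) {
    std::vector<float> s2m(n_modes * n_slopes, 0.0f);
    for (std::size_t m = 0; m < n_modes && m < n_slopes; ++m) {
        s2m[m * n_slopes + m] = 1.0f;
    }
    return s2m;
}
```

With an identity-style matrix, the projected modes vector simply reproduces the leading entries of the average slope vector, which makes the example's output easy to verify.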

Development Guide

This section explains how to create a simple telemetry-based Data Task from scratch, using the exampleDataTask as the focus. See Data Task for more information about the specifics.

Business Logic Class

The BusinessLogic class adds a point to customise the state and state transitions. For more information see the RTC Component section.

Computation Class

As stated before, this is where the greatest customisation effort is expected. In the exampleDataTask a very simple calculation is provided to show how this can be done. The computation selected is a projection from slopes to modes, done via a matrix-vector multiplication.

The following interfaces are provided to be invoked by the BusinessLogic:

void SetStaticConfig(unsigned to_read, unsigned n_slopes, unsigned n_modes)
void SetDynamicConfig(MatrixBuffer<float>&& s2m_matrix)
void OnDataAvailable(TopicType const& sample)
void Reset()
Result Compute()

At the current time no specific interface is enforced between the BusinessLogic and the Computation class. In the future this may change and add functionality.

void SetStaticConfig()

Sets static configuration data that was retrieved from runtime_repo. This is only called during component initialisation.

void SetDynamicConfig()

Sets dynamic configuration data that was retrieved from runtime_repo. This is called during Updating.

void OnDataAvailable()

Callback used by the ReaderThread to process each shared memory datapoint. This copies data to a buffer local to the computation.

void Reset()

Resets the buffers back to zero. When called, the buffer indexing is reset to zero and any required buffers are zeroed out. This is called when the data task receives the Run command and invokes the GoingRunning state transition. This puts the computation data into a state where it is safe to start receiving data.

Result Compute()

The bulk of the data task algorithm resides in the Compute function. This Data Task algorithm uses the BusinessLogic Running thread to do the computation.
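Taken together, a Computation class implementing this interface could be sketched as follows. All type names here (TopicType, Result, and the use of std::vector instead of MatrixBuffer) are simplified stand-ins for the toolkit types, and the Compute body is reduced to the averaging step:

```cpp
#include <algorithm>
#include <cstddef>
#include <utility>
#include <vector>

// Simplified stand-ins for the toolkit's topic and result types.
struct TopicType { long sample_id = 0; std::vector<float> slopes; };
struct Result { std::vector<float> avg_slopes; };

class Computation {
public:
    // Called once during Initialising with values from the runtime_repo.
    void SetStaticConfig(unsigned to_read, unsigned n_slopes, unsigned /*n_modes*/) {
        m_samples.assign(to_read, std::vector<float>(n_slopes, 0.0f));
    }
    // Called during Updating, e.g. to install a new slopes2modes matrix.
    void SetDynamicConfig(std::vector<float>&& s2m_matrix) {
        m_s2m = std::move(s2m_matrix);
    }
    // Invoked by the ReaderThread for each shared memory sample.
    void OnDataAvailable(TopicType const& sample) {
        m_last_sample_id = sample.sample_id;
        m_samples[m_sample_idx++] = sample.slopes;
    }
    // Returns the buffers to a safe state before data is received again.
    void Reset() {
        m_sample_idx = 0;
        for (auto& s : m_samples) {
            std::fill(s.begin(), s.end(), 0.0f);
        }
    }
    // The bulk of the algorithm; here just the element-wise average.
    Result Compute() const {
        Result r;
        r.avg_slopes.assign(m_samples.front().size(), 0.0f);
        for (const auto& s : m_samples) {
            for (std::size_t i = 0; i < s.size(); ++i) {
                r.avg_slopes[i] += s[i];
            }
        }
        for (auto& v : r.avg_slopes) {
            v /= static_cast<float>(m_samples.size());
        }
        return r;
    }

private:
    std::vector<std::vector<float>> m_samples;
    std::vector<float> m_s2m;  // slopes2modes matrix, unused in this sketch
    std::size_t m_sample_idx = 0;
    long m_last_sample_id = 0;
};
```

The real example additionally performs the slopes-to-modes projection in Compute and guards the sample index against overrun.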

OpenBLAS

Originally the matrix-vector multiplication was performed using OpenBLAS. This has since been changed to a single-threaded implementation with no third-party dependencies, because OpenBLAS is not provided by the ELT development environment on CentOS 7 and we did not want to force any new outside dependencies for an example component. The original API can still be seen in the code, so it would be trivial to revert on CentOS 8.

GPU

See GPU support for information about GPU support.

ReaderThread Class

Template arguments

The ReaderThread is mainly customisable via configuration. The only mandatory compile-time configuration is the template argument that defines the telemetry data topic (see Shared Memory Queue Topic). In addition, a shared memory reader type can be specified. The default used in this example is using ReaderType = ipcq::Reader<TopicType>;. See the roadrunner documentation for more details (the documentation is currently available as part of the roadrunner installation; e.g. if installed with RPMs it is found under the /elt/roadrunner/share/doc/ directory).

Note

The ReaderThread and the writer it is attached to (whether the writer is part of the Telemetry Subscriber or the shmPub) must use the same conditional policy. If using the default reader/writer this is handled by default. See the roadrunner documentation for more details.

using ReaderType = ipcq::Reader<TopicType>;
using WriterType = ipcq::Writer<TopicType>;

Callbacks

Aside from the configuration of the ReaderThread via a series of setters (see ReaderThread configuration), two callbacks may be provided by the user. The first is required and processes the data in the shared memory queue. The second is optional and gives the user a hook to initialise state on the ReaderThread that may be beneficial during the data copy.

The callback to process data is registered with the following function:

void RegisterOnDataCallback(std::function<void(const TopicType& sample)> callback)

In the example this is called during Initialising with an invocation similar to the following:

RegisterOnDataCallback([this](const TopicType& sample) {
    m_computation->OnDataAvailable(sample);
});

This registers the callback from the Computation class shown below, which copies the wavefront sensor slope vector into a MatrixBuffer local to the Computation class.

void OnDataAvailable(TopicType const& sample) {
    m_last_sample_id = sample.sample_id;
    m_samples[m_sample_idx++] = sample.wfs.slopes;
}

An additional optional callback can be provided that is invoked once after the ReaderThread is created. This callback is registered at the same point in Initialising with the function void RegisterInitThreadCallback(std::function<void()> callback).

The aim of this callback is to give the user a point where they can initialise code that may be beneficial during the data copy. This optional callback is not used in the exampleDataTask, but it is used in the GPU example (see GPU support), where it performs GPU-specific initialisation such as setting which GPU is to be used by the thread. This could be done in each data processing callback, but since it is only required once per thread it is better done in this callback.
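The mechanics can be illustrated with a minimal stand-in for the registration. ReaderThreadSketch is not the toolkit class; it only shows that the callback is stored and run exactly once when the thread starts:

```cpp
#include <functional>
#include <utility>

// Minimal illustration of the init-callback mechanism: the callback is kept
// and invoked exactly once before the read loop begins.
class ReaderThreadSketch {
public:
    void RegisterInitThreadCallback(std::function<void()> callback) {
        m_init = std::move(callback);
    }
    void Start() {
        if (m_init) {
            m_init();  // one-time, thread-local setup (e.g. select a GPU)
        }
        // ... the data-processing loop would run here ...
    }

private:
    std::function<void()> m_init;
};
```

In the GPU example the registered callback would perform the device selection for the reader thread before any data is copied.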

Multiple readers

While not explicitly shown, it should be possible to configure a data task that uses multiple ReaderThreads to read telemetry data from multiple loops. A ReaderThread would be required for every loop that data should be read from. This was one of the driving forces for implementing the callback-based API for interacting with the shared memory queue.

This should be fairly simple to do in the BusinessLogic; the Running state would become more complex as it would have to check that every ReaderThread has finished reading before launching the calculation. Separate callbacks from the Computation class would also be required.
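The completion check in the Running state could be as simple as the following sketch, where ReaderStatus stands in for whatever per-reader completion flag the implementation tracks:

```cpp
#include <algorithm>
#include <vector>

// Per-reader completion flag; in a real data task this would be derived
// from each ReaderThread's state.
struct ReaderStatus { bool finished = false; };

// The Running activity would only launch the calculation once every
// ReaderThread has finished reading its configured number of samples.
bool AllReadersFinished(const std::vector<ReaderStatus>& readers) {
    return std::all_of(readers.begin(), readers.end(),
                       [](const ReaderStatus& r) { return r.finished; });
}
```

The Running activity would poll or wait on this condition before invoking the Computation, then reset all readers for the next cycle.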

Configuration

ReaderThread

The configuration of the ReaderThread has been discussed previously. See ReaderThread configuration for the configuration of the ReaderThread.

Runtime Configuration Repository

The example data task also has a series of input and output configuration datapoints, some of them might also be used by other components.

Input:

cfg attribute   Type            Description
slopes          RtcInt32        Number of slopes for the wavefront sensor
modes           RtcInt32        Number of modes to project onto
iteration       RtcInt32        Number of frames to process
s2m             RtcMatrixFloat  Slopes to modes projection matrix

Output:

cfg attribute   Type            Description
avg_slopes      RtcVectorFloat  Average slope vector (length = number of slopes)
avg_modes       RtcVectorFloat  Average modes vector (length = number of modes)

As with the ReaderThread configuration (see ReaderThread configuration), the datapoints are spread between the two YAML files common.yaml and data_task_1.yaml. In the exampleDataTask these YAML files look as follows:

Listing 1 common.yaml
 static:
     loop_1:
         queue_name:
             type: RtcString
             value: exampleDataTaskQueue
         frequency:
             type: RtcFloat
             value: 5.0
     wfs_1:
         subapertures:
             type: RtcInt32
             value: 4616
         slopes:
             type: RtcInt32
             value: 9232
     common:
         nacts:
             type: RtcInt32
             value: 9232
 dynamic:
     wfs_1:
         avg_slopes:
             type: RtcVectorFloat
             value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.wfs_1.avg_slopes.fits
         avg_modes:
             type: RtcVectorFloat
             value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.wfs_1.avg_modes.fits
Listing 2 data_task_1.yaml
 static:
     common:
         samples_to_read:
             type: RtcInt32
             value: 5
         gpu:
             type: RtcInt32
             value: 0
         nacts:
             type: RtcInt32
             value: 5000
     reader_thread:
         cpu_affinity:
             type: RtcInt32
             value: 0
     computation:
         modes:
             type: RtcInt32
             value: 50
         python:
             module:
                 type: RtcString
                 value: "rtctkExampleDataTaskPythonCode"
             calc_average:
                 type: RtcString
                 value: "calculate_average"
             calc_projection:
                 type: RtcString
                 value: "projection"
 dynamic:
     reader_thread:
         samples_to_skip:
             type: RtcInt32
             value: 10
     computation:
         slopes2modes:
             type: RtcMatrixFloat
             value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.computation.slopes2modes.fits
     im:
         type: RtcMatrixFloat
         value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.IM.fits
     cm:
         type: RtcMatrixFloat
         value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.CM.fits

The datapoint static/computation/gpu is provided because both this example and the GPU example perform the same calculation and work on the same configuration data; it is only used in the GPU example, not here. See GPU support for more information.

OLDB

Datapoint        Type      Description
iteration        RtcInt32  Number of cycles of the calculation
time             RtcFloat  How long the last calculation took
last_sample_id   RtcInt32  The final sample id used in the previous calculation

This creates some statistics in the OLDB to monitor the data task.

statistics:
   computation:
      iteration:
         type: RtcInt32
         value: 0
      time:
         type: RtcFloat
         value: 0
      last_sample_id:
         type: RtcInt32
         value: 0

Measure Telemetry Data Task

The Measure Telemetry Data Task allows the user to trigger an on-demand computation based on telemetry, such as measuring the dark map of a wavefront sensor.

When the Measure command is triggered, the Data Task starts processing incoming telemetry and performs a computation on this data. A feature of the Measure interface is that it can return the result to the user in user-defined JSON; it can also put any results into the Runtime Configuration Repository.

Source Code Location

The example source code can be found in the following sub-directory of the rtctk project:

_examples/exampleDataTask/measureTel

Modules

The provided example is composed of the following waf modules:

  • app - The data task application including default configuration

  • scripts - Helper scripts for deployment and control

The example also uses common libraries that are shared with other examples.

Dependencies

The provided example code depends on the following modules:

  • Component Framework - for RTC Component functionality with Services

  • Client Application - to steer the application

Running the Example Data Task

The Measure Telemetry Data Task can be run in isolation or as part of an SRTC system. As with the Periodic Telemetry Data Task, the only requirement for the data task to be brought to Operational is that a shared memory queue writer has been created, either as part of the Telemetry Subscriber or as a Shared Memory Publisher. In this section a shmPublisher is used to bring the Data Task to Operational. By sending the Measure command with a specific JSON payload, the Data Task can provide multiple computations depending on the payload. Here we provide a very simple payload of '{"algorithm":"mean"}' to specify that the mean should be calculated.

To make the example as simple as possible a script rtctkExampleDataTaskMeasureTel.sh is provided to help bring the data task and supporting infrastructure online.

rtctkExampleDataTaskMeasureTel.sh

After installing the RTC Tk run the example using the following sequence of commands:

# Deploy and start the example applications
rtctkExampleDataTaskMeasureTel.sh deploy
rtctkExampleDataTaskMeasureTel.sh start

# Use the client to step through the life-cycle of the respective component instance
rtctkExampleDataTaskMeasureTel.sh send Init
rtctkExampleDataTaskMeasureTel.sh send Enable
rtctkExampleDataTaskMeasureTel.sh send Measure '{"algorithm":"mean"}'
rtctkExampleDataTaskMeasureTel.sh send Disable
rtctkExampleDataTaskMeasureTel.sh send Reset

# Gracefully terminate the applications and clean up
rtctkExampleDataTaskMeasureTel.sh stop
rtctkExampleDataTaskMeasureTel.sh undeploy

Note

The commands indicated above, e.g. when using undeploy, may generate the following output:

rtctkExampleDataTaskMeasureTel: no process found
rtctkExampleShmPub: no process found

This is expected and should not be treated as an indication of failure. The same applies for similar commands in the rest of this tutorial.

Development Guide

The process for configuring the ReaderThread (see ReaderThread Class) and the Computation class (see Computation Class) has been explained in previous sections. In this section we focus on the Measure interface provided to the BusinessLogic.

Business Logic Class

The BusinessLogic class adds a point to customise the states and state transitions. For more information see RTC Component.

To add ActivityMeasuring to the BusinessLogic the following include is required:

#include <rtctk/componentFramework/measurable.hpp>

It is then required to declare the life-cycle of the component to be Measurable, with the following lines within the BusinessLogic:

using LifeCycle = Measurable<RtcComponent>;
using ComponentType = LifeCycle;

This declaration provides the BusinessLogic with an interface to the following activity method:

JsonPayload ActivityMeasuring(StopToken st, JsonPayload const& arg) override;

This can then be customised to suit the needs of the Data Task. As it is called with a JSON command, the payload requires parsing by the user. The rest of this example is similar to the Periodic Telemetry Data Task example; see Periodic Telemetry Data Task for more details.
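For illustration, a naive extraction of the algorithm name from a payload such as '{"algorithm":"mean"}' could look like the sketch below; a real BusinessLogic would use the JSON accessors of the JsonPayload type rather than string handling:

```cpp
#include <stdexcept>
#include <string>

// Naive, assumption-laden parse of {"algorithm":"<name>"}; shown only to
// make the idea concrete, not how the toolkit expects payloads to be read.
std::string ParseAlgorithm(const std::string& payload) {
    const std::string key = "\"algorithm\":\"";
    const auto pos = payload.find(key);
    if (pos == std::string::npos) {
        throw std::invalid_argument("payload is missing the algorithm field");
    }
    const auto start = pos + key.size();
    const auto end = payload.find('"', start);
    return payload.substr(start, end - start);
}
```

The extracted name can then be used to select between the computations the Data Task supports, with unknown names rejected back to the caller.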

See Client Application for more information about the Measurable interface.

Optimise Data Task

The Optimise Data Task offers the Optimise interface: an on-demand computation triggered by the user. It is a Data Task that can take data from the Runtime Configuration Repository, perform a complex computation and return the result to the Runtime Configuration Repository.

Note

The Optimise Data Task has been combined with the Python integration to reduce the number of examples providing similar functionality. As such, some of the information here is repeated in Python support. It is advised to read the Python section as well as this one, even when implementing a non-Python based Data Task, as useful information is in both places.

Source Code Location

The example source code can be found in the following sub-directory of the rtctk project:

_examples/exampleDataTask/optimiser

Modules

The provided example is composed of the following waf modules:

  • app - The data task application including default configuration.

  • pyLib - Python library containing calculation.

  • scripts - Helper scripts for deployment and control.

In addition the example uses the following component framework libraries:

  • rtctkRtcComponent - For RtcComponent functionality

  • rtctkServicesPython - Framework provided Python bindings for C++

Running the Example

The optimiser example can be run in isolation or as part of an SRTC system. The Python computation can be invoked after bringing the component to state Operational. By sending the Optimise command with a specific JSON payload the Python interpreter is started and the computation is carried out.

The script rtctkExampleDataTaskOptimiser.sh is provided to bring the example data task and the supporting infrastructure online in a simple way.

rtctkExampleDataTaskOptimiser.sh

After installing the RTC Tk run the example using the following sequence of commands:

# Deploy and start the example applications
rtctkExampleDataTaskOptimiser.sh deploy
rtctkExampleDataTaskOptimiser.sh start

# Use the client to step through the life-cycle of the respective component instance
rtctkExampleDataTaskOptimiser.sh send Init
rtctkExampleDataTaskOptimiser.sh send Enable
rtctkExampleDataTaskOptimiser.sh send Optimise '{"algorithm":"SimpleInversion"}'
rtctkExampleDataTaskOptimiser.sh send Disable
rtctkExampleDataTaskOptimiser.sh send Reset

# Gracefully terminate the applications and clean up
rtctkExampleDataTaskOptimiser.sh stop
rtctkExampleDataTaskOptimiser.sh undeploy

Note

The commands indicated above, e.g. when using undeploy, may generate the following output:

rtctkExampleDataTaskOptimiser: no process found

This is expected and should not be treated as an indication of failure. The same applies for similar commands in the rest of this tutorial.

Development Guide

The majority of the required information, such as how to configure a component from the runtime_repo, has previously been addressed. In this section we focus on just the Optimise command of the BusinessLogic.

Business Logic Class

The BusinessLogic class adds a point to customise the states and state transitions. For more information see RTC Component.

To add ActivityOptimising to the BusinessLogic the following include is required:

#include <rtctk/componentFramework/optimisable.hpp>

It is then required to declare the LifeCycle of the component to be Optimisable, with the following lines within the BusinessLogic:

using LifeCycle = Optimisable<RtcComponent>;
using ComponentType = LifeCycle;

This declaration provides the BusinessLogic with an interface to the following activity method:

JsonPayload ActivityOptimising(StopToken st, JsonPayload const& arg) override;

In this example the JSON is parsed, the calculation is computed and the result is returned to the runtime_repo:

void ActivityOptimising(StopToken st, JsonPayload const& arg) override {
    auto result = m_computation.Compute(ParseAlgorithm(arg));
    m_rtr.WriteDataPoint<MatrixBuffer<float>>(m_dp_cm, result.cm);
    m_oldb.SetDataPoint<double>(m_dp_stats_time, result.stats.elapsed.count());
}

This can then be customised to suit the needs of the Data Task. As it is called with a JSON command, the payload requires parsing by the user. The rest of this example is similar to the Data Tasks previously detailed, but without a ReaderThread as no telemetry is being processed.

See Client Application for more information about the Optimisable interface.

GPU and Python integration

The GPU and Python integration are too big a topic to cover here; separate pages are provided that focus on the specific details.

GPU Support

See GPU support for information about GPU support.

Python Support

See Python support for more information about the Python interfaces.