Creating a Custom Data Task¶
This tutorial shows how a Data Task component can be created, configured and built. The goal of this tutorial is to give developers a starting point for creating their own Data Task.
The RTC Tk does not provide any final Data Task applications that can be used out of the box in a real-world RTC. The toolkit only provides a reference design and a set of data task support libraries that instrument RTC developers can use to create custom data task applications. In the current data task design the RTC Tk provides a state machine and a command interface. The data task also provides an interface to the shared memory queue via the ReaderThread. The rest is up to the user to define.
The idea is to let users provide a custom Business Logic class that defines the behaviour for the different stages of the component’s life-cycle (i.e. it implements the activity methods). Users also provide the data-task-specific algorithm; the current proposal is to confine the interface to this via a Computation Class.
Note
For simplicity, the example uses the file-based implementation of the OLDB, Persistent and Runtime Configuration Repositories, as well as file-based service discovery. This means that the underlying format of configuration and data points differs from the one used when the above-mentioned services are used with the standard back-ends such as the CII configuration service, CII OLDB, etc.
Provided Examples¶
This customisation of the data task will be discussed in reference to the provided example data tasks. The main aim of these examples is not to give a fully fledged Data Task that can be used as is, but to show how the pieces may fit together. The following example Data Tasks are provided:
Periodic Telemetry Data Task
Measure Telemetry Data Task
Optimise Data Task
GPU and Python integration.
In this document we will go through the Periodic Telemetry Data Task in the most detail; the other Data Tasks are covered in less detail, focusing on the important differences.
Periodic Telemetry Data Task. This example aims to show how to configure a Data Task that collects telemetry and performs a computation after N frames of data are collected. In section Periodic Telemetry Data Task, it is shown how the BusinessLogic can deploy and configure a ReaderThread to process the telemetry data using a callback provided by a Computation class. After N frames of data are collected, a simple computation is triggered in the Computation class. This is aimed at emulating the function of a loop monitor.
Measure Telemetry Data Task. This example aims to show how the Measure interface can be used with the ReaderThread to perform on-command calculations from telemetry sources and return a result. This is aimed at providing functionality similar to measuring a dark map of a wavefront camera. (see section Measure Telemetry Data Task)
Optimise Data Task. This example aims to show how the Optimise command can be used to perform an on-demand calculation from data in the runtime-repo. It also shows how the JSON can be parsed to allow multiple algorithms to be handled. (see section Optimise Data Task)
GPU and Python integration. See section GPU and Python integration for more information.
Periodic Telemetry Data Task¶
The example shows how to configure the BusinessLogic and interface with the ReaderThread to process the telemetry data and perform a very simple calculation. The Computation class provides a callback that copies the wavefront sensor slope vector to a local MatrixBuffer (see Runtime Configuration Repository for more information regarding the MatrixBuffer). The buffered slope vectors are averaged to calculate an average slope vector. This average slope vector is then projected to an average modes vector using a matrix-vector multiplication against a slopes2modes matrix.
Source Code Location¶
The example source code can be found in the following sub-directory of the rtctk project:
_examples/exampleDataTask/telemetry
Modules¶
The provided example is composed of the following waf modules:
app - The data task application including default configuration
scripts - Helper scripts for deployment and control
The example also uses common libraries that are shared with other examples:
shmPub - Implements an example shmPublisher. (See Shared Memory Publisher)
Dependencies¶
The provided example code depends on the following modules:
Component Framework - for RTC Component functionality with Services
Client Application - to steer the application
Running the Example Data Task¶
The exampleDataTask can be run in isolation or as part of an SRTC system. The only requirement for the data task to be brought to Operational is that a shared memory queue writer has been created. This writer can be part of the Telemetry Subscriber or a Shared Memory Publisher. In this section a shmPublisher is used to bring the Data Task to Operational.
To make the example as simple as possible a script rtctkExampleDataTaskTelemetry.sh is provided to help bring the data task and supporting infrastructure online.
rtctkExampleDataTaskTelemetry.sh¶
After installing the RTC Tk run the example using the following sequence of commands:
# Deploy and start the example applications
rtctkExampleDataTaskTelemetry.sh deploy
rtctkExampleDataTaskTelemetry.sh start
# Use the client to step through the life-cycle of the respective component instance
rtctkExampleDataTaskTelemetry.sh send Init
rtctkExampleDataTaskTelemetry.sh send Enable
rtctkExampleDataTaskTelemetry.sh send Run
rtctkExampleDataTaskTelemetry.sh send Idle
rtctkExampleDataTaskTelemetry.sh send Disable
rtctkExampleDataTaskTelemetry.sh send Reset
# Gracefully terminate the applications and clean-up
rtctkExampleDataTaskTelemetry.sh stop
rtctkExampleDataTaskTelemetry.sh undeploy
During the deploy action a Python script is invoked to produce the required FITS input files; see section Generating Input Data.
Note
The commands indicated above, e.g. when using undeploy, may generate the following output:
rtctkExampleDataTaskTelemetry: no process found
rtctkExampleShmPub: no process found
This is expected and should not be treated as an indication of failure. The same applies for similar commands in the rest of this tutorial.
Generating Input Data¶
As we do not want to supply large FITS files within our git repository, a Python script is provided that can be used to generate the required data. The script that generates the data required by the exampleDataTask is found in the following location:
_examples/exampleDataTask/common/genFitsData
It provides a script that is called with three arguments as follows:
$ rtctkExampleDataTaskGenFitsData <num_slopes> <num_modes> <filepath>
This will create an identity slopes2modes matrix that is required by the data task. It is called by default by the rtctkExampleDataTaskTelemetry.sh script during the deploy action.
Development Guide¶
This section explains how to create a simple telemetry-based Data Task from scratch. See Data Task for more information about the specifics; here the development of a custom data task is covered, with the exampleDataTask as the focus.
Business Logic Class¶
The BusinessLogic class adds a point to customise the state and state transitions. For more information see the RTC Component section.
Computation Class¶
As stated before, this is where the greatest customisation effort is expected. In the exampleDataTask a very simple calculation is provided to show how this can be done. The computation selected is a projection from slopes to modes, done via a matrix-vector multiplication.
The following interfaces are provided to be invoked by the BusinessLogic.
void SetStaticConfig(unsigned to_read, unsigned n_slopes, unsigned n_modes)
void SetDynamicConfig(MatrixBuffer<float>&& s2m_matrix)
void OnDataAvailable(TopicType const& sample)
void Reset()
Result Compute()
At the current time no specific interface is enforced between the BusinessLogic and the Computation class. In the future this may change and add functionality.
void SetStaticConfig()¶
Sets static configuration data that was retrieved from runtime_repo. This is only called during component initialisation.
void SetDynamicConfig()¶
Sets dynamic configuration data that was retrieved from runtime_repo. This is called during Updating.
void OnDataAvailable()¶
Callback used by the ReaderThread to process each shared memory datapoint. This copies data to a buffer local to the computation.
void Reset()¶
Resets the buffers back to zero. When called, the buffer indexing is reset to zero and any required buffers are zeroed out. This is called when the data task receives the Run command, which invokes the GoingRunning state transition. This puts the computation data into a state where it is safe to start receiving data.
Result Compute()¶
The bulk of the data task algorithm resides in the Compute function. This Data Task algorithm uses the BusinessLogic Running thread to do the computation.
OpenBLAS¶
Originally the matrix-vector multiplication was performed using openBLAS. This has since been changed to a single-threaded implementation with no dependencies on third-party tools, because openBLAS is not provided by the ELT development environment on CentOS 7 and we did not want to force any new outside dependencies for an example component. The original API can still be seen in the code, so it would be trivial to revert on CentOS 8.
GPU¶
See GPU support for information about GPU support.
ReaderThread Class¶
Template arguments¶
The ReaderThread is mainly customisable via configuration. The only mandatory compile-time configuration is the template argument that defines the telemetry data topic (see Shared Memory Queue Topic).
In addition a shared memory reader type can also be specified. The default used in this example is:
using ReaderType = ipcq::Reader<TopicType>;
See the roadrunner documentation for more details (the documentation is currently available as part of the roadrunner installation, e.g. if installed with RPMs it is found under the /elt/roadrunner/share/doc/ directory).
Note
The ReaderThread and the writer it is attached to (whether the writer is part of the Telemetry Subscriber or the shmPub) must use the same conditional policy. If using the default Reader/Writer this is handled automatically. See the roadrunner documentation for more details.
using ReaderType = ipcq::Reader<TopicType>;
using WriterType = ipcq::Writer<TopicType>;
Callbacks¶
Aside from the configuration of the ReaderThread via a series of setters (see ReaderThread configuration), two callbacks may be provided by the user. The first one is required for processing the data in the shared memory queue. The second callback is optional giving the user the option to initialise some things on the ReaderThread that may be beneficial during the data copy.
The callback to process data is registered with the following function:
void RegisterOnDataCallback(std::function<void(const TopicType& sample)> callback)
In the example this is called during Initialising with an invocation similar to the following:
RegisterOnDataCallback([this](const TopicType& sample) {
m_computation->OnDataAvailable(sample);
});
This registers the callback from the Computation class shown below, which copies the wavefront sensor slope vector into a MatrixBuffer local to the Computation class.
void OnDataAvailable(TopicType const& sample) {
m_last_sample_id = sample.sample_id;
m_samples[m_sample_idx++] = sample.wfs.slopes;
}
An additional optional callback can be provided that is invoked once after the ReaderThread is created. This callback is registered at the same point in Initialising with the function:
void RegisterInitThreadCallback(std::function<void()> callback)
The aim of this callback is to give the user a point where they can initialise code that may be beneficial during the data copy. This optional callback is not used in the exampleDataTask, but it is used in the GPU example (see GPU support), where it performs GPU-specific initialisation such as setting which GPU is to be used by the thread. This could be done in each data-processing callback, but it only requires a single call per thread, so it is done in this callback instead.
Multiple readers¶
While not explicitly shown, it should be possible to configure a data task that uses multiple ReaderThreads to read telemetry data from multiple loops. A ReaderThread would be required for every loop that data should be read from. This was one of the driving forces for implementing the callback-based API for interacting with the shared memory queue.
This should be fairly simple to do in the BusinessLogic; the Running state would become more complex, as it would have to check that every ReaderThread has finished reading before launching the calculation. Separate callbacks from the Computation class would also be required.
Configuration¶
ReaderThread¶
The configuration of the ReaderThread has been discussed previously; see ReaderThread configuration for details.
Runtime Configuration Repository¶
The example data task also has a series of input and output configuration datapoints, some of them might also be used by other components.
Input:
cfg attribute | Type | Description
---|---|---
slopes | RtcInt32 | Number of slopes for the wavefront sensor
modes | RtcInt32 | Number of modes to project onto
iteration | RtcInt32 | Number of frames to process
s2m | RtcMatrixFloat | Slopes to modes projection matrix
Output:
cfg attribute | Type | Description
---|---|---
avg_slopes | RtcVectorFloat | Average slope vector (length = number of slopes)
avg_modes | RtcVectorFloat | Average modes vector (length = number of modes)
As with the ReaderThread configuration (see ReaderThread configuration), the datapoints are spread between the two YAML files common.yaml and data_task_1.yaml. In the exampleDataTask these YAML files look as follows:
static:
  loop_1:
    queue_name:
      type: RtcString
      value: exampleDataTaskQueue
    frequency:
      type: RtcFloat
      value: 5.0
  wfs_1:
    subapertures:
      type: RtcInt32
      value: 4616
    slopes:
      type: RtcInt32
      value: 9232
  common:
    nacts:
      type: RtcInt32
      value: 9232
dynamic:
  wfs_1:
    avg_slopes:
      type: RtcVectorFloat
      value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.wfs_1.avg_slopes.fits
    avg_modes:
      type: RtcVectorFloat
      value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.wfs_1.avg_modes.fits
static:
  common:
    samples_to_read:
      type: RtcInt32
      value: 5
    gpu:
      type: RtcInt32
      value: 0
    nacts:
      type: RtcInt32
      value: 5000
  reader_thread:
    cpu_affinity:
      type: RtcInt32
      value: 0
  computation:
    modes:
      type: RtcInt32
      value: 50
  python:
    module:
      type: RtcString
      value: "rtctkExampleDataTaskPythonCode"
    calc_average:
      type: RtcString
      value: "calculate_average"
    calc_projection:
      type: RtcString
      value: "projection"
dynamic:
  reader_thread:
    samples_to_skip:
      type: RtcInt32
      value: 10
  computation:
    slopes2modes:
      type: RtcMatrixFloat
      value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.computation.slopes2modes.fits
    im:
      type: RtcMatrixFloat
      value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.IM.fits
    cm:
      type: RtcMatrixFloat
      value: file:$REPO_DIR/runtime_repo/data_task_1.dynamic.CM.fits
The datapoint static/computation/gpu is provided but is used only in the GPU example, not here. This was done because both this example and the GPU example provide the same calculation and work on the same configuration data. See GPU support for more information.
OLDB¶
Datapoint | Type | Description
---|---|---
iteration | RtcInt32 | Number of cycles of the calculation
time | RtcFloat | How long the last calculation took
last_sample_id | RtcInt32 | The final sample id used in the previous calculation
These statistics are created in the OLDB to monitor the data task:
statistics:
  computation:
    iteration:
      type: RtcInt32
      value: 0
    time:
      type: RtcFloat
      value: 0
    last_sample_id:
      type: RtcInt32
      value: 0
Measure Telemetry Data Task¶
The Measure Telemetry Data Task allows the user to trigger an on-demand computation based on telemetry, such as measuring the dark map of a wavefront sensor.
When the Measure command is triggered, the Data Task starts processing incoming telemetry and performs a computation on this data. A feature of the Measure interface is that it can return the result to the user as user-defined JSON; it can also put any results into the Runtime Configuration Repository.
Source Code Location¶
The example source code can be found in the following sub-directory of the rtctk project:
_examples/exampleDataTask/measureTel
Modules¶
The provided example is composed of the following waf modules:
app - The data task application including default configuration
scripts - Helper scripts for deployment and control
The example also uses common libraries that are shared with other examples:
shmPub - Implements an example shmPublisher. (See Shared Memory Publisher)
Dependencies¶
The provided example code depends on the following modules:
Component Framework - for RTC Component functionality with Services
Client Application - to steer the application
Running the Example Data Task¶
The Measure Telemetry Data Task can be run in isolation or as part of an SRTC system. As with the Periodic Telemetry Data Task, the only requirement for the data task to be brought to Operational is that a shared memory queue writer has been created. This writer can be part of the Telemetry Subscriber or a Shared Memory Publisher. In this section a shmPublisher is used to bring the Data Task to Operational. By sending the Measure command with a specific JSON payload the Data Task can provide multiple computations depending on the payload. Here we provide a very simple payload of '{"algorithm":"mean"}' to specify that the mean should be calculated.
To make the example as simple as possible a script rtctkExampleDataTaskMeasureTel.sh is provided to help bring the data task and supporting infrastructure online.
rtctkExampleDataTaskMeasureTel.sh¶
After installing the RTC Tk run the example using the following sequence of commands:
# Deploy and start the example applications
rtctkExampleDataTaskMeasureTel.sh deploy
rtctkExampleDataTaskMeasureTel.sh start
# Use the client to step through the life-cycle of the respective component instance
rtctkExampleDataTaskMeasureTel.sh send Init
rtctkExampleDataTaskMeasureTel.sh send Enable
rtctkExampleDataTaskMeasureTel.sh send Measure '{"algorithm":"mean"}'
rtctkExampleDataTaskMeasureTel.sh send Disable
rtctkExampleDataTaskMeasureTel.sh send Reset
# Gracefully terminate the applications and clean-up
rtctkExampleDataTaskMeasureTel.sh stop
rtctkExampleDataTaskMeasureTel.sh undeploy
Note
The commands indicated above, e.g. when using undeploy, may generate the following output:
rtctkExampleDataTaskMeasureTel: no process found
rtctkExampleShmPub: no process found
This is expected and should not be treated as an indication of failure. The same applies for similar commands in the rest of this tutorial.
Development Guide¶
The process for configuring the ReaderThread (see ReaderThread Class) and the Computation class (see Computation Class) has been explained in previous sections. In this section we will focus on the measure interface provided to the BusinessLogic.
Business Logic Class¶
The BusinessLogic class adds a point to customise the state and state transitions. For more information see RTC Component.
To add ActivityMeasuring to the BusinessLogic the following include is required.
#include <rtctk/componentFramework/measurable.hpp>
It is then required to declare the life-cycle of the component to be Measurable with the following lines within the BusinessLogic:
using LifeCycle = Measurable<RtcComponent>;
using ComponentType = LifeCycle;
This declaration provides the BusinessLogic an interface to the following:
JsonPayload ActivityMeasuring(StopToken st, JsonPayload const& arg) override
This can then be customised to suit the needs of the Data Task. As this is called with a JSON command, the payload requires parsing by the user. The rest of this example is similar to the Periodic Telemetry Data Task example; see Periodic Telemetry Data Task for more details.
See Client Application for more information about the Measurable interface.
Optimise Data Task¶
The Optimise Data Task offers the Optimise interface, an on-demand computation triggered by the user. It is a Data Task that can take data from the Runtime Configuration Repository, perform a complex computation, and return the result to the Runtime Configuration Repository.
Note
The Optimise Data Task has been combined with the Python integration to reduce the number of examples providing similar functionality. As such, some of the information here is repeated in Python support. It is advised to read the Python section as well as this one, even when implementing a non-Python based Data Task, as useful information is in both places.
Source Code Location¶
The example source code can be found in the following sub-directory of the rtctk project:
_examples/exampleDataTask/optimiser
Modules¶
The provided example is composed of the following waf modules:
app - The data task application including default configuration.
pyLib - Python library containing calculation.
scripts - Helper scripts for deployment and control.
In addition the example uses the following component framework libraries:
rtctkRtcComponent - For RtcComponent functionality
rtctkServicesPython - Framework provided Python bindings for C++
Running the Example¶
The optimiser example can be run in isolation or as part of an SRTC system. The Python computation can be invoked after bringing the component to state Operational. By sending the Optimise command with a specific JSON payload the Python interpreter is started and the computation is carried out.
The script rtctkExampleDataTaskOptimiser.sh is provided to bring the example data task and the supporting infrastructure online in a simple way.
rtctkExampleDataTaskOptimiser.sh¶
After installing the RTC Tk run the example using the following sequence of commands:
# Deploy and start the example applications
rtctkExampleDataTaskOptimiser.sh deploy
rtctkExampleDataTaskOptimiser.sh start
# Use the client to step through the life-cycle of the respective component instance
rtctkExampleDataTaskOptimiser.sh send Init
rtctkExampleDataTaskOptimiser.sh send Enable
rtctkExampleDataTaskOptimiser.sh send Optimise '{"algorithm":"SimpleInversion"}'
rtctkExampleDataTaskOptimiser.sh send Disable
rtctkExampleDataTaskOptimiser.sh send Reset
# Gracefully terminate the applications and clean-up
rtctkExampleDataTaskOptimiser.sh stop
rtctkExampleDataTaskOptimiser.sh undeploy
Note
The commands indicated above, e.g. when using undeploy, may generate the following output:
rtctkExampleDataTaskOptimiser: no process found
This is expected and should not be treated as an indication of failure. The same applies for similar commands in the rest of this tutorial.
Development Guide¶
The majority of the required information, such as how to configure a component from the runtime_repo, has been addressed previously. In this section we will focus on just the Optimise command of the BusinessLogic.
Business Logic Class¶
The BusinessLogic class adds a point to customise the state and state transitions. For more information see RTC Component.
To add ActivityOptimising to the BusinessLogic the following include is required.
#include <rtctk/componentFramework/optimisable.hpp>
It is then required to declare the LifeCycle of the component to be Optimisable with the following lines within the BusinessLogic:
using LifeCycle = Optimisable<RtcComponent>;
using ComponentType = LifeCycle;
This declaration provides the BusinessLogic an interface to the following:
JsonPayload ActivityOptimising(StopToken st, JsonPayload const& arg) override
In this example the JSON is parsed, the calculation is computed, and the result is returned to the runtime_repo.
void ActivityOptimising(StopToken st, JsonPayload const& arg) override {
auto result = m_computation.Compute(ParseAlgorithm(arg));
m_rtr.WriteDataPoint<MatrixBuffer<float>>(m_dp_cm, result.cm);
m_oldb.SetDataPoint<double>(m_dp_stats_time, result.stats.elapsed.count());
}
This can then be customised to suit the needs of the Data Task. As this is called with a JSON command, the payload requires parsing by the user. The rest of this example is similar to the Data Tasks previously detailed, but without a ReaderThread as no telemetry is being processed.
See Client Application for more information about the Optimisable interface.
GPU and Python integration¶
The GPU and Python integration are too big a topic to cover here, so separate pages are provided to focus on the specific details.
GPU Support¶
See GPU support for information about GPU support.
Python Support¶
See Python support for more information about the Python interfaces.