Data Task

Overview

As described in the design document, Data Tasks are RTC Components that perform custom computations on data sets derived from Telemetry and/or Runtime Configuration data.

Because the input data, computation and output data are specific to each Data Task and instrument, the RTC Tk cannot deliver a one-size-fits-all Data Task component that covers the entire spectrum of required calculations and can be customised solely by modifying configuration. Instrument RTC developers will have to get their hands dirty and implement the bulk of the Data Task code themselves.

To make the implementation of a concrete Data Task application easier, the RTC Tk provides supporting libraries and a reference structure for Data Tasks. This page focuses on the ReaderThread class, a key tool that simplifies the accumulation of telemetry data from the shared memory queue. Developers can then focus on the algorithms being implemented rather than on the infrastructure.

Prerequisites

Required Dependencies:

  • From roadrunner

    • NUMA++ C++ libnuma interface.

    • ipcq Interprocess communication over shared memory.

Optional Dependencies:

  • CUDA toolkit - tools for GPU acceleration and GPU drivers.

  • openblas - Math library

See GPU support for more information.

Customisation

Custom Data Tasks are implemented by instrument RTC developers. They have to write the bulk of the code themselves, but are encouraged to make their life easier by re-using the provided support libraries.

The tutorials Creating a Simple RTC Component and Creating a Custom Data Task show concrete examples of how to implement custom RTC Components and Data Tasks from scratch.

The reusable Data Task libraries can be found in reusableComponents/dataTask, and an example implementation can be found in _examples/exampleDataTask.

Reference Design

Because Data Tasks can be very specific and varied, it is hard to demonstrate and describe a generic Data Task. The description provided here is therefore based on a reference design for a very simple telemetry-based Data Task.

Outline

The Data Task in the reference design is purposefully kept simple. It reads telemetry from shared memory, extracts the wavefront sensor slope vectors and copies them into a local buffer. When the specified number of slope vectors has been buffered, the Data Task launches a computation: in the example, it averages the slopes over the accumulation window and then projects this average slope vector into modal space using a matrix-vector multiplication. The resulting average mode vector is uploaded to the runtime_repo. The cycle then repeats as long as the Data Task remains in state Running.

This very simple example shows the important features that will allow users to develop their own Data Tasks. It shows how to interface with the shared memory queue to accumulate telemetry data using the ReaderThread. It also gives an example of how to customise the BusinessLogic and how to interact with the Runtime Configuration Repository and the OLDB.
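The numerical core of the reference computation (average over the accumulation window, then project into modal space) can be sketched in plain C++ as follows. This is a minimal sketch, not the exampleDataTask code: the names AverageSlopes, ProjectToModes and the row-major Matrix struct are hypothetical, and a real implementation would more likely delegate the matrix-vector product to an optimised library such as the optional openblas dependency.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical types for illustration only.
using Vector = std::vector<float>;

// Row-major matrix: rows = number of modes, cols = number of slopes.
struct Matrix {
    std::size_t rows;
    std::size_t cols;
    std::vector<float> data;
};

// Element-wise average of the slope vectors buffered during the accumulation window.
Vector AverageSlopes(const std::vector<Vector>& buffer) {
    assert(!buffer.empty());
    Vector avg(buffer.front().size(), 0.0f);
    for (const auto& slopes : buffer) {
        assert(slopes.size() == avg.size());
        for (std::size_t i = 0; i < avg.size(); ++i) {
            avg[i] += slopes[i];
        }
    }
    for (auto& value : avg) {
        value /= static_cast<float>(buffer.size());
    }
    return avg;
}

// Projects the average slope vector into modal space: modes = S2M * slopes.
Vector ProjectToModes(const Matrix& s2m, const Vector& slopes) {
    assert(s2m.cols == slopes.size());
    assert(s2m.data.size() == s2m.rows * s2m.cols);
    Vector modes(s2m.rows, 0.0f);
    for (std::size_t r = 0; r < s2m.rows; ++r) {
        for (std::size_t c = 0; c < s2m.cols; ++c) {
            modes[r] += s2m.data[r * s2m.cols + c] * slopes[c];
        }
    }
    return modes;
}
```

For example, averaging the two slope vectors {1, 2} and {3, 4} gives {2, 3}, and projecting that with a 1x2 matrix of ones gives the single mode coefficient 5.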

In the following sections an overview is given on how the separate parts of a data task can be put together. For more details on how to customise the Data Task see Creating a Custom Data Task.

Structure

In the figure below a high-level structural outline of the data task is shown. The RTC Component life-cycle is implemented in the BusinessLogic. It owns and controls both the ReaderThread and the Computation. The diagram also provides a non-exhaustive overview of the most important API methods.


Fig. 1 High-level Structure of a Data Task

The BusinessLogic is the glue that holds the Data Task together. It provides life-cycle methods according to the states and state transitions of the RTC Component. By customising these methods with calls to the ReaderThread and Computation classes it is possible to create custom Data Tasks.

The ReaderThread class is used to retrieve Telemetry Data from the shared memory queue. It makes use of the roadrunner::ipcq library. The class contains a simple thread that runs a state machine that calls a user-provided callback method when a new data sample is available to be processed.

The Computation class provides the methods of the concrete computation; this is where users will spend most of their effort, implementing their specific computation algorithm. The main focus is on a callback that processes data from the shared memory queue, and on the computation itself, which in our example is triggered once the ReaderThread indicates that enough samples have been processed.

At this point no structure is forced on the user when developing the Computation class. They are free to design the class and interfaces with the BusinessLogic in the manner they feel is best.

Interaction

The figure below outlines the interaction of the different classes, along with other entities such as the Runtime Configuration Repository and the OLDB.


Fig. 2 Interaction inside a Data Task component

The Data Task receives commands from other external entities and invokes the methods of the BusinessLogic class accordingly. This is part of the standard RTC Component functionality and shown in a simplistic way in ComponentControl.

The BusinessLogic interfaces with the ReaderThread and Computation classes, invoking methods that control their state. This is shown in ReaderControl and ComputationControl. The BusinessLogic also interfaces with the Runtime Configuration Repository and the OLDB to load configuration data and publish statistics.

The ReaderThread interfaces with the shared memory queue to accumulate telemetry data. It requires callback functions to be registered that copy the data into a local buffer of the Computation. This callback interface removes a direct dependency on the Computation class. The callbacks are registered with the ReaderThread by the BusinessLogic.
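The decoupling provided by the callback can be illustrated with a small stand-in reader. This is a hypothetical sketch, not the RTC Tk ReaderThread: CallbackReader and ReadFrom are invented names, a std::deque stands in for the ipcq shared memory queue, and the std::function signature is an assumption. The point it shows is that the reader is templated only on the topic type and never sees the Computation type; it just hands each sample to whatever callable was registered, until samples_to_read samples have been delivered.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for the ReaderThread: it knows a callback, not a Computation.
template <typename TopicType>
class CallbackReader {
public:
    using OnDataCallback = std::function<void(const TopicType&)>;

    void RegisterOnDataCallback(OnDataCallback callback) { m_on_data = std::move(callback); }
    void SetSamplesToRead(std::size_t value) { m_samples_to_read = value; }

    // Consumes samples from 'queue' (stand-in for the shared memory queue) and passes
    // each one to the registered callback until samples_to_read have been read.
    // Returns true once the read is complete. A callback must have been registered.
    bool ReadFrom(std::deque<TopicType>& queue) {
        while (m_read < m_samples_to_read && !queue.empty()) {
            m_on_data(queue.front());
            queue.pop_front();
            ++m_read;
        }
        return m_read == m_samples_to_read;
    }

private:
    OnDataCallback m_on_data;
    std::size_t m_samples_to_read = 0;
    std::size_t m_read = 0;
};
```

In this sketch the "Computation" can be as simple as a lambda appending to a local std::vector<float>; the reader code does not change when the consumer does.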

The ReaderThread has the following interfaces designed to be called from the BusinessLogic.

void Spawn()
void Join()
void Run()
void Idle()
void WaitUntilComputationAllowed()
void SignalComputationDone()

// a series of setters:
void SetQueueName(std::string name)
void SetThreadName(std::string name)
void SetCpuAffinity(int affinity)
void SetSamplesToRead(size_t value)
void SetSamplesToSkip(size_t value)
void SetLoopFrequency(size_t value)
void SetErrorMargin(float value)
void RegisterOnDataCallback(std::function callback)
void RegisterInitThreadCallback(std::function callback)

The Computation class implements the concrete computation algorithms. It receives data from the ReaderThread and returns results and statistics that are written to the Runtime Repository and to the OLDB.

Currently, the computation class of the exampleDataTask provides the following interfaces, designed to be invoked by the BusinessLogic. They are not meant to be exhaustive: they only cover the logic of the example Data Task, not of a generic Data Task, and will need to be extended in the future. See Computation Behaviour for the state machine used in the example.

void SetStaticConfig(/* args */)
void SetDynamicConfig(/* args */)
void OnDataAvailable(TopicType const& sample)
Result Compute()
void Reset()

Behaviour

Component Life-Cycle

The BusinessLogic class is the glue that holds the Data Task together. It provides the life-cycle methods that are called when commands are being received and it owns other objects and the internal state information of the Data Task component. Developers implement the life-cycle methods to provide custom behavior and to interact with other objects owned by the BusinessLogic class. See RTC Component for more information.

Initialising

In ActivityInitialising, a Computation instance and a ReaderThread instance are created, passing the shared memory topic definition as a template argument. Then the required callbacks are registered and the setters are invoked to configure both the ReaderThread and the Computation.
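The initialisation order (create, register callbacks, apply setters) can be sketched with mock classes. Everything here is hypothetical: MockReader only mirrors a subset of the setter names listed earlier, ReaderConfig stands in for values the real BusinessLogic would fetch from the Runtime Configuration Repository, and the std::function signature is an assumption. The example values are taken from the YAML snippets further down this page.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical configuration values (in a real component: read from the runtime repo).
struct ReaderConfig {
    std::string queue_name;
    std::size_t samples_to_read;
    std::size_t samples_to_skip;
    std::size_t loop_frequency;
    int cpu_affinity;
};

struct Topic { float value; };  // hypothetical shared memory topic

// Hypothetical mock mirroring a subset of the ReaderThread setters.
template <typename TopicType>
class MockReader {
public:
    void SetQueueName(std::string name) { queue_name = std::move(name); }
    void SetSamplesToRead(std::size_t value) { samples_to_read = value; }
    void SetSamplesToSkip(std::size_t value) { samples_to_skip = value; }
    void SetLoopFrequency(std::size_t value) { loop_frequency = value; }
    void SetCpuAffinity(int affinity) { cpu_affinity = affinity; }
    void RegisterOnDataCallback(std::function<void(const TopicType&)> cb) { on_data = std::move(cb); }

    std::string queue_name;
    std::size_t samples_to_read = 0;
    std::size_t samples_to_skip = 0;
    std::size_t loop_frequency = 0;
    int cpu_affinity = -1;
    std::function<void(const TopicType&)> on_data;
};

struct MockComputation {
    void OnDataAvailable(const Topic& sample) { last_value = sample.value; }
    float last_value = 0.0f;
};

struct BusinessLogicSketch {
    std::unique_ptr<MockComputation> m_computation;
    std::unique_ptr<MockReader<Topic>> m_reader;

    // Sketch of ActivityInitialising: create instances, register callbacks, apply setters.
    void ActivityInitialising(const ReaderConfig& cfg) {
        m_computation = std::make_unique<MockComputation>();
        m_reader = std::make_unique<MockReader<Topic>>();

        MockComputation* comp = m_computation.get();
        m_reader->RegisterOnDataCallback(
            [comp](const Topic& sample) { comp->OnDataAvailable(sample); });

        m_reader->SetQueueName(cfg.queue_name);
        m_reader->SetSamplesToRead(cfg.samples_to_read);
        m_reader->SetSamplesToSkip(cfg.samples_to_skip);
        m_reader->SetLoopFrequency(cfg.loop_frequency);
        m_reader->SetCpuAffinity(cfg.cpu_affinity);
    }
};
```

Capturing a raw pointer to the Computation in the callback assumes the Computation outlives the reader's thread, so the reader should be joined before the Computation is destroyed.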

Enabling

In ActivityEnabling the ReaderThread spawns its thread and enters its state machine.

void ActivityEnabling(StopToken st) override
{
    m_reader->Spawn();
}

The spawn function checks that the ReaderThread has been configured with all required variables set and then spawns its activity thread, entering its state machine. See ReaderThread Behaviour.

There is no interaction with the Computation class in this activity.
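The configuration check that Spawn() performs before starting its thread can be pictured as follows. This is a hypothetical sketch of the idea, not the RTC Tk implementation: ReaderSettings, MissingSettings and CheckBeforeSpawn are invented names, and std::optional is used here to distinguish "never set" from a legitimate value such as zero. The mandatory attributes are the ones from the required configuration table on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical settings holder; an empty optional means "not configured".
struct ReaderSettings {
    std::optional<std::string> queue_name;
    std::optional<std::size_t> samples_to_read;
    std::optional<std::size_t> samples_to_skip;
    std::optional<std::size_t> loop_frequency;
};

// Returns a comma-separated list of missing mandatory settings (empty if none).
std::string MissingSettings(const ReaderSettings& s) {
    std::string missing;
    auto note = [&missing](bool is_set, const char* name) {
        if (!is_set) {
            if (!missing.empty()) {
                missing += ", ";
            }
            missing += name;
        }
    };
    note(s.queue_name.has_value(), "queue_name");
    note(s.samples_to_read.has_value(), "samples_to_read");
    note(s.samples_to_skip.has_value(), "samples_to_skip");
    note(s.loop_frequency.has_value(), "loop_frequency");
    return missing;
}

// Fails before any thread is started if the configuration is incomplete.
void CheckBeforeSpawn(const ReaderSettings& s) {
    const std::string missing = MissingSettings(s);
    if (!missing.empty()) {
        throw std::invalid_argument("ReaderThread not configured, missing: " + missing);
    }
}
```

Failing early with a list of missing attributes is usually easier to diagnose than a timeout later in the life cycle.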

Disabling

In ActivityDisabling the BusinessLogic tells the ReaderThread to join its activity thread. The Data Task must be Operational for this to be invoked.

void ActivityDisabling(StopToken st) override
{
    m_reader->Join();
}

There is no interaction with the Computation class in this activity.

GoingRunning

In ActivityGoingRunning the ReaderThread is set to run and it will move into Reading mode where it will begin to read from the shared memory queue.

Additionally, the Computation is reset. This forces the computation into the correct state when entering Running. Without this call, the computation may still be part way through an accumulation that was interrupted when the component was idled, and going running without the reset can cause buffer overflows.

void ActivityGoingRunning(StopToken st) override
{
    m_computation->Reset();  // discard any partial accumulation from a previous run
    m_reader->Run();         // ReaderThread moves into Reading
}

GoingIdle

ActivityGoingIdle brings the Data Task from Running back to state Idle and also sets the ReaderThread to Idle.

void ActivityGoingIdle(StopToken st) override
{
    m_reader->Idle();
}

There is no interaction with the Computation class in this activity.

Running

In ActivityRunning the BusinessLogic interfaces with both the ReaderThread and Computation. First it waits for the ReaderThread to signal that the read is complete (the correct number of data samples have been received and processed) then it signals the computation to begin.

The wait is performed with m_reader->WaitUntilComputationAllowed();. This call returns once the reader has completed the read and is skipping samples.

When the computation is done, m_reader->SignalComputationDone(); is invoked. This informs the ReaderThread that the calculation is complete, so it can safely return to reading samples once it has skipped the requested number of samples.

The computation is launched with m_computation->Compute();. It runs in the thread of the Running activity.

After the computation has finished the returned results and statistics are written to the runtime_repo and to the OLDB respectively.
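The Running cycle described above can be sketched as a loop. This is an illustration under stated assumptions, not the exampleDataTask code: FakeReader, FakeComputation, FakeStopToken, Result and the keys "avg_modes" and "compute_time" are all hypothetical, and two std::map instances stand in for the runtime_repo and the OLDB. In a real component the reader's WaitUntilComputationAllowed() blocks until Reading has finished, and the loop ends when the framework requests a stop.

```cpp
#include <atomic>
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-ins so that the control flow compiles on its own.
struct FakeReader {
    int waits = 0;
    int done_signals = 0;
    void WaitUntilComputationAllowed() { ++waits; }  // real reader blocks until Skipping
    void SignalComputationDone() { ++done_signals; }
};

struct Result {
    std::vector<float> modes;
    double compute_time_s;
};

struct FakeComputation {
    int calls = 0;
    Result Compute() {
        ++calls;
        return Result{{0.5f, -0.5f}, 0.001};
    }
};

// Stops after a fixed number of polls instead of on an external request.
struct FakeStopToken {
    int remaining;
    bool stop_requested() { return remaining-- <= 0; }
};

// Sketch of the Running activity: wait, compute, signal, publish.
template <typename StopToken>
void RunLoop(StopToken st, FakeReader& reader, FakeComputation& computation,
             std::atomic<bool>& is_computing,
             std::map<std::string, std::vector<float>>& runtime_repo,
             std::map<std::string, double>& oldb) {
    while (!st.stop_requested()) {
        reader.WaitUntilComputationAllowed();   // samples_to_read samples have been read
        is_computing = true;
        Result result = computation.Compute();  // runs on the Running thread
        is_computing = false;
        reader.SignalComputationDone();         // reader may resume Reading after skipping
        runtime_repo["avg_modes"] = result.modes;
        oldb["compute_time"] = result.compute_time_s;
    }
}
```

Signalling the reader before publishing the results keeps the reader from waiting on repository and OLDB writes; either order matches the description above.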

Updating

Updating reloads the dynamic configuration of the Data Task. Currently, the only dynamic configuration attribute of the ReaderThread is the number of samples to skip. It is updated with m_reader->SetSamplesToSkip(GetParam<int32_t>(path));.
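Note that samples_to_skip is stored as RtcInt32 while SetSamplesToSkip takes a size_t, so a negative value in the configuration would silently wrap around to a huge unsigned number. A small checked conversion avoids this. ToSampleCount is a hypothetical helper, not part of the RTC Tk; with it, the update would read m_reader->SetSamplesToSkip(ToSampleCount(GetParam<int32_t>(path), "samples_to_skip"));.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>

// Hypothetical helper: converts a configured RtcInt32 sample count to size_t,
// rejecting negative values instead of letting them wrap around.
std::size_t ToSampleCount(std::int32_t value, const std::string& name) {
    if (value < 0) {
        throw std::out_of_range(name + " must not be negative, got " + std::to_string(value));
    }
    return static_cast<std::size_t>(value);
}
```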

The computation may have many configuration points that can be modified while the Data Task is Operational. The example Computation provides the SetDynamicConfig function, which reloads the new configuration from the runtime repo: m_computation->SetDynamicConfig();. This must not be done while a calculation is running.

The guard method GuardUpdatingAllowed can be used to prevent a configuration update while a computation is currently ongoing:

bool IsUpdatingAllowed(Payload args) override
{
    return not m_is_computing;
}
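Because the flag is written by the Running thread and, presumably, read by the thread that evaluates the guard, it should be a std::atomic<bool>. A scoped helper also guarantees the flag is cleared if Compute() throws. ComputingFlag and UpdatingAllowed are hypothetical names used only for this sketch.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>

// Hypothetical RAII helper: sets the flag for the lifetime of a computation and
// clears it on scope exit, including when an exception propagates.
class ComputingFlag {
public:
    explicit ComputingFlag(std::atomic<bool>& flag) : m_flag(flag) { m_flag = true; }
    ~ComputingFlag() { m_flag = false; }
    ComputingFlag(const ComputingFlag&) = delete;
    ComputingFlag& operator=(const ComputingFlag&) = delete;

private:
    std::atomic<bool>& m_flag;
};

// Same logic as the guard above, written as a free function.
bool UpdatingAllowed(const std::atomic<bool>& is_computing) {
    return !is_computing.load();
}
```

In the Running loop this would look like { ComputingFlag guard(m_is_computing); result = m_computation->Compute(); }. The guard only narrows the window: a computation could still start between the guard check and the update itself, so the example Computation's rule of not reloading during a calculation still applies.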

ReaderThread Behaviour

In the figure below the protocol state machine of the ReaderThread class is shown.


Fig. 3 ReaderThread protocol state machine.

The purple states show where user-registered callback functions are executed.

When Reading, the ReaderThread invokes the user-provided callback on every data sample that becomes available in the shared memory queue, until the specified number of samples has been read. It then enters state Skipping, where a defined number of samples is skipped; this is typically when a computation is performed. If a computation takes longer than a skip cycle, the reader transitions to Waiting to await the end of the computation without loss of information. If the computation still does not terminate, the reader starts Dropping and finally transitions to state Error.
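The transitions described above can be summarised as a transition function. This is a simplified, hypothetical interpretation of the prose and of Fig. 3, not the ReaderThread implementation: the event names are invented, Idle is omitted, and returning from Dropping to Reading when the computation finishes is an assumption.

```cpp
#include <cassert>

enum class ReaderState { Reading, Skipping, Waiting, Dropping, Error };

// Hypothetical events driving the sketch.
enum class Event { ReadComplete, SkipComplete, ComputationDone, WaitTimeout, DropTimeout };

// Simplified interpretation of the ReaderThread protocol state machine.
ReaderState Next(ReaderState s, Event e, bool computation_done) {
    switch (s) {
    case ReaderState::Reading:
        // samples_to_read samples delivered to the callback: start skipping.
        return e == Event::ReadComplete ? ReaderState::Skipping : s;
    case ReaderState::Skipping:
        // After samples_to_skip samples: resume reading, or wait without losing data.
        if (e == Event::SkipComplete) {
            return computation_done ? ReaderState::Reading : ReaderState::Waiting;
        }
        return s;
    case ReaderState::Waiting:
        if (e == Event::ComputationDone) return ReaderState::Reading;
        if (e == Event::WaitTimeout) return ReaderState::Dropping;
        return s;
    case ReaderState::Dropping:
        // Assumption: recovery is possible if the computation finishes while dropping.
        if (e == Event::ComputationDone) return ReaderState::Reading;
        if (e == Event::DropTimeout) return ReaderState::Error;
        return s;
    case ReaderState::Error:
        return s;  // terminal in this sketch
    }
    return s;
}
```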

The ReaderThread is mainly customised at runtime via configuration data. At compile time it must be instantiated with the definition of the shared memory data topic as a template argument, see Shared Memory Queue Topic. The ReaderThread also requires the ipcq reader type as a template argument. This must correspond to the type of writer used; in the reference design it is set with using ReaderType = ipcq::Reader<TopicType>;. See the roadrunner documentation for more information.

Computation Behaviour

The figure below shows the protocol state machine of the Computation class used in the exampleDataTask. The state machine shows the possible API method invocation sequences and the resulting accumulation and computation cycles in a very simplistic way.


Fig. 4 Computation protocol state machine.

It is important to note that in the current implementation the Computation class performs the computation in the BusinessLogic thread. In the future, we aim to supply a more fleshed-out computation class running in its own thread. This would give the flexibility to do some preprocessing on data points as they arrive rather than just copying them. An example of this is the GPU data task, see GPU support, where an asynchronous calculation is triggered on the GPU once the data has been copied.

If the Data Task requires telemetry the only requirement is to provide a callback function to the ReaderThread to process data in the shared memory queue. This is registered via the method RegisterOnDataCallback.

See Creating a Custom Data Task for more information about customising the data task.

Configuration

As for other components, the initial configuration of Data Task components is stored in a YAML file whose name has to correspond to the name of the component instance. The configuration contains both static and dynamic configuration items. Both are loaded during component initialisation, but only dynamic configuration items can be retrieved again later in the component life cycle, e.g. using command Update.

The configuration can be divided into two groups:

  • ReaderThread configuration

  • Computation configuration

ReaderThread configuration

The ReaderThread configuration is shown below and is set via the BusinessLogic using the ReaderThread setters. This has been done to not force any specific layout of configuration files. Developers are free to design their own configuration and set the associated values from the BusinessLogic via the ReaderThread setter functions.

Required configuration:

cfg attribute      Type         Description
-----------------  -----------  --------------------------------------------
queue_name         RtcString    Name of the shared memory queue to read from
samples_to_read    RtcInt32     Number of samples to buffer
samples_to_skip    RtcInt32     Number of samples to skip
loop_frequency     RtcInt32     Expected loop frequency in Hz

Optional configuration:

cfg attribute      Type         Description
-----------------  -----------  ----------------------------------------------
thread_name        RtcString    Name of the reader thread (max. 16 characters)
cpu_affinity       RtcInt32     CPU affinity of the reader thread
error_margin       RtcFloat32   Margin of error applied to timeouts

In our example, this configuration is split between two YAML files, common.yaml and data_task_1.yaml. The first file contains configuration that is common to multiple components, while the second contains only the configuration specific to one Data Task. The contents of the exampleDataTask YAML file are shown below. Items are split into static and dynamic, depending on whether they can change while the component is Operational (static items cannot change, dynamic items can). They are also split into locally common items (i.e. used by both the ReaderThread and the Computation class) and specific items, used only by the ReaderThread or only by the Computation class.

static:
    common:
        samples_to_read:
            type: RtcInt32
            value: 5
    reader_thread:
        cpu_affinity:
            type: RtcInt32
            value: 0
dynamic:
    reader_thread:
        samples_to_skip:
            type: RtcInt32
            value: 10

Common

Some values, such as the loop frequency and queue_name, will be common to multiple components. In our example these have been put in the common.yaml file to show how common data points may be handled. This is not enforced by the RTC Tk; instrument RTC developers are free to design the configuration layout themselves.

Below is an example of how this common configuration can be handled, as chosen for the reference design components.

static:
    loop_1:
        queue_name:
            type: RtcString
            value: exampleDataTaskQueue
        frequency:
            type: RtcInt32
            value: 5
    wfs_1:
        subapertures:
            type: RtcInt32
            value: 4616
        slopes:
            type: RtcInt32
            value: 9232

Shared Memory Queue Topic

It is important to note that the ReaderThread is a template class and requires, at compile time, the topic definition of the data stored in the shared memory queue. The same definition is also required by the telemetry subscriber, which populates the shared memory queue.

Example topics used for the reference design can be found in the source code: _example/exampleTopics

For a SCAO system the topic definition may look as follows, though instrument RTC developers are free to design their topics as the instrument requires:

#include <array>
#include <cstdint>

constexpr unsigned NUM_PIXELS = 800u*800u;
constexpr unsigned N_SUBAPS = 4616u;
constexpr unsigned N_COMMANDS = 6316u;

template<unsigned int NSUBAPS>
struct WfsLoopBaseTopic {
    std::array<float, 2*NSUBAPS> slopes;   // x and y slope per subaperture
    std::array<float, NSUBAPS> intensities;
};

struct ScaoLoopTopic {
    std::uint64_t sample_id;
    WfsLoopBaseTopic<N_SUBAPS> wfs;
    std::array<float, N_COMMANDS> commands;
};

Note

The topic structure must be a contiguous, flat structure in memory and cannot contain pointers to other memory blocks: data referenced through pointers is not readable in the Data Tasks and will most likely cause segmentation violations or data corruption. Any complex structure with pointers to other memory regions will have to be serialised into a flat topic structure.
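Part of this requirement can be checked at compile time. The sketch below repeats the example topic and adds static_assert checks; CopyThroughBytes is a hypothetical helper that copies a topic through a raw byte buffer, the way a shared memory queue transfers it. These checks reject members such as std::vector or std::string, which own heap memory, but they do not catch raw pointer members (a float* is trivially copyable), so those still need to be avoided by design.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

constexpr unsigned N_SUBAPS = 4616u;
constexpr unsigned N_COMMANDS = 6316u;

template <unsigned int NSUBAPS>
struct WfsLoopBaseTopic {
    std::array<float, 2 * NSUBAPS> slopes;
    std::array<float, NSUBAPS> intensities;
};

struct ScaoLoopTopic {
    std::uint64_t sample_id;
    WfsLoopBaseTopic<N_SUBAPS> wfs;
    std::array<float, N_COMMANDS> commands;
};

// Compile-time checks that the topic can be copied as a flat block of bytes.
static_assert(std::is_trivially_copyable_v<ScaoLoopTopic>, "topic must be trivially copyable");
static_assert(std::is_standard_layout_v<ScaoLoopTopic>, "topic must have standard layout");

// Hypothetical helper: copies a topic through a raw byte buffer.
template <typename Topic>
Topic CopyThroughBytes(const Topic& in) {
    static_assert(std::is_trivially_copyable_v<Topic>);
    std::vector<unsigned char> bytes(sizeof(Topic));
    std::memcpy(bytes.data(), &in, sizeof(Topic));
    Topic out;
    std::memcpy(&out, bytes.data(), sizeof(Topic));
    return out;
}
```

Adding a member such as std::vector<float> to ScaoLoopTopic makes the first static_assert fail at compile time.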

Computation configuration

The Computation configuration is algorithm specific. An example computation is shown in the exampleDataTask; see Creating a Custom Data Task for more details on the specific computation configuration.

Errors

The majority of errors will be thrown by the ReaderThread.

ReaderThread

The main errors that can currently occur during component initialisation, when the component receives the Init command, are due to missing mandatory configuration parameters. The following example shows the reader failing to be created during Enabling, either because the queue name is wrong or because the writer does not exist.

[10:37:06:258][INFO ][data_task_1] ReaderThread::Spawn()
[10:37:07:458][ERROR][data_task_1] ActivityEnabling: failed, exception: Request 'Starting' timed out!
[10:37:07:459][ERROR][data_task_1]  exception info Exception in state 'On:NotOperational:Enabling ' with text 'Request 'Starting' timed out!'
[10:37:07:460][FATAL][client] Code:7 Message:Exception in state 'On:NotOperational:Enabling ' with text 'Request 'Starting' timed out!'
rtctkExampleDataTask.sh stterminate called after throwing an instance of 'std::system_error'
    what():  Timeout waiting for ipcq shared memory queue to be created: operation timed out

The following example shows the ReaderThread timing out because data is arriving in the shared memory queue too slowly, or not at all.

[10:54:19:350][ERROR][data_task_1] Reading from shm timed out: check queue is being filled
[10:54:19:350][ERROR][data_task_1] Read: 0 in 200 ms
[10:54:19:350][ERROR][data_task_1] Expected: 5
[10:54:19:350][INFO ][data_task_1] ReaderThread::Work() - changed state to 'Error'
[10:54:19:350][ERROR][data_task_1] ReaderThread::Work() - operation timed out
[10:54:19:350][ERROR][data_task_1] ActivityRunning: failed, exception: An asynchronous error occured: 'operation timed out'!
[10:54:19:351][ERROR][data_task_1]  exception info Exception in state 'On:Operational:Running On:Operational:Update:Idle ' with text 'An asynchronous error occured: 'operation timed out'!'

Computation

Below is the error most likely to be seen from the Computation class. It is caused by a missing slopes2modes matrix or by a wrong file name in the configuration.

[11:02:14:950][ERROR][data_task_1] ActivityInitialising: failed, exception: Failed to open FITS file '/introot/dbarr/run/exampleDataTask/runtime_repo/data_task_1.dynamic.computation.slopes2modes1.fits'. could not open the named file
Source file: ../componentFramework/services/common/src/fitsIoFunctions.hpp
Line no.: 219
Function: ReadMatrixFromFits
[11:02:14:951][FATAL][client] Code:7 Message:Exception in state 'On:NotOperational:Initialising ' with text 'Failed to open FITS file '/introot/dbarr/run/exampleDataTask/runtime_repo/data_task_1.dynamic.computation.slopes2modes1.fits'. could not open the named file
Source file: ../componentFramework/services/common/src/fitsIoFunctions.hpp
Line no.: 219
Function: ReadMatrixFromFits'

Other Data Tasks

Not all Data Tasks will use the ReaderThread and perform periodic calculations. Other Data Tasks may be required to perform on-demand tasks triggered by the user. To facilitate this, the following optional interfaces are available to extend the BusinessLogic.

  • ActivityOptimising()

  • ActivityMeasuring()

See RTC Component for more information about using these interfaces.

Limitations and Known Issues

Single ReaderThread

In the example shown in Creating a Custom Data Task, only a single ReaderThread is configured. Thanks to the callback-based interface it is possible to attach more, but this would require a more complex BusinessLogic and Computation class. While this should be possible with the tools provided, it has not been investigated prior to this release, and more work in this regard would be needed.