Data Task
Overview
As described in the design document, Data Tasks are RTC Components that perform custom computations on data sets derived from Telemetry and/or Runtime Configuration data.
Because input data, computation and output data are specific to each Data Task and instrument, the RTC Tk cannot deliver a one-size-fits-all Data Task component that covers the entire spectrum of required calculations and can be customised by modifying configuration only. Instrument RTC developers will have to get their hands dirty and implement the bulk of the Data Task code themselves.
To make the implementation of a concrete Data Task application easier, the RTC Tk provides supporting libraries and a reference structure for Data Tasks. This page focuses on the ReaderThread class, a key tool that simplifies the accumulation of telemetry data from the shared memory queue. Developers can then focus on the algorithms being implemented rather than on the infrastructure.
Prerequisites
Required Dependencies:
roadrunner - A library that provides NUMA wrapping and the ipcq shared memory interface.
Optional Dependencies:
CUDA toolkit - tools for GPU acceleration and GPU drivers.
openblas - Math library
See GPU support for more information.
Customisation
Custom Data Tasks are implemented by instrument RTC developers. They have to write the bulk of the code themselves, but are encouraged to make their life easier by reusing the provided support libraries.
The tutorials Creating a Simple RTC Component and Creating a Custom Data Task show concrete examples of how to implement custom RTC Components and Data Tasks from scratch.
The reusable Data Task libraries can be found in reusableComponents/dataTask and example implementations can be found in _examples/exampleDataTask.
Reference Design
Because Data Tasks can be very specific and varied, it is hard to demonstrate and describe a generic Data Task. The description provided here is therefore based on a reference design for a very simple telemetry-based Data Task.
Outline
The Data Task in the reference design is purposefully kept simple. Its aim is to read telemetry from shared memory, extract the wavefront sensor slope vectors and copy them into a local buffer. When the specified number of slope vectors has been buffered, the Data Task launches a computation: in the example it averages the slopes over the accumulation window and then projects this average slope vector into modal space using a matrix-vector multiplication. The resulting average mode vector is uploaded to the runtime_repo. The cycle then repeats as long as the Data Task remains in state Running.
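The example computation can be written compactly as follows. The notation is our own, not taken from the code: $N$ is the number of buffered samples (samples_to_read), $\mathbf{s}_k$ is the $k$-th buffered slope vector and $M$ is the slopes-to-modes projection matrix, with one column per slope.

```latex
\bar{\mathbf{s}} = \frac{1}{N}\sum_{k=1}^{N}\mathbf{s}_k,
\qquad
\bar{\mathbf{m}} = M\,\bar{\mathbf{s}} = \frac{1}{N}\sum_{k=1}^{N} M\,\mathbf{s}_k
```

The second equality holds because the projection is linear, so averaging the slopes first and projecting once needs a single matrix-vector product per cycle instead of $N$.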
This very simple example shows the important features that will allow users to develop their own Data Tasks. It shows how to interface with the shared memory queue to accumulate telemetry data using the ReaderThread. It also gives an example of how to customise the BusinessLogic and how to interact with the Runtime Configuration Repository and the OLDB.
In the following sections an overview is given on how the separate parts of a data task can be put together. For more details on how to customise the Data Task see Creating a Custom Data Task.
Structure
In the figure below a high-level structural outline of the Data Task is shown. The RTC Component life-cycle is implemented in the BusinessLogic, which owns and controls both the ReaderThread and the Computation. The diagram also provides a non-exhaustive overview of the most important API methods.
The BusinessLogic is the glue that holds the Data Task together. It provides life-cycle methods according to the states and state transitions of the RTC Component. By customising these methods with calls to the ReaderThread and Computation classes it is possible to create custom Data Tasks.
The ReaderThread class is used to retrieve telemetry data from the shared memory queue. It makes use of the roadrunner::ipcq library. The class contains a simple thread that runs a state machine, which calls a user-provided callback method whenever a new data sample is available to be processed.
The Computation class provides the methods of the concrete computation; this is where users will spend the majority of their effort, implementing their specific computation algorithm. The main focus is on a callback that processes data from the shared memory queue and on the computation itself, which in our example is triggered after the ReaderThread has indicated that enough samples have been processed.
Currently no structure is imposed on the user when developing the Computation class. Developers are free to design the class and its interface with the BusinessLogic in the manner they feel is best.
Interaction
The figure below outlines the interaction of the different classes, along with other entities such as the Runtime Configuration Repository and the OLDB.
The Data Task receives commands from other external entities and invokes the methods of the BusinessLogic class accordingly. This is part of the standard RTC Component functionality and is shown in a simplified way in ComponentControl.
The BusinessLogic interfaces with the ReaderThread and Computation classes, invoking methods that control their state. This is shown in ReaderControl and ComputationControl. The BusinessLogic also interfaces with the Runtime Configuration Repository and the OLDB to load configuration data and publish statistics.
The ReaderThread interfaces with the shared memory queue to accumulate telemetry data. It also requires the registration of callback functions that copy the data into a local buffer of the Computation. This callback interface removes any direct dependency on the Computation class. The callbacks are registered with the ReaderThread by the BusinessLogic.
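The decoupling described above can be illustrated with a minimal sketch. CallbackReader, ExampleTopic, SlopeBuffer and Deliver() are hypothetical names made up for this illustration, and the callback type std::function<void(TopicType const&)> is likewise an assumption, chosen to match the OnDataAvailable(TopicType const& sample) method listed for the Computation below. The point is that the reader only stores a callable and never names the Computation class.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical telemetry topic; a real Data Task uses its shared memory topic definition.
struct ExampleTopic {
    std::vector<float> slopes;
};

// Hypothetical reader: it only knows the callback signature, not the Computation class.
template <typename TopicType>
class CallbackReader {
public:
    using OnDataCallback = std::function<void(TopicType const&)>;

    void RegisterOnDataCallback(OnDataCallback callback) { m_on_data = std::move(callback); }

    // The real ReaderThread takes samples from the ipcq queue; here they are
    // passed in directly to keep the sketch self-contained.
    void Deliver(TopicType const& sample) {
        if (m_on_data) {
            m_on_data(sample);
        }
    }

private:
    OnDataCallback m_on_data;
};

// Hypothetical computation that copies incoming slope vectors into a local buffer.
class SlopeBuffer {
public:
    void OnDataAvailable(ExampleTopic const& sample) { m_samples.push_back(sample.slopes); }
    std::size_t Size() const { return m_samples.size(); }

private:
    std::vector<std::vector<float>> m_samples;
};
```

In the BusinessLogic the two would be connected with a lambda, for example reader.RegisterOnDataCallback([&buffer](ExampleTopic const& s) { buffer.OnDataAvailable(s); }), so that only the BusinessLogic knows both types.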
The ReaderThread has the following interfaces designed to be called from the BusinessLogic.
void Spawn()
void Join()
void Run()
void Idle()
void WaitUntilComputationAllowed()
void SignalComputationDone()
// a series of setters:
void SetQueueName(std::string name)
void SetThreadName(std::string name)
void SetCpuAffinity(int affinity)
void SetSamplesToRead(size_t value)
void SetSamplesToSkip(size_t value)
void SetLoopFrequency(size_t value)
void SetErrorMargin(float value)
void RegisterOnDataCallback(std::function callback)
void RegisterInitThreadCallback(std::function callback)
The Computation class implements the concrete computation algorithms. It receives data from the ReaderThread and returns results and statistics, which are written to the Runtime Repository and to the OLDB respectively.
Currently the exampleDataTask Computation class provides the following interface, designed to be invoked by the BusinessLogic. It is not meant to be exhaustive: it only covers the logic needed by the example Data Task, not by a generic Data Task, and will need to be extended in the future. See Computation Behaviour for the state machine used in the example.
void SetStaticConfig(/* args */)
void SetDynamicConfig(/* args */)
void OnDataAvailable(TopicType const& sample)
Result Compute()
void Reset()
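As an illustration of how such an interface could be filled in for the example computation, here is a minimal, self-contained sketch. It is not the exampleDataTask code: the class name ExampleComputation, the Result layout, the explicit arguments of SetStaticConfig (number of slopes, samples_to_read and a row-major slopes-to-modes matrix) and the use of a plain std::vector<float> instead of the shared memory topic in OnDataAvailable are all assumptions. It copies each slope vector into a preallocated buffer, refuses to write past the accumulation window, and averages and projects in Compute().

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical result: the average slope vector projected into modal space.
struct Result {
    std::vector<float> avg_modes;
};

class ExampleComputation {
public:
    // s2m is n_modes x n_slopes, stored row-major (an assumption of this sketch).
    void SetStaticConfig(std::size_t n_slopes, std::size_t samples_to_read, std::vector<float> s2m) {
        if (n_slopes == 0 || samples_to_read == 0 || s2m.empty() || s2m.size() % n_slopes != 0) {
            throw std::invalid_argument("inconsistent static configuration");
        }
        m_n_slopes = n_slopes;
        m_samples_to_read = samples_to_read;
        m_s2m = std::move(s2m);
        m_buffer.assign(n_slopes * samples_to_read, 0.0f);
        Reset();
    }

    // Copies one slope vector into the local buffer.
    void OnDataAvailable(std::vector<float> const& slopes) {
        if (slopes.size() != m_n_slopes) {
            throw std::invalid_argument("unexpected slope vector length");
        }
        if (m_count >= m_samples_to_read) {
            throw std::length_error("accumulation buffer already full");
        }
        std::copy(slopes.begin(), slopes.end(),
                  m_buffer.begin() + static_cast<std::ptrdiff_t>(m_count * m_n_slopes));
        ++m_count;
    }

    // Averages the buffered slopes, projects the mean into modal space and
    // starts a new accumulation window.
    Result Compute() {
        if (m_count == 0) {
            throw std::logic_error("no samples accumulated");
        }
        std::vector<double> mean(m_n_slopes, 0.0);
        for (std::size_t k = 0; k < m_count; ++k) {
            for (std::size_t i = 0; i < m_n_slopes; ++i) {
                mean[i] += m_buffer[k * m_n_slopes + i];
            }
        }
        for (double& v : mean) {
            v /= static_cast<double>(m_count);
        }
        std::size_t const n_modes = m_s2m.size() / m_n_slopes;
        Result result{std::vector<float>(n_modes, 0.0f)};
        for (std::size_t m = 0; m < n_modes; ++m) {
            double acc = 0.0;
            for (std::size_t i = 0; i < m_n_slopes; ++i) {
                acc += static_cast<double>(m_s2m[m * m_n_slopes + i]) * mean[i];
            }
            result.avg_modes[m] = static_cast<float>(acc);
        }
        Reset();
        return result;
    }

    // Discards any partially accumulated window.
    void Reset() { m_count = 0; }

private:
    std::size_t m_n_slopes = 0;
    std::size_t m_samples_to_read = 0;
    std::size_t m_count = 0;
    std::vector<float> m_s2m;
    std::vector<float> m_buffer;
};
```

Accumulating a running sum on arrival would save memory, but copying into a buffer keeps the sketch close to the reference design, where slopes are first buffered and only then averaged. The explicit overflow check mirrors the reason Reset() is called when going Running.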
Behaviour
Component Life-Cycle
The BusinessLogic class is the glue that holds the Data Task together. It provides the life-cycle methods that are called when commands are received, and it owns the other objects and the internal state information of the Data Task component.
Developers implement the life-cycle methods to provide custom behaviour and to interact with the other objects owned by the BusinessLogic class. See RTC Component for more information.
Initialising
In ActivityInitialising a Computation instance and a ReaderThread instance are created, passing the shared memory topic definition as a template argument. Then the required callbacks are registered and the setters are invoked to configure both the ReaderThread and the Computation.
Enabling
In ActivityEnabling the ReaderThread spawns its thread and enters its state machine.
void ActivityEnabling(StopToken st) override
{
m_reader->Spawn();
}
The spawn function checks that the ReaderThread has been configured with all required variables set and then spawns its activity thread, entering its state machine. See ReaderThread Behaviour.
There is no interaction with the Computation class in this activity.
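The check performed by Spawn() can be pictured with the following sketch, in which every required ReaderThread value stays unset until its setter has been called. ReaderConfig and CheckComplete() are hypothetical names, and treating a zero samples_to_read or loop_frequency as invalid is an assumption of the sketch, not documented RTC Tk behaviour.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical holder for the required ReaderThread settings.
class ReaderConfig {
public:
    void SetQueueName(std::string name) { m_queue_name = std::move(name); }
    void SetSamplesToRead(std::size_t value) { m_samples_to_read = value; }
    void SetSamplesToSkip(std::size_t value) { m_samples_to_skip = value; }
    void SetLoopFrequency(std::size_t value) { m_loop_frequency = value; }

    // Throws std::runtime_error naming the first missing or invalid value, so a
    // misconfigured reader fails before its thread is started.
    void CheckComplete() const {
        if (!m_queue_name || m_queue_name->empty()) {
            throw std::runtime_error("queue_name not set");
        }
        if (!m_samples_to_read || *m_samples_to_read == 0) {
            throw std::runtime_error("samples_to_read not set");
        }
        if (!m_samples_to_skip) {
            throw std::runtime_error("samples_to_skip not set");
        }
        if (!m_loop_frequency || *m_loop_frequency == 0) {
            throw std::runtime_error("loop_frequency not set");
        }
    }

private:
    std::optional<std::string> m_queue_name;
    std::optional<std::size_t> m_samples_to_read;
    std::optional<std::size_t> m_samples_to_skip;
    std::optional<std::size_t> m_loop_frequency;
};
```

Using std::optional distinguishes "never set" from a legitimate value such as a samples_to_skip of 0.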
Disabling
In ActivityDisabling the BusinessLogic tells the ReaderThread to join its activity thread. The Data Task must be Operational for this activity to be invoked.
void ActivityDisabling(StopToken st) override
{
m_reader->Join();
}
There is no interaction with the Computation class in this activity.
GoingRunning
In ActivityGoingRunning the ReaderThread is set to run and moves into Reading mode, where it begins to read from the shared memory queue.
Additionally, the Computation is reset. This forces the computation into the correct state when entering Running: without this call, the computation may still be part way through an accumulation that was interrupted when the Data Task went idle, and going running again from that state can cause buffer overflows.
void ActivityGoingRunning(StopToken st) override
{
m_computation->Reset();
m_reader->Run();
}
GoingIdle
In ActivityGoingIdle the Data Task is brought from Running back to state Idle, and the ReaderThread is set to Idle as well.
void ActivityGoingIdle(StopToken st) override
{
m_reader->Idle();
}
There is no interaction with the Computation class in this activity.
Running
In ActivityRunning the BusinessLogic interfaces with both the ReaderThread and the Computation. First it waits for the ReaderThread to signal that the read is complete (the correct number of data samples has been received and processed), then it starts the computation.
The wait is done with the call m_reader->WaitUntilComputationAllowed(), which returns when the reader has completed the read and is now skipping.
The computation is launched with m_computation->Compute() and runs in the thread of the Running activity.
When the computation is done, m_reader->SignalComputationDone() is invoked. This informs the ReaderThread that the calculation is complete and that it can safely return to reading samples once it has skipped the requested number of samples.
After the computation has finished, the returned results and statistics are written to the runtime_repo and to the OLDB respectively.
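The hand-over between the reader and the Running activity behaves like a two-party handshake. The sketch below models it with a mutex and a condition variable; ComputationHandshake, NotifyReadComplete() and WaitForComputationDone() are hypothetical, and only the names WaitUntilComputationAllowed() and SignalComputationDone() come from the interface above. The RTC Tk internals may differ; in particular, the real ReaderThread keeps skipping samples while the computation runs instead of blocking as the reader side does here.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

class ComputationHandshake {
public:
    // Reader side: the requested number of samples has been read.
    void NotifyReadComplete() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_state = State::ReadComplete;
        }
        m_cv.notify_all();
    }

    // BusinessLogic side: blocks until a full accumulation window is available.
    void WaitUntilComputationAllowed() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_state == State::ReadComplete; });
    }

    // BusinessLogic side: the computation has finished with the buffer.
    void SignalComputationDone() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_state = State::ComputationDone;
        }
        m_cv.notify_all();
    }

    // Reader side: blocks until the computation is done, then resumes reading.
    void WaitForComputationDone() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cv.wait(lock, [this] { return m_state == State::ComputationDone; });
        m_state = State::Reading;
    }

private:
    enum class State { Reading, ReadComplete, ComputationDone };
    std::mutex m_mutex;
    std::condition_variable m_cv;
    State m_state = State::Reading;
};
```

The predicate passed to wait() protects against spurious wake-ups and against a notification that arrives before the other side has started waiting.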
Updating
This activity updates the dynamic configuration of the Data Task. Currently the only dynamic configuration attribute of the ReaderThread is the number of samples to skip. It is updated with m_reader->SetSamplesToSkip(GetParam<int32_t>(path)).
The computation may have many configuration points that can be modified while the Data Task is Operational. The example Computation provides the SetDynamicConfig function, invoked as m_computation->SetDynamicConfig(), to reload the new configuration from the runtime repo. This cannot be done while a computation is running.
The guard method GuardUpdatingAllowed can be used to prevent a configuration update while a computation is ongoing:
bool IsUpdatingAllowed(Payload args) override
{
return not m_is_computing;
}
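The guard is evaluated when an Update command arrives, which can happen while the Running activity is computing on its own thread, so m_is_computing should be safe to read concurrently. A minimal sketch, assuming m_is_computing is a std::atomic<bool> (the original does not specify its type) and using a hypothetical RAII helper, ComputingFlag, so that the flag is cleared even if the computation throws:

```cpp
#include <atomic>
#include <cassert>

// Hypothetical RAII helper: marks a computation as ongoing for the lifetime of a scope.
class ComputingFlag {
public:
    explicit ComputingFlag(std::atomic<bool>& flag) : m_flag(flag) { m_flag.store(true); }
    ~ComputingFlag() { m_flag.store(false); }
    ComputingFlag(ComputingFlag const&) = delete;
    ComputingFlag& operator=(ComputingFlag const&) = delete;

private:
    std::atomic<bool>& m_flag;
};

// Free-function version of the guard body shown above.
inline bool UpdatingAllowed(std::atomic<bool> const& is_computing) {
    return !is_computing.load();
}
```

In ActivityRunning the call to Compute() would then be wrapped in a scope holding a ComputingFlag on m_is_computing. Note that checking the flag and starting the update are not one atomic step, so this guard narrows rather than removes the window for a race.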
ReaderThread Behaviour
In the figure below the protocol state machine of the ReaderThread class is shown. The purple states show where user-registered callback functions are executed.
When Reading, the ReaderThread invokes the user-provided callback on every data sample that becomes available in the shared memory queue, until the specified number of samples has been read. It then enters state Skipping, where a defined number of samples are skipped.
Typically a computation is performed at this point. If a computation takes longer than a skip cycle, the reader transitions to Waiting to await the end of the computation without loss of information. If the computation still does not terminate, the reader starts Dropping and finally transitions to state Error.
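The transitions described above can be condensed into a small, single-threaded sketch. ReaderStateMachine and its events are hypothetical. What triggers the moves from Waiting to Dropping and from Dropping to Error is not specified here, so both are modelled as an abstract OnWaitTimeout() event; returning from Dropping to Reading when the computation finishes is likewise an assumption.

```cpp
#include <cassert>
#include <cstddef>

enum class ReaderState { Reading, Skipping, Waiting, Dropping, Error };

class ReaderStateMachine {
public:
    ReaderStateMachine(std::size_t samples_to_read, std::size_t samples_to_skip)
        : m_to_read(samples_to_read), m_to_skip(samples_to_skip) {}

    ReaderState State() const { return m_state; }
    std::size_t Delivered() const { return m_delivered; }

    // A new sample is available. Samples seen in Waiting, Dropping or Error are
    // not delivered to the callback (in Waiting they would stay in the queue).
    void OnSample(bool computation_done) {
        switch (m_state) {
        case ReaderState::Reading:
            ++m_delivered;  // the user OnData callback would run here
            if (++m_count == m_to_read) {
                m_count = 0;
                m_state = ReaderState::Skipping;
            }
            break;
        case ReaderState::Skipping:
            // This sketch always skips at least one sample.
            if (++m_count >= m_to_skip) {
                m_count = 0;
                m_state = computation_done ? ReaderState::Reading : ReaderState::Waiting;
            }
            break;
        default:
            break;
        }
    }

    void OnComputationDone() {
        if (m_state == ReaderState::Waiting || m_state == ReaderState::Dropping) {
            m_state = ReaderState::Reading;
        }
    }

    void OnWaitTimeout() {
        if (m_state == ReaderState::Waiting) {
            m_state = ReaderState::Dropping;
        } else if (m_state == ReaderState::Dropping) {
            m_state = ReaderState::Error;
        }
    }

private:
    std::size_t m_to_read;
    std::size_t m_to_skip;
    std::size_t m_count = 0;
    std::size_t m_delivered = 0;
    ReaderState m_state = ReaderState::Reading;
};
```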
The ReaderThread is customised at runtime mainly via configuration data. It must be instantiated with the definition of the shared memory data topic as a template argument (see Shared Memory Queue Topic). It also requires the ipcq reader type as a template argument. This type must correspond to the writer type used; in the reference design it has been set with using ReaderType = ipcq::Reader<TopicType>;. See the roadrunner documentation for more information.
Computation Behaviour
The figure below shows the protocol state machine of the Computation class used in the exampleDataTask. The state machine shows the possible API method invocation sequences and the resulting accumulation and computation cycles in a very simplistic way.
It is important to note that in the current implementation the Computation class performs the computation using the BusinessLogic thread. In the future, we aim to supply a more fleshed-out computation class running in its own thread. This would give the flexibility to do some preprocessing on data points as they arrive rather than just copying them. An example of this is shown in the GPU data task (see GPU support): once the data is copied, an asynchronous calculation is triggered on the GPU.
If the Data Task requires telemetry, the only requirement is to provide the ReaderThread with a callback function that processes data from the shared memory queue. This is registered via the method RegisterOnDataCallback.
See Creating a Custom Data Task for more information about customising the data task.
Configuration
As for other components, the initial configuration of a Data Task component is stored in a YAML file, whose name has to correspond to the name of the component instance. The configuration contains both static and dynamic parts. The static configuration cannot be modified once the component has been started: it can neither be updated with the Update command nor be modified by a result produced while Running. The dynamic configuration, in contrast, can be updated, or modified by the component itself as a result.
If the static configuration is changed, the component should be restarted or reinitialised, i.e. by calling Init.
The configuration can be divided into two groups:
ReaderThread configuration
Computation configuration
ReaderThread configuration
The ReaderThread configuration is shown below and is set by the BusinessLogic using the ReaderThread setters. This avoids forcing any specific layout of the configuration files: developers are free to design their own configuration and set the associated values from the BusinessLogic via the ReaderThread setter functions.
Required configuration:
cfg attribute | Type | Description |
---|---|---|
queue_name | RtcString | Name of the shared memory queue to read from |
samples_to_read | RtcInt32 | Number of samples to buffer |
samples_to_skip | RtcInt32 | Number of samples to skip |
loop_frequency | RtcInt32 | Expected loop frequency in Hz |
Optional configuration:
cfg attribute | Type | Description |
---|---|---|
thread_name | RtcString | Name of the reader thread (max. 16 characters) |
cpu_affinity | RtcInt32 | CPU affinity of the reader thread |
error_margin | RtcFloat32 | Margin of error applied to timeouts |
In our example this configuration is split between two YAML files, common.yaml and data_task_1.yaml. The first file contains configuration that is common to multiple components, while the second file contains only the configuration of a specific Data Task. The contents of the exampleDataTask YAML file are shown below. They are split between static and dynamic, depending on whether a value can change while the component is Operational (static cannot change, dynamic can). They are also split between locally common values (i.e. used by both the ReaderThread and the Computation class) and values used only by the ReaderThread or only by the Computation class.
static:
    common:
        samples_to_read:
            type: RtcInt32
            value: 5
    reader_thread:
        cpu_affinity:
            type: RtcInt32
            value: 0
dynamic:
    reader_thread:
        samples_to_skip:
            type: RtcInt32
            value: 10
Common
Some values, such as the loop frequency and queue_name, will be common to multiple components. In our example these have been put in the common.yaml file to show how common data points may be handled. This is not enforced by the RTC Tk; instruments are free to design the configuration layout themselves.
Below is an example of how this common configuration can be handled, as chosen for the reference design components.
static:
loop_1:
queue_name:
type: RtcString
value: exampleDataTaskQueue
frequency:
type: RtcInt32
value: 5
wfs_1:
subapertures:
type: RtcInt32
value: 4616
slopes:
type: RtcInt32
value: 9232
Computation configuration
The Computation configuration is algorithm specific. An example computation is shown in the exampleDataTask; see Creating a Custom Data Task for more details and for the specific computation configuration.
Errors
The majority of errors will be thrown by the ReaderThread.
ReaderThread
The main errors that can currently occur during component initialisation, when the component receives the Init command, are due to missing mandatory configuration parameters. The following is an example of the reader failing to be created, either because the queue name is wrong or because the writer does not exist.
[10:37:06:258][INFO ][data_task_1] ReaderThread::Spawn()
[10:37:07:458][ERROR][data_task_1] Activity.Enabling: failed, exception: Request 'Starting' timed out!
[10:37:07:459][ERROR][data_task_1] exception info Exception in state 'On.NotOperational.Enabling ' with text 'Request 'Starting' timed out!'
[10:37:07:460][FATAL][client] Code:7 Message:Exception in state 'On.NotOperational.Enabling ' with text 'Request 'Starting' timed out!'
rtctkExampleDataTask.sh stterminate called after throwing an instance of 'std::system_error'
what(): Timeout waiting for ipcq shared memory queue to be created: operation timed out
The following example shows the ReaderThread timing out because data is not arriving quickly enough, or no data is flowing through the shared memory queue.
[10:54:19:350][ERROR][data_task_1] Reading from shm timed out: check queue is being filled
[10:54:19:350][ERROR][data_task_1] Read: 0 in 200 ms
[10:54:19:350][ERROR][data_task_1] Expected: 5
[10:54:19:350][INFO ][data_task_1] ReaderThread::Work() - changed state to 'Error'
[10:54:19:350][ERROR][data_task_1] ReaderThread::Work() - operation timed out
[10:54:19:350][ERROR][data_task_1] Activity.Running: failed, exception: An asynchronous error occured: 'operation timed out'!
[10:54:19:351][ERROR][data_task_1] exception info Exception in state 'On.Operational.Running On.Operational.Update.Idle ' with text 'An asynchronous error occured: 'operation timed out'!'
Computation
The most likely error to be seen from the Computation class is shown below. It is caused by the slopes2modes matrix not being present, or by a wrong file name in the configuration.
[11:02:14:950][ERROR][data_task_1] Activity.Initialising: failed, exception: Failed to open FITS file '/introot/dbarr/run/exampleDataTask/runtime_repo/data_task_1.dynamic.computation.slopes2modes1.fits'. could not open the named file
Source file: ../componentFramework/services/common/src/fitsIoFunctions.hpp
Line no.: 219
Function: ReadMatrixFromFits
[11:02:14:951][FATAL][client] Code:7 Message:Exception in state 'On.NotOperational.Initialising ' with text 'Failed to open FITS file '/introot/dbarr/run/exampleDataTask/runtime_repo/data_task_1.dynamic.computation.slopes2modes1.fits'. could not open the named file
Source file: ../componentFramework/services/common/src/fitsIoFunctions.hpp
Line no.: 219
Function: ReadMatrixFromFits'
Other Data Tasks
Not all Data Tasks will use the ReaderThread and perform periodic calculations. Other Data Tasks may be required to perform on-demand tasks triggered by the user. To facilitate this, the following optional interfaces are available to extend the BusinessLogic.
ActivityOptimising()
ActivityMeasuring()
See RTC Component for more information about using these interfaces.
Limitations and Known Issues
Single ReaderThread
In the example shown in Creating a Custom Data Task only a single ReaderThread is configured. Thanks to the callback-based interface it is possible to attach more, but this would require a more complex BusinessLogic and Computation class. While this should be possible with the tools provided, it has not been investigated prior to this release and more work in this regard would be needed.