2. MAL API¶
Revision: |
2.3 |
---|---|
Status: |
Released |
Repository: |
|
Project: |
ELT CII |
Folder: |
/trunk/deliverables/phase2 |
Document ID: |
CSL-MAN-17-150198 |
File: |
MAN-MAL_API.docx |
Owner: |
Matej Šekoranja, Cosylab Sweden |
Last modification: |
April 21, 2022 |
Created: |
October 25, 2017 |
Prepared by |
Reviewed by |
Approved by |
---|---|---|
Matej Šekoranja, CSL SWE |
Aljaž Podboršek Gregor Čuk |
Gregor Čuk |
Revision |
Date |
Changed/rev iewed |
Section(s) |
Modificatio n |
---|---|---|---|---|
0.1 |
2017-10-25 |
msekoranja |
All |
Created. |
1.0 |
2017-11-06 |
msekoranja |
All |
Update on API, release. |
1.1 |
2018-02-23 |
msekoranja |
3.1, 3.3 |
Update on API changes, C++ MrvSubscrib er example. |
2.0 |
2019-02-22 |
msekoranja |
All |
Complete rewrite of a document. |
2.1 |
2019-03-04 |
msekoranja |
All |
Example code revised. |
2.2 |
2022-03-31 |
jpribosek |
2.5.2 |
Example code revised. |
2.3 |
2022-04-21 |
dkumar |
2.6.2 |
Exception handling details |
Confidentiality
This document is classified as a confidential document. As such, it or parts thereof must not be made accessible to anyone not listed in the Audience section, neither in electronic nor in any other form.
Scope
This document is a Middleware Abstraction Layer user manual document for the ELT Core Integration Infrastructure Software project.
Audience
Users and Maintainers of the ELT Core Integration Infrastructure Software.
Glossary of Terms
API |
Application Programming Interface |
---|---|
MAL |
Middleware Abstraction Layer |
QoS |
Quality of Service |
CII |
Core Integration Infrastructure |
ICD |
Interface Control Document |
XML |
Extensible Markup Language |
References
Cosylab, Middleware Abstraction Layer Design document, CSL-DOC-17-147260 v1.5
Cosylab, ELT CII MAL Transfer document, CSL-DOC-18-168015, version 1.8
Cosylab, Data Addressing Specification, CSL-DOC-17-147264, version 1.2
2.1. Overview¶
This document is a user manual for MAL API. The document provides descriptions of the API and instructions to programmers how to use MAL API. All the details on the API and its functionality are covered by the MAL design document [1].
2.2. Installation¶
See ELT CII MAL Transfer document [2], Chapter 1.2 on how to build and install MAL.
2.3. Introduction¶
MAL is an abstraction API that provides complete independence from the communication middleware software. This is achieved by dynamically loadable MAL middleware implementation (mapping) that implements MAL API. In addition, data entities (classes, structures, interfaces) need to be independent from the communication-middleware too. This is achieved by using generation: user defines its data entities in ICD definition language (i.e. XML), the generator then produces middleware-agnostic entities that are used by the applications and middleware-specific code that needs to be generated. The specific code is dynamically loaded by the MAL mapping implementations when needed. This part is completely hidden from the programmer. The programmer must never use or reference any middle-ware specific code it only uses pure MAL API and agnostic data entities.
Middleware selection is done using Uniform Resource Identifier (URI) addressing. A URI is a string of characters that unambiguously identifies a particular resource. The scheme part of the URI (e.g. “dds.ps://” prefix) determines what MAL mapping to use, whereas the rest of the URI is completely middleware specific. For more info on the URIs please refer to the Data Addressing Specification ([3]).
MAL API supports two standardized communication patterns: data centric publish-subscribe (decoupled peers based on data publishers and subscribers) and request-response (coupled peer-to-peer client-server communication).
The following sections provide the description and examples of the API for both communication patterns. The examples demonstrate usage of the API, they do not do anything meaningful. Source code of the examples is available in elt-mal/mal module.
2.4. CiiFactory¶
CiiFactory is an entry-point class of the MAL API. Usually it is accessed as a singleton instance via getInstance() method. This provided a shared, thread-safe instance to be used among the threads in one process. However, in case of a more complex application where an insulation among different “sub-processes” is required, the API provides a createInstance() method that instantiates a completely new CiiFactory instance on every call of this method. Avoid using this method unless there is a good reason for it.
A CiiFactory instance has 2 groups of methods: methods for MAL mapping implementation (un-)registration and factory methods of entities (publisher, subscriber; client, server) for both communication patterns. The main task of CiiFactory is to delegate factory method calls to the appropriate registered MAL mapping implementation depending on the URI, given as a parameter to all the factory methods. CiiFactory is instantiated without any MAL mapping registered. A programmer needs to register MAL mappings. The registration is done by calling registerMal method that takes 2 parameters: a reference to the instance of MAL mapping implementation and an URI schema string to be handled by the implementation. CiiFactory takes no responsibilities on registered mappings lifecycle; it is responsibility of the programmer to take care of it, still CiiFactory provides a close() method that calls close() on all registered MAL mappings and unregisters them. A programmer has also an ability to unregister previously registered MAL mappings. Un-registration only removes a MAL instance from CiiFactory dictionary; any previously created entities (e.g. publishers) will remain active. The entities lifecycle is responsibility of the MAL mapping implementation, meaning releasing a MAL mapping implementation also releases all its entities.
Registration of a MAL mapping introduces a compile and runtime dependency to the mapping. CiiFactory.loadMal method dynamically loads the mapping and avoids any dependency on any MAL mapping. CiiFactory.installMal calls CiiFactory.loadMal and registers newly loaded mapping to the shared CiiFactory instance. Optional properties parameter can be passed as parameter to configure loaded MAL. Both methods accept MAL mapping (schema) name, e.g.: dds, zpb, opcua.
All factory methods accept 3 common parameters: a URI, a quality-of-service (QoS) parameters and a string name-value list of MAL mapping specific properties. MAL API defines a common set of QoS parameters per communication pattern (described in detail in the following sections). If none (null) are supplied defaults are used.
The list of MAL mapping specific properties allows specific/fine tuning of the middleware in a uniform way. The properties in a list are specific and documented per MAL mapping implementation, usually they are provided via some kind of configuration subsystem. If none (null) are supplied defaults are used.
A registration of another MAL instance with the same schema string will override any previous registration (un-registration no longer needed). Un-registration of non-registered scheme results in no operation action. A call to a factory method with a URI scheme that is not registered will result in SchemeNotSupportedException exception.
The following code examples show recommended initialization of MAL mapping with creation of a publisher.
2.4.1. Java¶
In Java it is highly recommended to use try-with-resources statements so that the code is safe and close() on the resource will be always called.
// install DDS MAL mapping
CiiFactory.installMal("dds");
// use try-with-resources statement to be safe that close()
// is always being called on factory and MAL closed
try (CiiFactory factory = CiiFactory.getInstance()) {
// create URI
URI uri = URI.create("dds.ps:///m1/CabinetTelemetry");
// create publisher instance with default QoS and no specific properties
// use try-with-resources statement to be safe that close()
// is always being called on publisher
try (Publisher<Sample> publisher =
factory.getPublisher(uri, QoS.DEFAULT, null, Sample.class)) {
// put publish code here ...
}
}
2.4.2. C++¶
C++ implementation uses RAII idiom using smart pointers. Resources are closed automatically when their enclosing scope ends. To close resource explicitly call close() method.
try {
// load DDS MAL mapping with (optional) properties
std::shared_ptr<::elt::mal::Mal> ddsMal = mal::loadMal("dds", mal::Mal::Properties{
{"dds.domain", "100"}});
::elt::mal::CiiFactory &factory = mal::CiiFactory::getInstance();
// register DDS MAL mapping with CiiFactory
factory.registerMal("dds", ddsMal);
// create URI
mal::Uri uri("dds.ps:///m1/CabinetTelemetry");
// create publisher instance with default QoS and no specific properties
std::unique_ptr<ps::Publisher<mal::example::Sample>> publisher = factory.getPublisher<mal::example::Sample>(
uri,
mal::ps::qos::QoS::DEFAULT,
{});
// put publisher code here...
} catch (std::exception& exc) {
std::cerr << "EXCEPTION: " << exc.what() << std::endl;
}
2.4.3. Python¶
In Python all generated entities are prefixed with a “Mod”<name of the ICD file in camelCase> string. The module also loads appropriate C++ binding classes and libraries. Submodule structure follows ICD package structure with capitalized name for each package.
Python MAL API supports context manager protocol to automatically release the resources after use. Closing resources explicitly with close() is also supported.
# Import MAL API.
import elt.pymal as mal
# Import Data Entity class from binding module.
from ModManualExamples.Elt.Mal import Example
uri = 'dds.ps:///m1/CabinetTelemetry'
factory = CiiFactory.getInstance()
# Load DDS MAL mapping.
ddsMal = mal.loadMal("dds", {'dds.domain': '100'})
factory.registerMal('dds', ddsMal)
# Use context manager interface of the publisher
# to ensure underlying instance of the publisher is
# closed when context is destroyed.
# Create publisher instance with default QoS and no
# specific properties.
with factory.getPublisher(uri,
Example.Sample,
qos=mal.ps.qos.DEFAULT) as publisher:
# Put publish code here...
# Publisher is automatically closed at this point.
2.5. Publish-subscribe¶
Publish-subscribe is a data-centric decoupled communication pattern. Publishers (data producers) publish data to a topic, and subscribers (data consumers) subscribe to the topics they are interested in to receive the data published. This is not a coupled (connected) peer-to-peer communication – there can be many subscribers per one topic. MAL prescribes a limit of only one publisher per topic. Order of instantiation/startup also does not matter. Subscriber will miss any data sent to the channel prior their subscription. Subscribers can subscribe to a topic even if there is no publisher available. Once it became available the subscribers will start receiving the newly published data. The paradigm does not guarantee that the first n-messages are will be received by the subscribers, even if the subscribers are already running when the publisher starts. It takes some time to establish communication mechanism on startup of the publishers and subscribers leading to non-instant subscriptions to the topics and consequently missed data. In addition, MAL does not guarantee reliable communication (and queueing) – the data might be lost during the transmission or dropped if send/receive queues are full. MAL mapping specific properties can be used to set reliability QoS, if underlying communication middleware supports it.
MAL however does support 2 QoS settings:
Deadline
The deadline QoS shall be the maximum age of a sample, i.e. t(now) – t(sample) < deadline_time, sample time is a time of creation of a sample. The sample time is given as parameter to the publish method call on publisher. If not given, current time is used. Note that this also implies that if a sample is left too long in the subscribers receive queue it might expire (and is removed from the queue).
If deadline QoS is not defined, it is assumed to be infinite.
Latency
The latency QoS shall be the maximum time a sample may remain in-transit between the publisher and subscriber. That is, the arrival time minus sample time cannot be greater than latency QoS. If latency QoS is not defined, it is assumed to be infinite.
If QoS is violated a corresponding sample is discarded.
It is assumed that time among different network nodes where publishers and subscribers run is synchronized.
MAL also supports a concept of instances. Every structure (defined in ICD) can be assigned a key. A key is a set of structure fields (of primitive types) that uniquely identifies one instance. This allows programmers to publish/retrieve/filter data based on their instance.
To create data entities use createDataEntity methods available on both, publisher and subscriber.
2.5.1. Publisher¶
Publisher has only one task – to publish data within one topic. There are 2 publish methods available: one with and one without sample timestamp. In case of method without a source timestamp current time is used as a sample timestamp.
Timestamp is a double value representing seconds past 1.1.1970 UTC (to micro precision). There are utility classes that help conversion from/to standardized timestamp values/structures.
Publish methods accept timeout parameter. The method blocks until the message is sent. If the method execution time exceeds timeout time limit TimeoutException is thrown. A non-blocking behavior can be achieved by using timeout value of zero.
Depending on whether data structure contains a key there are two ways of publishing, as shown on the following examples.
This class is thread-safe.
2.5.1.1. Java¶
// install DDS MAL mapping
CiiFactory.installMal("dds");
try (CiiFactory factory = CiiFactory.getInstance()) {
URI uri = URI.create("dds.ps:///m1/CabinetTelemetry");
// create publisher with deadline and latency QoS
try (Publisher<Sample> publisher =
factory.getPublisher(uri,
new QoS[] {
new Deadline(1, TimeUnit.SECONDS),
new Latency(100, TimeUnit.MILLISECONDS)
}, null, Sample.class)) {
//
// publish code for keyless topic
// and user provided source timestamp
//
Sample sample = publisher.createDataEntity();
sample.setDaqId(0);
sample.setValue(12.8);
// MalUtil.timestampToDouble(long millis) from mal-common
// might be helpful to convert Java timestamp from millis
double timestamp = 1550791384.1d;
publisher.publish(timestamp, sample,
DEFAULT_TIMEOUT_SEC, TimeUnit.SECONDS);
//
// publish code for keyed topic
//
Sample keySample = publisher.createDataEntity();
// set key field(s)
keySample.setDaqId(1);
try (InstancePublisher<Sample> daq1Publisher =
publisher.createInstancePublisher(keySample)) {
Sample daq1Sample = daq1Publisher.createDataEntity();
// key is already set when createDataEntity is
// called on InstancePublisher
daq1Sample.setValue(Math.PI);
daq1Publisher.publish(daq1Sample, DEFAULT_TIMEOUT_SEC, TimeUnit.SECONDS);
}
// daq1Publisher is destroyed here,
// since we are out of try-with-resource scope
}
}
}
2.5.1.2. C++¶
try {
std::shared_ptr<::elt::mal::Mal> ddsMal = mal::loadMal("dds", mal::Mal::Properties{
{"dds.domain", "100"}});
::elt::mal::CiiFactory &factory = mal::CiiFactory::getInstance();
factory.registerMal("dds", ddsMal);
mal::Uri uri("dds.ps:///m1/CabinetTelemetry");
// Create publisher with deadline and latency QoS
std::unique_ptr<ps::Publisher<mal::example::Sample>> publisher = factory.getPublisher<mal::example::Sample>(
uri,
{ std::make_shared<mal::ps::qos::Latency>(
std::chrono::milliseconds(100)),
std::make_shared<mal::ps::qos::Deadline>(
std::chrono::seconds(1)) }, {});
//
// publish code for keyless topic
// and user provided source timestamp
//
std::shared_ptr<mal::example::Sample> sample = publisher->createDataEntity();
sample->setDaqId(0);
sample->setValue(12.8);
double timestamp = 1550791384.1;
publisher->publish(timestamp, *sample, std::chrono::seconds(3));
//
// publish code for keyed topic
//
std::shared_ptr<mal::example::Sample> keySample = publisher->createDataEntity();
// set key field(s)
keySample->setDaqId(1);
{
std::unique_ptr<::elt::mal::ps::InstancePublisher<mal::example::Sample>> daq1Publisher =
publisher->createInstancePublisher(*keySample);
std::shared_ptr<mal::example::Sample> daq1Sample = daq1Publisher->createDataEntity();
// daqId is already set as it is created by InstancePublisher
daq1Sample->setValue(3);
daq1Publisher->publish(*daq1Sample, std::chrono::seconds(3));
}
// daq1Publisher is destroyed here, since its enclosing scope ended
} catch (std::exception& exc) {
std::cerr << "EXCEPTION: " << exc.what() << std::endl;
}
2.5.1.3. Python¶
Python does not support method overloading. Publisher provides publish() and publishWithTimestamp() methods. publishWithTimestamp() takes timestamp value as first argument. Type of the timestamp value must be float (as returned from time.time()).
Durations and timeouts must be specified as instances of standard datetime.timedelta class.
uri = 'dds.ps:///m1/CabinetTelemetry'
ddsMal = mal.loadMal('dds', {'dds.domain': '100'})
factory = mal.CiiFactory.getInstance()
factory.registerMal('dds', ddsMal)
# Create publisher with deadline and latency QoS.
qosList = [mal.ps.qos.Deadline(datetime.timedelta(seconds=1)),
mal.ps.qos.Latency(datetime.timedelta(milliseconds=100))]
with factory.getPublisher(uri, Example.Sample,
qos=qosList) as publisher:
#
# Publish code for keyless topic
# and user provided source timestamp.
#
sample = publisher.createDataEntity()
sample.setDaqId(0)
sample.setValue(12.8)
timestamp = time.time()
publisher.publishWithTimestamp(sample,
timestamp,
datetime.timedelta(seconds=1))
#
# Publish code for keyed topic.
#
keySample = publisher.createDataEntity()
# Set key field(s).
keySample.setDaqId(1)
# Use Publisher as context manager.
with publisher.createInstancePublisher(keySample) as daq1Publisher:
daq1Sample = daq1Publisher.createDataEntity()
daq1Sample.setValue(3.14)
daq1Publisher.publish(daq1Sample, DEFAULT_TIMEOUT)
# daq1Publisher is closed here.
2.5.2. Subscriber¶
Each subscriber instance has its own queue. The queue holds the events related to the subscribed topic ordered by the time of insertion into the queue (e.g. data arrival). The events are usually data events, they simply contain the data published, however there are also events that indicate e.g. that there is no publisher for given instance available. The events are represented using DataEvent interface.
The interface via its methods provides access the data (if available), its source timestamp, InstanceState and InstanceEvent enumerations. InstanceState always reports current (not the event was recorded) state of an instance: ALIVE or NOT_ALIVE. InstanceEvent describes the reason why event was created: CREATED (instance has been created/revived), UPDATED (updated data event), REMOVED (instance has been destroyed, i.e. no more publishers available for the instance). A programmer should never assume that data is always available, e.g. in case of REMOVED InstanceEvent there is no data. A programmer should always call hasValidData method to check whether the event holds (valid) data.
NOTE: REMOVED instance event is not guaranteed to be generated when instance is being destroyed (due to differences, or lack of notification, in middleware communication software), therefore software should not rely its logic on arrival of this event.
All the data received by subscriber instance (registered to one topic) is first processed by subscriptions on the subscriber. If any of the data matches a subscription a subscription callback is called and the data is provided in the callback, otherwise the data gets into the queue. A programmer has to take care of emptying the queue, otherwise it might get full and no new data can be received. In order to empty queue user need to invoke one of the read methods, or set deadline QoS property to instruct removal of (too) old data from the queue.
NOTE: You should never close the subscription within the subscription callback as that may cause deadlock, depending on the implementation. If you want your callback to stop processing but you don’t want to close the subscription yet, you should pass in an atomic variable and early return from the callback depending on the state of the variable.
The following text describes some of the use-cases shown in the example code below:
Read data from all instances. Maximum number of data is limited by maxSamples parameter. To read all the data (and clear the queue) use 0 as maxSamples parameter.
If there is no data, an empty (not null) list is returned. This method creates new entities and copies their data; avoid using this method when dealing with large arrays.
Read data from one instance. The instance to be read is given as parameter. To create an instance parameter call createDataEntity and set fields that form a key. If queue for specified instance is empty a null reference is returned. This method creates a new instance (there is an exception in case of C++ by-reference variation the method) and copies its data (in all method variations); avoid using it when dealing with large arrays.
Whenever there is a need to avoid any copies of the data (e.g. large arrays) there a load read methods available. If underlying middleware and mapping implementation support no-copy behavior this methods “loan” the data from the middleware and pass the reference to the data to the programmer. However, this is not always possible. If this is not possible (not supported by the implementation) a copy is made. Also be careful that some middleware might support this only for certain data types: if mapping from middleware data type matches MAL data type and no conversion is needed, then no-copy behavior is possible, otherwise a copy (during conversion) is made.
Synchronous read of events (not just data!) can be done using readDataEvents method. The method returns a count of added events to the list provided as an argument. The method blocks until one of the condition is met: number of events added to the list reaches eventCountLimit or collectTime time is passed. Zero eventCountLimit imposes no limit on number of events added, and zero collectTime turns method to non-blocking and only reads what is currently available in the queue (still limited by eventCountLimit count limit). Do not forget that events might not always data event, check by calling hasValidData method if data is valid first.
For asynchronous subscriptions there are 2 methods: one that provides the data in the callback and the one without. The former method guarantees only that data is valid for the time of the callback. The implementation might avoid doing a copy in this case. The later method only notifies about the presence of new data, the data can be read by using one of synchronous read methods later. However, read call must not be invoke inside the callback. If you want to get the data, use the first method. The callback methods are being executed by some MAL mapping implementation or even middleware threads. Creating asynchronous subscription returns a Subscription instance that is used to cancel the subscription.
Lifetime of subscriptions is limited by the lifetime of a subscriber. If subscriber is being closed the subscriptions are also closed.
In addition there is a method that allows event-pump based subscription. Once subscription is being instantiated a pool method needs to be called in order to get the callback invoked. The callback is being invoked by the same thread that calls poll.
Subscriptions and readDataEvents methods support filtering. Filtering can be done on instance, InstanceState and InstanceEvent fields. Setting a filter on a field means selecting/limiting event to set value. Filter on multiple fields implies all fields must match their set value. Default filter allows all the events. A cached (shared) instance it accessed via DataEventFilter::all() method.
This class is thread-safe.
2.5.2.1. Java¶
public class SubscriberExample {
// user data event callback function
static void dataEventFunction(Subscriber<Sample> subscriber, DataEvent<Sample> event) {
// NOTE: you should never close the subscription within the callback
// as that may cause deadlock.
if (event.hasValidData()) {
System.out.println(event.getData());
}
}
public static void main(String[] args) {
// install DDS MAL mapping
CiiFactory.installMal("dds");
try (CiiFactory factory = CiiFactory.getInstance()) {
URI uri = URI.create("dds.ps:///m1/CabinetTelemetry");
try (Subscriber<Sample> subscriber =
factory.getSubscriber(uri, QoS.DEFAULT, null, Sample.class)) {
//
// Read all alive instances w/o sample count limit.
//
List<Sample> allAliveInstances = subscriber.read(0);
//
// Read one instance, i.e. daqId == 1.
//
Sample keySample = subscriber.createDataEntity();
keySample.setDaqId(1);
Sample daq1Sample = subscriber.readInstance(keySample);
//
// Loaned (no-copy) example.
// Loan is returned when close() method is called on LoanedDataEntity
// or automatically by try-with-resources (as shown below).
//
try (LoanedDataEntity<Sample> loanedSample = subscriber.loanedRead()) {
Sample sampleData = loanedSample.getData();
if (sampleData != null) {
// use data here
}
}
//
// Read data events, filtered by instance and instance state.
// Block until maxEvents (1000) are read or timeout (3s) occurs.
//
final int maxEvents = 1000;
List<DataEvent<Sample>> events = new ArrayList<DataEvent<Sample>>(maxEvents);
DataEventFilter<Sample> filter = new DataEventFilter<Sample>();
filter.setInstance(keySample);
filter.setInstanceState(InstanceState.ALIVE);
filter.setInstanceEvents(
EnumSet.of(InstanceEvent.CREATED, InstanceEvent.UPDATED)
);
int samplesRead = subscriber.readDataEvents(filter, events,
maxEvents, 3, TimeUnit.SECONDS);
//
// Asynchronous data subscription, no filtering.
// 1 second sleep added to simulate some work.
//
try (Subscription subs = subscriber.subscribeAsync(DataEventFilter.all(),
SubscriberExample::dataEventFunction)) {
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
} catch (InterruptedException ie) {
// noop
}
// note that subscription is destroyed when leaving try-with-resources scope.
//
// Asynchronous notification subscription, no filtering,
// lambda callback implementation.
// DO NOT CALL read methods in the callback!
// 1 second sleep added to simulate some work.
//
try (Subscription subs =
subscriber.subscribeAsync(DataEventFilter.all(),
(Subscriber<Sample> sub) -> System.out.println("new data"))
) {
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
} catch (InterruptedException ie) {
// noop
}
//
// Synchronous (aka event pump) subscription, no filtering.
// Poll with 1000 events / 1 second limit, endless loop.
//
try (Subscription subs = subscriber.subscribe(
DataEventFilter.all(),
SubscriberExample::dataEventFunction)) {
while (true) {
subscriber.poll(0, 1, TimeUnit.SECONDS);
// the call above blocks for max. 1 second or
// until 1000 events are provided to the callback
}
}
}
}
}
2.5.2.2. C++¶
std::shared_ptr<::elt::mal::Mal> ddsMal = mal::loadMal("dds", mal::Mal::Properties{
{"dds.domain", "100"}});
::elt::mal::CiiFactory &factory = mal::CiiFactory::getInstance();
factory.registerMal("dds", ddsMal);
mal::Uri uri("dds.ps:///m1/CabinetTelemetry");
try {
std::unique_ptr<ps::Subscriber<mal::example::Sample>> subscriber = factory.getSubscriber<mal::example::Sample>(
uri, mal::ps::qos::QoS::DEFAULT, {});
//
// Read all alive instances w/o sample count limit.
//
std::vector<std::shared_ptr<mal::example::Sample>> allAliveInstances = subscriber->read(0);
//
// Read one instance, i.e. daqId == 1.
//
std::shared_ptr<mal::example::Sample> keySample = subscriber->createDataEntity();
keySample->setDaqId(1);
std::shared_ptr<mal::example::Sample> daq1Sample = subscriber->readInstance(*keySample);
// ... or by providing sample by reference
std::shared_ptr<mal::example::Sample> sample = subscriber->createDataEntity();
bool sampleRead = subscriber->readInstance(*keySample, *sample);
//
// Loaned (no-copy) example.
// Loan is returned when close() method is called on LoanedDataEntity
// or automatically when scope containing loaned data entity
// terminates.
{
std::shared_ptr<::elt::mal::ps::LoanedDataEntity<mal::example::Sample>> loanedSample = subscriber->loanedRead();
if (loanedSample) {
mal::example::Sample &sampleData = loanedSample->getData();
// use data here
}
}
//
// Read data events, filtered by instance and instance state.
// Block until maxEvents are read or timeout occurs.
//
using Sample = mal::example::Sample;
std::vector<std::shared_ptr<mal::ps::DataEvent<Sample>>> events(1000);
mal::ps::DataEventFilter<Sample> filter;
filter.setInstance(std::shared_ptr<Sample>(std::move(keySample)));
filter.setInstanceState(mal::ps::InstanceState::ALIVE);
filter.setInstanceEvents(
std::set<mal::ps::InstanceEvent>{mal::ps::InstanceEvent::CREATED,
mal::ps::InstanceEvent::UPDATED});
std::size_t samplesRead = subscriber->readDataEvents(
filter,
events, events.capacity(),
std::chrono::seconds(3));
//
// Synchronous (aka event pump) subscription, no filtering.
// Poll for 1 second w/ no event count limit.
//
{
std::unique_ptr<mal::ps::Subscription> subscription = subscriber->subscribe(
mal::ps::DataEventFilter<Sample>::all(),
[](mal::ps::Subscriber<Sample>& subscriber,
const mal::ps::DataEvent<Sample>& event) -> void {
// NOTE: you should never close subscription from
// within a callback, since that may cause deadlock
std::cout << event.getData() << std::endl;
});
subscriber->poll(10, std::chrono::seconds(1));
}
// Note that subscription is destroyed when leaving the scope.
//
// Asynchronous subscription, no filtering.
// 1 second sleep added to simulate some work.
//
{
std::unique_ptr<mal::ps::Subscription> subscription = subscriber->subscribeAsync(
mal::ps::DataEventFilter<Sample>::all(),
[](mal::ps::Subscriber<Sample>& subscriber,
const mal::ps::DataEvent<Sample>& event) -> void {
// NOTE: you should never close subscription from
// within a callback, since that may cause deadlock
std::cout << event.getData() << std::endl;
});
std::this_thread::sleep_for(std::chrono::seconds(1));
}
//
// Asynchronous notification subscription, no filtering.
// In callback sync read is called to read all alive samples.
//
{
std::promise<void> eventPromise;
std::future<void> eventFuture = eventPromise.get_future();
std::unique_ptr<mal::ps::Subscription> subscription = subscriber->subscribeAsync(
mal::ps::DataEventFilter<Sample>::all(),
[&eventPromise](mal::ps::Subscriber<Sample>& subscriber) -> void {
// NOTE: you should never close subscription from
// within a callback, since that may cause deadlock
eventPromise.set_value();
});
std::future_status status = eventFuture.wait_for(std::chrono::seconds(1));
if (status == std::future_status::ready) {
std::cout << subscriber->read(0).size() << std::endl;
} else {
std::cout << "No event was received\n";
}
}
} catch (std::exception& exc) {
std::cerr << "EXCEPTION: " << exc.what() << std::endl;
}
2.5.2.3. Python¶
Python subscriber offers (as described above) three versions of the subscribe method:
subscribe()
Creates synchronous subscription. Received data events are passed to provided callback method via poll() call on the subscriber object synchronously.
subscribeAsync()
Creates asynchronous subscription. Provided callback method is called from another thread as new data events arrive.
subscribeAsyncNotifier()
Creates asynchronous subscription. Callback method receives change notifications. Method is called from another thread.
When creating data event filter, data entity class (not instance) must be provided as argument to factory function mal.ps.DataEventFilter.all().
DEFAULT_TIMEOUT = datetime.timedelta(seconds=3)
# Poll interval in seconds.
POLL_INTERVAL = datetime.timedelta(seconds=1)
uri = 'dds.ps:///m1/CabinetTelemetry'
ddsMal = mal.loadMal('dds', {'dds.domain': '100'})
factory = mal.CiiFactory.getInstance()
factory.registerMal('dds', ddsMal)
with factory.getSubscriber(uri,
Example.Sample,
qos=mal.ps.qos.DEFAULT) as subscriber:
# Read all alive instances w/o sample count limit.
allAliveInstances = subscriber.read(0)
# Read one instance i.e. daqId == 1.
keySample = subscriber.createDataEntity()
keySample.setDaqId(1)
daq1sample = subscriber.readInstance(keySample)
# Loaned (no-copy) example.
# Loan is returned when close() method is
# called on LoanedDataEntity. Note: there is
# no context management protocol available for
# loaned samples. Use try, finally block.
loanedSample = subscriber.loanedRead()
try:
sampleData = loanedSample.getData()
finally:
# Return loan
loanedSample.close()
# Read data events, filtered by instance and instance state.
# Block until maxEvents (1000) are read or timeout (3s) occurs.
maxEvents = 1000
# Create new filter for Sample data entity.
filter = mal.ps.DataEventFilter.all(Sample)
filter.setInstance(keySample)
filter.setInstanceState(mal.ps.InstanceState.ALIVE)
filter.setInstanceEvents(set(mal.ps.InstanceEvent.CREATED,
mal.ps.InstanceEvent.UPDATED))
# List of samples is returned.
samples = subscriber.readDataEvents(filter,
maxEvents,
timeout=DEFAULT_TIMEOUT)
# Asynchronous data subscription, no filtering,
# 1 second sleep added to simulate some work.
def dataEventFunction(subscription, dataEvent):
# NOTE: you should never close subscription from within
# a callback, since that may cause a deadlock
# Data event processing method
if dataEvent.hasValidData():
sample = dataEvent.getData()
# use sample
with subscriber.subscribeAsync(
mal.ps.DataEventFilter.all(Example.Sample),
dataEventFunction) as subscription:
# Suspend main thread, dataEventFunction is
# called from another thread.
time.sleep(1)
# Note that subscription is destroyed once with block terminates.
# Asynchronous data subscription, no filtering,
# using lambda callback.
# DO NOT CALL read methods n the callback!
# 1 second sleep added to simulate some work.
with subscriber.subscribeAsync(mal.ps.DataEventFilter.all(),
lambda subscriber, dataEvent:
print(dataEvent.getData())) as subscription:
# Suspend main thread, data is printed from another thread.
time.sleep(1)
# Synchronous (aka event pump) subscription, no filtering.
# Poll with 1000 events / 1 seconds limit, endless loop.
with subscriber.subscribe(mal.ps.DataEventFilter.all(ExampleSample),
dataEventFunction) as subscription:
while True:
# dataEventFunction will be called synchronously
# when new event arrives.
subscriber.poll(maxEvents,
datetime.timedelta(seconds=1))
2.5.3. MrvSubscriber¶
MrvSubscriber is a convenience variant of a subscriber intended to be used by GUIs. It has only one method that always blocks for a given amount of time. This might be used to implement a refresh loop. Use zero time to read samples from a queue and do not block for any more time. The method always returns a list of most-recent values for each instance. It does not return only changed instances over specified amount. If instance becomes unavailable it is returned in the list anymore. This method creates new entities and copies their data; avoid using this method when dealing with large arrays.
This class is thread-safe.
2.5.3.1. Java¶
// install DDS MAL mapping
CiiFactory.installMal("dds");
try (CiiFactory factory = CiiFactory.getInstance()) {
URI uri = URI.create("dds.ps:///m1/CabinetTelemetry");
try (MrvSubscriber<Sample> mrvSubscriber =
factory.getMrvSubscriber(uri, QoS.DEFAULT,
null, Sample.class)) {
//
// Every 1 second get most-recent-values of alive instances.
//
while (true) {
List<Sample> mrvSamples = mrvSubscriber.readMostRecent(1, TimeUnit.SECONDS);
System.out.println(Arrays.toString(mrvSamples.toArray()));
}
}
}
2.5.3.2. C++¶
std::shared_ptr<::elt::mal::Mal> ddsMal = mal::loadMal("dds", {{"dds.domain", "100"}});
::elt::mal::CiiFactory &factory = mal::CiiFactory::getInstance();
factory.registerMal("dds", ddsMal);
mal::Uri uri("dds.ps:///m1/CabinetTelemetry");
try {
std::unique_ptr<ps::MrvSubscriber<mal::example::Sample>> mrvSubscriber =
factory.getMrvSubscriber<mal::example::Sample>(uri, mal::ps::qos::QoS::DEFAULT, {});
//
// Every 1 second get most-recent-values of alive instance
//
while (true) {
std::vector<std::shared_ptr<mal::example::Sample>> mrvSamples =
mrvSubscriber->readMostRecent(std::chrono::seconds(1));
for (const auto &sample : mrvSamples) {
std::cout << "Sample " << sample->getDaqId() << " / " <<
sample->getValue() << std::endl;
}
}
} catch (std::exception &exc) {
std::cerr << "EXCEPTION: " << exc.what() << std::endl;
}
2.5.3.3. Python¶
uri = 'dds.ps:///m1/CabinetTelemetry'
# Load DDS MAL mapping.
ddsMal = mal.loadMal('dds', {'dds.domain': '100'})
factory = mal.CiiFactory.getInstance()
factory.registerMal('dds', ddsMal)
with factory.getMrvSubscriber(uri,
Example.Sample,
qos=mal.ps.qos.DEFAULT) as mrvSubscriber:
# Every second get most-recent-value of alive instances.
timeout = datetime.timedelta(seconds=1)
while True:
mrvSamples = mrvSubscriber.readMostRecent(timeout)
if mrvSamples:
print(mrvSamples)
2.6. Request-response¶
Request-response is a coupled peer-to-peer client-server communication pattern. A server exposes to the network one or more instances of ICD-defined interfaces (services). A client connects to the server and invokes one or more methods on the services. MAL guarantees reliable QoS (i.e. request cannot be lost) and exactly-once delivery (i.e. for each method invocation exactly one request is made on the server-side; the request can neither be lost nor duplicated). MAL completely takes care of connection management. There is no need to explicitly issue a connect request, monitor connection status and do a reconnect. MAL takes care of this in the background. A programmer only needs to specify a timeout limit on the request. (Re-)connection time is included in the total execution time of the request). If connection cannot be (re-)established in that time or response is not delivered in time a TimeoutException is thrown. If response is delivered after the timeout expired the response is ignored. Any connection loss during the invocation of the method results in DisconnectedException, i.e. MAL does not try to invoke the method on the server side again after the reconnection.
In order to explicitly wait for connection to be established a connect method returns a future object that completes when the connection is established for the first time.
In cases where a programmer is interested in connection state it can register a connection listener on a client instance. The callbacks are called by MAL mapping or middleware thread.
When multiple responses are needed asynchronous method invocation (AMI) MAL support can be used. Basically, AMI is an asynchronous request with one or more asynchronous responses grouped as request. There is no limitation on number of responses nor time limit on lifetime. Lifetime is limited for the time of connection. Once connection is lost AMI request gets canceled (reported to the client as cancellation exception).
- NOTE: Since the Migration to FastDDS, Request-Response communication became
unavailable for MAL DDS C++, Java and Python languages.
2.6.1. Client¶
When an interface is defined in ICD, a synchronous and asynchronous client variants of the interfaces are generated. A programmer has a choice to select at client instantiation what interface to use. Invoking a method on synchronous interface will block and return its value via method return mechanism, or exception will be thrown. A request timeout can be set via ReplyTime QoS or via client instance created using timeout method. A TimeoutException is thrown in case of a timeout condition. Asynchronous methods always return a future object. A programmer then gets a response, or an exception, back via a future object. There can be more than one asynchronous requests issued/pending concurrently on one client instance.
Type of the interface on the client side does not affect (in any way) how a request is processed on the server side.
AMI methods can be recognized as methods that return Ami<returnType> object. The object is basically an iterator of future objects of returnType type. The future objects behave exactly the same as in case of asynchronous methods – a future object is a promise waiting for the next response. AMI call is completed when the iterator runs out of elements or when exception is being thrown (also thrown in case of disconnection).
The example code below illustrates the usage of a client interface and using all three types of methods.
2.6.1.1. Java¶
public class ClientExample {
public static void main(String[] args) {
// install ZPB MAL mapping
CiiFactory.installMal("zpb");
try (CiiFactory factory = CiiFactory.getInstance()) {
URI uri = URI.create("zpb.rr:///m1/Robot1");
//
// Synchronous client example w/ ReplyTime QoS set.
// RobotControlSync interface is requested.
//
try (RobotControlSync robot =
factory.getClient(uri,
new QoS[] {
new ReplyTime(3, TimeUnit.SECONDS)
},
null, RobotControlSync.class)) {
//
// Explicitly wait for connection to be established.
// (This is usually not needed.)
//
CompletableFuture<Void> connectionFuture = robot.asyncConnect();
connectionFuture.get(10, TimeUnit.SECONDS);
//
// Invoke synchronous method
// TimeoutException is throw if default timeout is exceeded.
//
robot.command(Command.START_COMMAND);
//
// Some methods throw user-defined exceptions.
// Handle them just like standard Java exceptions.
//
try {
float speed = robot.getSpeed();
List<Float> speedValues = new ArrayList<Float>();
speedValues.add(66.6f);
speedValues.add(41.0f);
speedValues.add(90.5f);
boolean checker = robot.validSpeedValues(speedValues);
if (checker) {
speed += 30.0f;
}
else {
speed += 3.0f;
}
robot.setSpeed(speed);
}
catch (TooFast tf) {
// handle too fast
}
//
// Create a new client instance w/ specific timeout.
// New instance is connected to the same service instance.
// Old instance is still valid.
//
RobotControlSync robotSlowCall = robot.timeout(1, TimeUnit.MINUTES);
robotSlowCall.command(Command.STOP_COMMAND);
//
// Multiple responses use-case using
// asynchronous method invocation (AMI).
//
try (Ami<String> systemCheckAmi = robot.systemCheck()) {
for (CompletableFuture<String> future : systemCheckAmi) {
String msg = future.get();
System.out.println(msg);
}
}
} catch (Throwable th) {
// handle exceptions ...
}
//
// Asynchronous client example w/ reply timeout set.
// RobotControlAsync interface is requested.
//
try (RobotControlAsync robot =
factory.getClient(uri,
new QoS[] {
new ReplyTime(3, TimeUnit.SECONDS)
},
null, RobotControlAsync.class)) {
//
// Basic case, invoke and then wait, if necessary
//
CompletableFuture<Void> cfuture = robot.command(Command.START_COMMAND);
// ... do some work here ...,
// and get response (Void in this case), future will block if
// the response is not yet received, or TimeoutException is throws in reply-time
// is exceeded
cfuture.get();
//
// Chained CompletableFuture<> example:
// async getSpeed() is called and on return setSpeed
// with new value requested, exceptions are handled.
// Wait for only 2 seconds to complete.
//
robot.getSpeed().thenAccept((Float speed) -> robot.setSpeed(speed + 10))
.exceptionally((th) -> {
/* handle exception */ return null;
}).get(2, TimeUnit.SECONDS);
//
// Multiple responses use-case using
// asynchronous method invocation (AMI).
// Handling is the same as for sync. client.
//
try (Ami<String> systemCheckAmi = robot.systemCheck()) {
for (CompletableFuture<String> future : systemCheckAmi) {
String msg = future.get();
System.out.println(msg);
}
}
} catch (Throwable th) {
// handle exceptions
}
}
}
}
2.6.1.2. C++¶
std::shared_ptr<::elt::mal::Mal> zpbMal = mal::loadMal("zpb", {{"zpb.domain", "100"}});
::elt::mal::CiiFactory &factory = mal::CiiFactory::getInstance();
factory.registerMal("zpb", zpbMal);
mal::Uri uri("zpb.rr:///m1/Robot1");
//
// Synchronous client example w/ ReplyTime QoS set.
// RobotControlSync interface is requested.
//
try {
std::unique_ptr<mal::example::RobotControlSync> robot = factory.getClient<mal::example::RobotControlSync>(
uri,
{std::make_shared<mal::rr::qos::ReplyTime>(std::chrono::seconds(3))},
{});
//
// Explicitly wait for connection to be established.
// (This is usually not needed.)
//
::elt::mal::future<void> connectionFuture = robot->asyncConnect();
std::future_status futureStatus =
connectionFuture.wait_for(::boost::chrono::seconds(10));
bool connected = (futureStatus == std::future_status::ready);
//
// Invoke synchronous methods
// TimeoutException is throw if default timeout is exceeded.
//
robot->command(mal::example::RobotControl::Command::START_COMMAND);
try {
float speed = robot->getSpeed();
mal::shared_vector<const float> speedArray = {66.6, 41.0, 90.5};
bool checker = robot->validSpeedValues(speedArray);
if(checker) {
speed += 30;
}
else {
speed += 3;
}
robot->setSpeed(speed);
} catch (mal::example::RobotControl::TooFast& tf) {
// handle too fast
}
//
// Multiple responses use-case using
// asynchronous method invocation (AMI).
//
std::shared_ptr<StrAmi> ami = robot->systemCheck();
for (auto future : *ami) {
std::cout << future.get() << std::endl;
}
//
// Create a new client instance w/ specific timeout.
//
robot->timeout(std::chrono::seconds(10))
->command(mal::example::RobotControl::Command::STOP_COMMAND);
} catch (std::exception& exc) {
std::cerr << "EXCEPTION: " << exc.what() << std::endl;
}
//
// Asynchronous client example w/ reply timeout set.
// RobotControlAsync interface is requested.
//
try {
std::unique_ptr<mal::example::RobotControlAsync> robot = factory.getClient<mal::example::RobotControlAsync>(
uri,
{std::make_shared<mal::rr::qos::ReplyTime>
(std::chrono::seconds(3))},
{});
//
// Basic case, invoke and then wait, if necessary
//
mal::future<void> cfuture = robot->command
(mal::example::RobotControl::Command::START_COMMAND);
// ... do some work here ...,
// and get response (void in this case), future will block if
// the response is not yet received, or TimeoutException is throws in
// reply-time is exceeded
cfuture.get();
// Chained future<> example:
// async getSpeed() is called and on return setSpeed
// with new value requested, exceptions are handled.
// Wait for 2 seconds to complete.
try {
mal::future<float> future = robot->
getSpeed().then([&](::mal::future<float> f)
{ return robot->setSpeed(f.get() + 0.1); }).get();
if (future.wait_for(boost::chrono::seconds(2)) !=
::boost::future_status::ready) {
// handle timeout
}
} catch (mal::example::RobotControl::TooFast& tf) {
// handle too fast
}
//
// Multiple responses use-case using
// asynchronous method invocation (AMI).
//
std::shared_ptr<StrAmi> ami = robot->systemCheck();
for (auto future : *ami) {
std::cout << future.get() << std::endl;
}
} catch (std::exception& exc) {
std::cerr << "EXCEPTION: " << exc.what() << std::endl;
}
2.6.1.3. Python¶
When using asynchronous client, result of the method call on the client reference is always a Future object. Actual result value must be obtained with get method on the result object.
To check state of the Future object, use its check method. Method takes no arguments and returns enumeration mal.rr.FutureStatus that can have the one of the following values:
DEFERRED – requested method was not started yet.
READY – requested method completed and result can be retrieved with call to get method.
TIMEOUT – wait was performed on the Future, but timeout occurred.
import elt.pymal as mal
# Import synchronous client implementation
from ModManualExamples.Elt.Mal.Example.RobotControl import RobotControlSync
# Import asynchronous client implementation
from ModManualExamples.Elt.Mal.Example.RobotControl import RobotControlAsync
from ModManualExamples.Elt.Mal.Example import Command, TooFast
# 3 seconds.
THREE_SECONDS = datetime.timedelta(seconds=3)
# Minute
MINUTE = datetime.timedelta(seconds=60)
def _main():
"""Main function implementation"""
# sync example
try:
# Load ZPB MAL mapping.
uri = 'zpb.rr:///m1/Robot1'
zpbMal = mal.loadMal('zpb', {'zpb.domain': '100'})
factory = mal.CiiFactory.getInstance()
factory.registerMal('zpb', zpbMal)
# Synchronous client example w/ ReplyTime QoS timeout set.
# RobotControlSync interface is requested.
with factory.getClient(uri,
RobotControlSync,
qos=mal.rr.qos.ReplyTime(THREE_SECONDS)) as robot:
# Explicitly wait for connection to be established.
# (This is usually not needed).
connectionFuture = robot.asyncConnect()
connectionFuture.wait_for(THREE_SECONDS)
# Invoke synchronous methods
# TimeoutError is thrown if default timeout is exceeded.
robot.command(Command.START_COMMAND)
try:
speed = robot.getSpeed()
speedArray = mal.makeSharedVector('f', 3, 90.5)
speedArray = speedArray.freeze()
checker = robot.validSpeedValues(speedArray)
if checker:
speed += 30
else:
speed += 3
robot.setSpeed(speed)
except TooFast as e:
# Handle too fast
pass
# Multiple responses use-case using
# asynchronous method invocation (AMI).
systemCheckAmi = robot.systemCheck()
# We are not using context manager protocol here.
# Once we are done with the AMI, it should be
# closed().
try:
# Result set is iterable.
for msg in systemCheckAmi:
print(msg)
finally:
systemCheckAmi.close()
# Asynchronous example w/reply timeout set.
# RobotControlAync interface is requested.
with factory.getClient(uri,
RobotControlAsync,
qos=mal.rr.qos.ReplyTime(THREE_SECONDS)) as robot:
# Basic case, invoke and then wait, if necessary
future = robot.command(Command.START_COMMAND)
# ... do some work here ...
# and get response (None in this case). Future will block
# it the response is not yet received or TimeoutError is
# thrown when reply time is exceeded.
future.get()
# Multiple responses use-case using
# asynchronous method invocation (AMI).
# Handling is the same as for sync. client.
with robot.systemCheck() as systemCheckAmi:
for future in systemCheckAmi:
print('System Check: ', future.get())
except Exception as e:
# Handle exceptions or
raise
2.6.2. Server¶
Creating a server instance just creates a container where services can be registered. A service is a uniquely named instance reachable over the network. There can be multiple services of the same or different interface type registered to the server. Same service instance can also be registered multiple times using different names. The server will just propagate the request to a service by its name. Once a service is unregistered it is no longer reachable over the network. Any pending requests on unregistered service will complete normally.
When an interface is defined in ICD, a synchronous and asynchronous service variants of the interfaces are generated. A programmer needs to implement one of these interfaces. Only these interfaces can be registered as a service. An interface type dictates how requests are processed on the server side. Server processes requests of all the services using one thread. Once a request is received it decodes the request and calls a method on specified service. If the service implements a synchronous interface then invocation of its method will block. This includes processing of other requests of the same and other services. In case of asynchronous service the method will block just for a method to return a future object. The server will continue to process other pending requests. The server will send the response of asynchronous services back to the client immediately when the return value, or an exception, is given to the future object. This implies that asynchronous service responses might come back in a different order than their requests were issued.
When a C++ MAL service throws a known exception (i.e. declared in the MAL-ICD), it it is passed through network to the client, where it is re-raised. However, if the service throws other exceptions: a MAL exception of a type that is not declared for this particular method, a standard exception, or throws a C++ object that is not derived from the std::exception, then MAL will behave as shown in Table 1.
ICD |
Exc. type |
Server behaviour |
Client behaviour |
---|---|---|---|
throws |
MAL excepti ion but other than declared |
catch and propaga te UnhandledExcep tion |
receive UnhandledRemoteExcep tion |
throws |
non-MAL C++ exception, e.g. std::ou t_of_range |
terminate |
MalException: Client disconnected |
throws |
non-MAL, non C++ exception (std:string) |
terminate |
MalException: Client disconnected |
no throws |
MalException although none was declared |
catch and propaga te UnhandledExcep tion |
receive UnhandledRemoteExcep tion |
no throws |
non-MAL, C++ exception, e.g. std::ou t_of_range |
terminate |
MalException: Client disconnected |
no throws |
non-MAL, non C++ exception (std:string) |
terminate |
MalException: Client disconnected |
Table 1 MAL behaviour when an interface method throws an undeclared exception
Implementation of server-side AMI methods is the same for synchronous and asynchronous service interfaces. AMI methods declarations return the same Ami<returnType> as on the client side. However, the service needs to provide the implementation of the Ami<returnType>. The creation of it is done via ServerContextProvider singleton instance implemented as thread-safe-storage instance that provides access to server-side helper methods. A createAmi method will create a ServerAmi<returnType> instance (that implements Ami<returnType> interface). The implementation of the AMI method just needs to setup asynchronous execution of the method (the method should not block!) and return the ServerAmi<returnType> instance. Sending responses to the client is done by calling one of the three methods on the ServerAmi<returnType> instance: complete (non-last response), completeExceptionally (exception response, last response) and completed (last response). Once last response is sent the instance gets automatically released.
See the examples below on how to implement synchronous and asynchronous service.
The server has no responsibility over the lifecycle of registered services.
2.6.2.1. Java¶
public class ServerExample {
private static void simulateWork() {
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
// noop
}
}
/**
* Synchronous RobotControl implementation example,
* must implement <ICD interface name> interface.
*/
static class RobotControlImpl implements RobotControl {
private final float maxSpeed;
private List<Float> allSpeeds;
private List<Float> tempSpeeds;
public RobotControlImpl(float maxSpeed, List<Float> speeds) {
this.maxSpeed = maxSpeed;
this.tempSpeeds = speeds;
this.allSpeeds = new ArrayList<Float>();
this.allSpeeds.add(0.0f);
}
@Override
public void command(Command com) {
switch (com) {
case START_COMMAND:
this.allSpeeds = new ArrayList<Float>(this.tempSpeeds);
break;
case STOP_COMMAND:
this.allSpeeds = new ArrayList<Float>();
this.allSpeeds.add(0.0f);
break;
default:
// undefined exceptions, i.e. exception not defined in ICD,
// translate to UnhandledRemoteException on the remote side
throw new IllegalArgumentException();
}
}
@Override
public float setSpeed(float speed) throws TooFast {
if (speed > this.maxSpeed) {
throw new TooFast();
}
this.allSpeeds.add(speed);
return this.allSpeeds.get(this.allSpeeds.size() - 1);
}
@Override
public float getSpeed() {
return this.allSpeeds.get(this.allSpeeds.size() - 1);
}
@Override
public boolean validSpeedValues(List<Float> values) {
boolean valid = true;
for (float val : values) {
if (val < 0 || val > this.maxSpeed) {
valid = false;
break;
}
}
return valid;
}
@Override
public Ami<String> systemCheck() {
// request for ServerAmi instance
// to be returned by this method
ServerAmi<String> ami = ServerContextProvider.getInstance().createAmi(String.class);
// setup asynchronous execution
Executors.newCachedThreadPool().execute(() -> {
simulateWork();
// first response
ami.complete("Battery OK");
simulateWork();
// second response
ami.complete("Drivetrain OK");
simulateWork();
// final response
// NOTE: completed vs complete method name
ami.completed("Engine OK");
});
// return AMI instance
return ami;
}
}
/**
* Asynchronous RobotControl implementation example,
* must implement Async<ICD interface name>.
*/
static class AsyncRobotControlImpl implements AsyncRobotControl {
private final float maxSpeed;
private List<Float> allSpeeds;
private List<Float> tempSpeeds;
public AsyncRobotControlImpl(float maxSpeed, List<Float> speeds) {
this.maxSpeed = maxSpeed;
this.tempSpeeds = speeds;
this.allSpeeds = new ArrayList<Float>();
this.allSpeeds.add(0.0f);
}
@Override
public CompletableFuture<Void> command(Command com) {
// create future
CompletableFuture<Void> future = new CompletableFuture<>();
// setup asynchronous execution
Executors.newCachedThreadPool().execute(() -> {
switch (com) {
case START_COMMAND:
this.allSpeeds = new ArrayList<Float>(this.tempSpeeds);
// notify the completion w/ result
future.complete(null);
break;
case STOP_COMMAND:
this.allSpeeds = new ArrayList<Float>();
this.allSpeeds.add(0.0f);
// notify the completion w/ result
future.complete(null);
break;
default:
// undefined exceptions, i.e. exception not defined in ICD,
// translate to UnhandledRemoteException on the remote side
// notify the completion w/ exception
future.completeExceptionally(new IllegalArgumentException());
}
});
// return future
return future;
}
@Override
public CompletableFuture<Float> setSpeed(float speed) {
// create future
CompletableFuture<Float> future = new CompletableFuture<>();
// setup asynchronous execution
Executors.newCachedThreadPool().execute(() -> {
if (speed > this.maxSpeed) {
// notify the completion w/ exception
future.completeExceptionally(new TooFast());
}
this.allSpeeds.add(speed);
// notify the completion w/ result
future.complete(speed);
});
// return future
return future;
}
@Override
public CompletableFuture<Float> getSpeed() {
// if the execution of the method is short
// an already completed future can be returned
return CompletableFuture.completedFuture(this.allSpeeds.get(this.allSpeeds.size() - 1));
}
@Override
public CompletableFuture<boolean> validSpeedValues(List<Float> values) {
// create future
CompletableFuture<Float> future = new CompletableFuture<>();
// setup asynchronous execution
Executors.newCachedThreadPool().execute(() -> {
boolean valid = true;
for (float val : values) {
if (val < 0 || val > this.maxSpeed) {
valid = false;
break;
}
}
// notify the completion w/ result
future.complete(valid);
}
// return future
return future;
}
@Override
public Ami<String> systemCheck() {
// request for ServerAmi instance
// to be returned by this method
ServerAmi<String> ami = ServerContextProvider.getInstance().createAmi(String.class);
// setup asynchronous execution
Executors.newCachedThreadPool().execute(() -> {
simulateWork();
// first response
ami.complete("Battery OK");
simulateWork();
// second response
ami.complete("Drivetrain OK");
simulateWork();
// final response
// NOTE: completed vs complete method name
ami.completed("Engine OK");
});
// return AMI instance
return ami;
}
}
/**
* Main entry-point.
*
* @param args command-line arguments.
*/
public static void main(String[] args) {
URI uri = URI.create("zpb.rr:///m1");
// install ZPB MAL mapping
CiiFactory.installMal("zpb");
try (CiiFactory factory = CiiFactory.getInstance()) {
//
// Create server, default QoS.
// Currently there are no MAL QoS relevant for the Server.
//
try (Server server = factory.createServer(uri, QoS.DEFAULT, null)) {
List<Float> speedsArray = new ArrayList<Float>();
speedsArray.add(0.1f);
speedsArray.add(49.4f);
speedsArray.add(74.4f);
//
// Register three Robot services.
// They are all separate separate instances.
// First two instances have synchronous implementation,
// the third has asynchronous.
server.registerService("Robot1", new RobotControlImpl(120.0f, speedsArray));
server.registerService("Robot2", new RobotControlImpl(80.0f, speedsArray));
server.registerService("Robot3", new AsyncRobotControlImpl(20.0f, speedsArray));
// Process commands until server.shutdown() is called.
server.run();
}
}
}
}
2.6.2.2. C++¶
namespace mal = ::elt::mal;
/**
* Simulate some work
*/
static void simulateWork() {
std::this_thread::sleep_for(std::chrono::seconds(2));
}
/**
* Synchronous RobotControl implementation example,
* must implement RobotControl interface.
*/
class RobotControlImpl: public virtual mal::example::RobotControl {
public:
RobotControlImpl(float max, mal::shared_vector<const float> speeds)
: maxSpeed(max), tempSpeeds(speeds) {
allSpeeds = {0.0};
}
void command(RobotControl::Command command) override {
switch (command) {
case RobotControl::Command::START_COMMAND:
allSpeeds = tempSpeeds;
break;
case RobotControl::Command::STOP_COMMAND:
allSpeeds = {0.0};
break;
default:
throw std::invalid_argument("RobotControlImpl: Unknown command");
}
}
float setSpeed(float newSpeed) override {
if (speed > maxSpeed) {
throw RobotControl::TooFast(
"RobotControlImpl::setSpeed, speed too high");
}
::elt::mal::shared_vector<float> mutableSpeeds = thaw(allSpeeds);
mutableSpeeds.push_back(newSpeed);
allSpeeds = freeze(mutableSpeeds);
return allSpeeds.back()
}
float getSpeed() const override { return allSpeeds.back(); }
bool validSpeedValues(const ::elt::mal::shared_vector<const float>& values) override {
bool valid = true;
for (const auto& val: values) {
if(val < 0 || val > maxSpeed) {
valid = false;
break;
}
}
return valid;
}
std::shared_ptr<RobotControl::StrAmi> systemCheck() override {
// Obtain ptr to ServerAmi
std::shared_ptr<RobotControl::StrAmi> ami = mal::rr::ServerContextProvider
<mal::example::ServerContextImpl<std::string>>::
getInstance().createAmi();
auto future = boost::async(boost::launch::async, [=]() {
simulateWork();
ami->complete("Battery OK");
simulateWork();
ami->complete("Drivertrain OK");
simulateWork();
ami->completed("Engine OK");
});
// Return ptr to Ami
return ami;
}
private:
float maxSpeed;
mal::shared_vector<const float> allSpeeds;
mal::shared_vector<const float> tempSpeeds;
};
/**
* Asynchronous RobotControl implementation example,
* must implement AsyncRobotControl interface.
*/
class AsyncRobotControlImpl: public virtual mal::example::AsyncRobotControl {
public:
AsyncRobotControlImpl(float max, mal::shared_vector<const float> speeds)
: maxSpeed(max), tempSpeeds(speeds) {
allSpeeds = {0.0}
}
mal::future<void> command(mal::example::RobotControl::Command command)
override {
std::function<void()> f = [this, command]() -> void {
std::lock_guard<std::mutex> guard(speedMutex);
switch (command) {
case mal::example::RobotControl::Command::START_COMMAND:
allSpeeds = tempSpeeds;
break;
case mal::example::RobotControl::Command::STOP_COMMAND:
allSpeeds = {0.0};
break;
default:
throw std::invalid_argument("RobotControlImpl: Unknown command");
}
};
return boost::async(boost::launch::async, f);
}
mal::future<float> setSpeed(float newSpeed) override {
return boost::async(boost::launch::async, [this, newSpeed]() -> float {
std::lock_guard<std::mutex> guard(speedMutex);
if (newSpeed > maxSpeed) {
throw mal::example::RobotControl::TooFast(
"RobotControlImpl::setSpeed, speed too high");
}
::elt::mal::shared_vector<float> mutableSpeeds = thaw(allSpeeds);
mutableSpeeds.push_back(newSpeed);
allSpeeds = freeze(mutableSpeeds);
return allSpeeds.back()
});
}
mal::future<float> getSpeed() const override {
boost::promise<float> promise;
promise.set_value(allSpeeds.back());
return promise.get_future();
}
mal::future<float> validSpeedValues(const ::elt::mal::shared_vector<const float>& values) override {
boost::promise<bool> promise;
bool valid = true;
for (const auto& val: values) {
if(val < 0 || val > maxSpeed) {
valid = false;
break;
}
}
promise.set_value(valid);
return promise.get_future();
}
std::shared_ptr<AsyncRobotControl::StrAmi> systemCheck() override {
// Obtain ptr to ServerAmi
std::shared_ptr<AsyncRobotControl::StrAmi> ami = mal::rr::ServerContextProvider
<mal::example::ServerContextImpl<std::string>>::
getInstance().createAmi();
auto future = boost::async(::boost::launch::async, [=]() {
simulateWork();
ami->complete("Battery OK");
simulateWork();
ami->complete("Drivertrain OK");
simulateWork();
ami->completed("Engine OK");
});
// Return ptr to Ami
return ami;
}
private:
float maxSpeed;
std::mutex speedMutex;
mal::shared_vector<const float> allSpeeds;
mal::shared_vector<const float> tempSpeeds;
};
/**
* Server example.
*
* Main entry point.
* @param argc number of command line arguments
* @param argv command line arguments
*/
int main(int argc, char** argv) {
// Load ZPB MAL mapping
std::shared_ptr<::elt::mal::Mal> zpbMal = mal::loadMal("zpb", {{"zpb.domain", "100"}});
// Register mapping with CiiFactory
::elt:: &factory = mal::CiiFactory::getInstance();
factory.registerMal("zpb", zpbMal);
mal::Uri uri("zpb.rr:///m1");
try {
//
// Create server, default QoS.
// Currently there are not MAL QoS relevant for the Server
//
std::unique_ptr<rr::Server> server = factory.createServer(uri, mal::rr::qos::QoS::DEFAULT, {});
mal::shared_vector<const float> speedsArray = {0.1, 49.4, 74.4};
// Register three Robot services.
// They are all separate instances.
// First two instance have synchronous implementation,
// third has asynchronous.
std::shared_ptr<mal::rr::RrEntity> robot1(new RobotControlImpl(120.0, speedsArray));
std::shared_ptr<mal::rr::RrEntity> robot2(new RobotControlImpl(80.0, speedsArray));
std::shared_ptr<mal::rr::RrEntity> robot3(new AsyncRobotControlImpl(80.0, speedsArray));
server->registerService<mal::example::RobotControl, true>
("Robot1", robot1);
server->registerService<mal::example::RobotControl, true>
("Robot2", robot2);
server->registerService<mal::example::RobotControl, false>
("Robot3", robot3);
// Process commands until server->shutdown() is called.
server->run();
} catch (std::exception& exc) {
std::cerr << "EXCEPTION: " << exc.what() << std::endl;
}
return 0;
}
2.6.2.3. Python¶
import datetime
import time
import threading
from concurrent.futures import ThreadPoolExecutor
# import top level Python MAL API module
import elt.pymal as mal
# import RobotControl binding
from ModManualExamples.Elt.Mal.Example import RobotControl
# import Command enumeration and TooFast exception
from ModManualExamples.Elt.Mal.Example import Command, TooFast
def simulateWork():
time.sleep(1.0)
class RobotControlSyncImpl:
"""Synchronous RobotControl implementation example, must
implement methods defined in the ICD definition file:
command, setSpeed, getSpeed, systemCheck
"""
def __init__(self, maxSpeed, speeds):
"""constructor"""
self._allSpeeds = [0.0]
self._maxSpeed = maxSpeed
self._tempSpeeds = speeds
def command(self, com):
"""Handle command"""
if com == Command.START_COMMAND:
self._allSpeeds = self._tempSpeeds
elif com == Command.END_COMMAND:
self._allSpeeds = [0.0]
else:
raise ValueError('Invalid command value')
def setSpeed(self, speed):
"""Set new speed.
Throws TooFast when speed exceeds maximum speed.
"""
if speed > self._maxSpeed:
raise TooFast()
self._allSpeeds.append(speed)
return self._allSpeeds[-1]
def getSpeed(self):
"""Return current speed"""
return self._allSpeeds[-1]
def validSpeedValues(self, values):
"""Return Validation of entry Values"""
result = True
for val in values:
if val < 0 or val > self._maxSpeed:
result = False
break
return result
def systemCheck(self):
"""Perform system check"""
# Obtain instance to the server context.
# This call uses ServerContextProvider.getInstance()
# method internally.
context = RobotControl.ServerContextString.getInstance()
# Obtain server AMI
serverAmi = context.createAmi()
# Setup asynchronous execution
systemCheckThread = threading.Thread(self._systemCheck,
args=(serverAmi,))
systemCheckThread.start()
return serverAmi
def _systemCheck(self, ami):
"""Actual systemCheck implementation"""
simulateWork()
# first response
ami.complete('Battery OK')
simulateWork()
# second response
ami.complete("Drivetrain OK")
simulateWork()
# final response, use completed method
ami.completed("Engine OK")
class RobotControlAsyncImpl:
"""Asynchronous RobotControl implementation example
all methods that don't return Ami, must return
concurrent.futures.Future object
"""
def __init__(self, maxSpeed, speeds):
"""constructor"""
self._executor = ThreadPoolExecutor(max_workers=4)
self._allSpeeds = [0.0]
self._maxSpeed = maxSpeed
self._tempSpeeds = speeds
def command(self, com):
"""Handle command"""
return self._executor.submit(self, _command, com)
def _command(self, com):
"""Command implementation"""
if com == Command.START_COMMAND:
self._allSpeeds = self._tempSpeeds
elif com == Command.END_COMMAND:
self._allSpeeds = [0.0]
else:
raise ValueError('Invalid command value')
def setSpeed(self, speed):
"""Set speed"""
self._executor.submit(self._setSpeed, speed)
def _setSpeed(self, speed):
"""setSpeed implementation
Throws TooFast when speed exceeds maximum speed.
"""
if speed > self._maxSpeed:
raise TooFast()
self._allSpeeds.append(speed)
return self._allSpeeds[-1]
def getSpeed(self):
"""Get speed"""
return self._executor.submit(lambda x: x, self._allSpeeds[-1])
def validSpeedValues(self, values):
"""Check if values are inside the limitations"""
return self._executor.submit(self._validSpeedValues, values)
def _validSpeedValues(self, values):
"""validSpeedValues implementation
Sanity check for list of values.
"""
result = True
for val in values:
if val < 0 or val > self._maxSpeed:
result = False
break
return result
def systemCheck(self):
"""Perform system check"""
# Obtain instance to the server context.
# This call uses ServerContextProvider.getInstance()
# method internally.
context = RobotControl.ServerContextString.getInstance()
# Obtain server AMI
serverAmi = context.createAmi()
# Setup asynchronous execution
systemCheckThread = threading.Thread(self._systemCheck,
args=(serverAmi,))
systemCheckThread.start()
return serverAmi
def _systemCheck(self, ami):
"""Actual systemCheck implementation"""
simulateWork()
# first response
ami.complete('Battery OK')
simulateWork()
# second response
ami.complete("Drivetrain OK")
simulateWork()
# final response, use completed method
ami.completed("Engine OK")
def main():
uri = 'zpb.rr:///m1'
# Load ZPB MAL mapping
zpbMal = mal.loadMal('zpb')
factory = CiiFactory.getInstance()
factory.registerMal('zpb', zpbMal)
with factory.createServer(uri, mal.rr.qos.DEFAULT, {}) as server:
speedsArray = [0.1, 49.4, 74.4]
# Register three Robot services.
# They are all separate instances.
# First two have synchronous implementation,
# the third has asynchronous implementation.
server.registerService('Robot1',
RobotControl.RobotControlSyncService,
RobotControlSyncImpl(120.0, speedsArray))
server.registerService('Robot2',
RobotControl.RobotControlSyncService,
RobotControlSyncImpl(80.0, speedsArray))
server.registerService('Robot3',
RobotControl.RobotControlAsyncService,
RobotControlAsyncImpl(20.0, speedsArray))
server.run()
2.7. Appendix¶
2.7.1. Building CII application with WAF¶
A CII application must be built using waf/wtools that has ICD generator (icd-gen tool) and all other supported middleware generators are integrated. All the artifacts are built into property formed libraries that are needed to be dynamically loaded by MAL mappings. Steps to build a CII application is the following:
Create a project directory with a project wscript file, e.g.:
from wtools.project import declare_project
wtools.project.declare_project('icd-demo-folder', '0.1-dev-version', # Project directory name and version
recurse='icd-app1 icd-app2', # Name of all applications folders
requires='cxx boost java python protoc cii', # Primary libraries to be linked
boost_libs='log log_setup thread system') # Secondary libraries to be linked
# ...
Project directory will contain as many applications necessary with the correspondent ICD XML files. Following the above example code 2 application will be implemented. So, after creating the applications directories:
Inside bought application directory, 2 subdirectories should be created: ‘icd’ (contains ICD files for appliciations) and ‘app’ (where the application implementation is located). Names of those two subdirectories can be different.
Parallel to this folders a wscript must be added with the following sctruture:
from wtools import package
# Declare external libraries to be used specifically by the respective application
def configure(conf):
"""System installed libraries"""
conf.check_cfg(package='opentracing_api', uselib_store='OPENTRACE', args='--cflags --libs')
"""Created WAF libraries based on other applications"""
conf.check_wdep(wdep_name='cii.mal-common', uselib_store='MALCOMMON')
conf.check_wdep(wdep_name="client-api.config", uselib_store="CLIENTCONF")
package.declare_package(recurse='app icd') # Folders inside application directory
Jumping in too the declared packeges (‘app’ and ‘icd’), each one must have a wscript file. Starting with the ‘app’ subdirectories, is worth noting that depending on the developer language the WAF method call to create the correspondent executable file will change (‘declare_cprogram’ ‘declare_pyprogram’ ‘declare_jar’):
from wtools.module import declare_cprogram
"""From pre-checked libraries in previous folder 'wscript', call the ones needed for the respective application"""
declare_cprogram(target='executable-app-language', use='BOOST CLIENTCONF MALCOMMON OPENTRACE')
And the ‘icd’ subdirectories with a wscript like:
from wtools.module import declare_malicd
declare_malicd() # Enables ICD artifacts generation
Remember that when building the project, the WAF Tool will only use files that exist inside a ‘src’ folder. So all necessary coding for the application to work as entended must be inside a source directory. In the end a developer should have a structure like this:
root
`-- icd-demo-folder
|-- icd-app1
| |-- app
| | |-- src
| | | `-- main-app1-file.xxx
| | `-- wscript
| |-- icd
| | |-- src
| | | `-- icd-app1-file.xml
| | `-- wscript
| `-- wscript
|
|-- icd-app2
| |-- app
| | |-- src
| | | `-- main-app2-file.xxx
| | `-- wscript
| |-- icd
| | |-- src
| | | `-- icd-app2-file.xml
| | `-- wscript
| `-- wscript
|
`-- wscript
See icd-demo project as example.