2. MAL API

Revision: 2.3

Status: Released

Repository: https://gitlab.eso.org/cii/srv/cii-srv

Project: ELT CII

Folder: /trunk/deliverables/phase2

Document ID: CSL-MAN-17-150198

File: MAN-MAL_API.docx

Owner: Matej Šekoranja, Cosylab Sweden

Last modification: April 21, 2022

Created: October 25, 2017

Prepared by

Reviewed by

Approved by

Matej Šekoranja, CSL SWE

Aljaž Podboršek

Gregor Čuk

Gregor Čuk

Document History

| Revision | Date | Changed/reviewed | Section(s) | Modification |
|---|---|---|---|---|
| 0.1 | 2017-10-25 | msekoranja | All | Created. |
| 1.0 | 2017-11-06 | msekoranja | All | Update on API, release. |
| 1.1 | 2018-02-23 | msekoranja | 3.1, 3.3 | Update on API changes, C++ MrvSubscriber example. |
| 2.0 | 2019-02-22 | msekoranja | All | Complete rewrite of a document. |
| 2.1 | 2019-03-04 | msekoranja | All | Example code revised. |
| 2.2 | 2022-03-31 | jpribosek | 2.5.2 | Example code revised. |
| 2.3 | 2022-04-21 | dkumar | 2.6.2 | Exception handling details |

Confidentiality

This document is classified as a confidential document. As such, it or parts thereof must not be made accessible to anyone not listed in the Audience section, neither in electronic nor in any other form.

Scope

This document is a Middleware Abstraction Layer user manual document for the ELT Core Integration Infrastructure Software project.

Audience

Users and Maintainers of the ELT Core Integration Infrastructure Software.

Glossary of Terms

API

Application Programming Interface

MAL

Middleware Abstraction Layer

QoS

Quality of Service

CII

Core Integration Infrastructure

ICD

Interface Control Document

XML

Extensible Markup Language

References

  1. Cosylab, Middleware Abstraction Layer Design document, CSL-DOC-17-147260, version 1.5

  2. Cosylab, ELT CII MAL Transfer document, CSL-DOC-18-168015, version 1.8

  3. Cosylab, Data Addressing Specification, CSL-DOC-17-147264, version 1.2

2.1. Overview

This document is a user manual for the MAL API. It describes the API and instructs programmers on how to use it. Full details on the API and its functionality are covered by the MAL design document [1].

2.2. Installation

See the ELT CII MAL Transfer document [2], Chapter 1.2, for instructions on how to build and install MAL.

2.3. Introduction

MAL is an abstraction API that makes applications completely independent of the underlying communication middleware. This is achieved by dynamically loadable MAL middleware implementations (mappings) that implement the MAL API. Data entities (classes, structures, interfaces) must also be independent of the communication middleware. This is achieved by code generation: the user defines the data entities in an ICD definition language (i.e. XML), and the generator produces both the middleware-agnostic entities used by applications and the middleware-specific code that the mappings need. The middleware-specific code is loaded dynamically by the MAL mapping implementations when needed; this part is completely hidden from the programmer. Programmers must never use or reference any middleware-specific code; they use only the pure MAL API and the agnostic data entities.

Middleware selection is done using Uniform Resource Identifier (URI) addressing. A URI is a string of characters that unambiguously identifies a particular resource. The scheme part of the URI (e.g. the “dds.ps://” prefix) determines which MAL mapping is used, whereas the rest of the URI is completely middleware specific. For more information on URIs, refer to the Data Addressing Specification [3].
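As a rough illustration of scheme-based selection, the Python sketch below splits a MAL URI into its scheme and its middleware-specific remainder with the standard urllib.parse.urlsplit function. The helper mapping_name_for is hypothetical, and so is its rule that the mapping name is the part of the scheme before the first dot; that rule is only inferred from this manual's examples, where a mapping registered as "dds" serves "dds.ps" URIs. The Data Addressing Specification [3] remains the authoritative source.

```python
from urllib.parse import urlsplit


def mapping_name_for(uri: str) -> str:
    """Hypothetical helper: derive a MAL mapping name from a URI scheme.

    Assumes the scheme has the form <mapping>.<pattern>, e.g. "dds.ps".
    """
    scheme = urlsplit(uri).scheme  # e.g. "dds.ps"
    if not scheme:
        raise ValueError(f"URI has no scheme: {uri!r}")
    return scheme.split(".", 1)[0]


parts = urlsplit("dds.ps:///m1/CabinetTelemetry")
print(parts.scheme)  # dds.ps
print(parts.path)    # /m1/CabinetTelemetry
print(mapping_name_for("dds.ps:///m1/CabinetTelemetry"))  # dds
```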

MAL API supports two standardized communication patterns: data centric publish-subscribe (decoupled peers based on data publishers and subscribers) and request-response (coupled peer-to-peer client-server communication).

The following sections describe the API for both communication patterns and give examples. The examples only demonstrate usage of the API; they do not do anything meaningful. The source code of the examples is available in the elt-mal/mal module.

2.4. CiiFactory

CiiFactory is the entry-point class of the MAL API. It is usually accessed as a singleton via the getInstance() method, which provides a shared, thread-safe instance to be used by all threads in one process. However, for more complex applications that require isolation between different “sub-processes”, the API provides a createInstance() method that creates a completely new CiiFactory instance on every call. Avoid using this method unless there is a good reason for it.
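The getInstance()/createInstance() distinction is the familiar shared-singleton versus fresh-instance pattern. The following Python sketch models it with a lock-protected, lazily created singleton; FactoryModel is a hypothetical stand-in written for illustration and says nothing about how CiiFactory is implemented internally.

```python
import threading


class FactoryModel:
    """Hypothetical stand-in illustrating getInstance() vs createInstance()."""

    _instance = None
    _lock = threading.Lock()

    @classmethod
    def getInstance(cls):
        # Lazily create one shared instance; the lock ensures that
        # concurrent first calls from several threads get the same object.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    @classmethod
    def createInstance(cls):
        # Every call returns a new, isolated instance.
        return cls()


shared_a = FactoryModel.getInstance()
shared_b = FactoryModel.getInstance()
isolated = FactoryModel.createInstance()
print(shared_a is shared_b)  # True
print(isolated is shared_a)  # False
```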

A CiiFactory instance has two groups of methods: methods for (un)registering MAL mapping implementations, and factory methods for the entities (publisher and subscriber; client and server) of both communication patterns. The main task of CiiFactory is to delegate factory method calls to the appropriate registered MAL mapping implementation, selected by the URI that is passed as a parameter to every factory method. CiiFactory is instantiated without any MAL mapping registered, so the programmer needs to register the MAL mappings. Registration is done by calling the registerMal method, which takes two parameters: the URI scheme string to be handled by the implementation and a reference to the MAL mapping implementation instance. CiiFactory takes no responsibility for the lifecycle of registered mappings; that is the responsibility of the programmer. Nevertheless, CiiFactory provides a close() method that calls close() on all registered MAL mappings and unregisters them. A programmer can also unregister previously registered MAL mappings. Unregistration only removes a MAL instance from the CiiFactory dictionary; any previously created entities (e.g. publishers) remain active. The lifecycle of the entities is the responsibility of the MAL mapping implementation, meaning that releasing a MAL mapping implementation also releases all of its entities.

Registering a directly instantiated MAL mapping introduces a compile-time and runtime dependency on that mapping. The CiiFactory.loadMal method loads the mapping dynamically and avoids any dependency on a particular MAL mapping. CiiFactory.installMal calls CiiFactory.loadMal and registers the newly loaded mapping with the shared CiiFactory instance. An optional properties parameter can be passed to configure the loaded MAL. Both methods accept the MAL mapping (scheme) name, e.g. dds, zpb or opcua.

All factory methods accept three common parameters: a URI, quality-of-service (QoS) parameters, and a name-value list of MAL-mapping-specific string properties. The MAL API defines a common set of QoS parameters per communication pattern (described in detail in the following sections). If none (null) are supplied, defaults are used.

The list of MAL-mapping-specific properties allows fine tuning of the middleware in a uniform way. The properties are specific to, and documented per, MAL mapping implementation; they are usually provided by some kind of configuration subsystem. If none (null) are supplied, defaults are used.

Registering another MAL instance with the same scheme string overrides any previous registration (no unregistration is needed first). Unregistering a scheme that is not registered is a no-op. Calling a factory method with a URI whose scheme is not registered results in a SchemeNotSupportedException.
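The registration rules can be summarized with a small in-memory model. MalRegistry, FakeMal and the SchemeNotSupportedException class below are hypothetical Python stand-ins, not the MAL API. Resolving a URI such as dds.ps:///m1/CabinetTelemetry to the mapping registered as "dds" is an assumption based on the pairing used in this manual's examples.

```python
from urllib.parse import urlsplit


class SchemeNotSupportedException(Exception):
    """Hypothetical stand-in for the MAL exception of the same name."""


class FakeMal:
    """Hypothetical MAL mapping that only records whether it was closed."""

    def __init__(self, name):
        self.name = name
        self.closed = False

    def close(self):
        self.closed = True


class MalRegistry:
    """Illustrative model of CiiFactory's registration rules."""

    def __init__(self):
        self._mals = {}  # a new factory starts with no mapping registered

    def registerMal(self, scheme, mal):
        # Registering the same scheme again replaces the previous mapping.
        self._mals[scheme] = mal

    def unregisterMal(self, scheme):
        # Unknown scheme: no-op. The removed mapping is NOT closed.
        return self._mals.pop(scheme, None)

    def mal_for(self, uri):
        # Assumption: a "dds.ps" URI is served by the mapping registered as "dds".
        scheme = urlsplit(uri).scheme.split(".", 1)[0]
        if scheme not in self._mals:
            raise SchemeNotSupportedException(scheme)
        return self._mals[scheme]

    def close(self):
        # Close every registered mapping, then unregister them all.
        for mal in self._mals.values():
            mal.close()
        self._mals.clear()


registry = MalRegistry()
first, second = FakeMal("dds-1"), FakeMal("dds-2")
registry.registerMal("dds", first)
registry.registerMal("dds", second)  # overrides the "dds-1" registration
print(registry.mal_for("dds.ps:///m1/CabinetTelemetry").name)  # dds-2
registry.unregisterMal("opcua")  # not registered: nothing happens
try:
    registry.mal_for("zpb.ps:///m1/CabinetTelemetry")
except SchemeNotSupportedException as exc:
    print("not supported:", exc)  # not supported: zpb
registry.close()
print(second.closed, first.closed)  # True False
```

Note that the overridden mapping (dds-1) is left open by the model, in line with the rule that CiiFactory does not manage the lifecycle of registered mappings beyond its own close().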

The following code examples show the recommended initialization of a MAL mapping and the creation of a publisher.

2.4.1. Java

In Java it is highly recommended to use try-with-resources statements, which guarantee that close() is always called on the resource.

// install DDS MAL mapping
CiiFactory.installMal("dds");

// use a try-with-resources statement to ensure that close() is always
// called on the factory, which also closes the registered MAL mappings
try (CiiFactory factory = CiiFactory.getInstance()) {

  // create URI
  URI uri = URI.create("dds.ps:///m1/CabinetTelemetry");

  // create publisher instance with default QoS and no specific properties;
  // the try-with-resources statement ensures that close() is always
  // called on the publisher
  try (Publisher<Sample> publisher =
      factory.getPublisher(uri, QoS.DEFAULT, null, Sample.class)) {
    // put publish code here ...
  }

}

2.4.2. C++

The C++ implementation follows the RAII idiom using smart pointers. Resources are closed automatically when their enclosing scope ends. To close a resource explicitly, call its close() method.

try {

  // load DDS MAL mapping with (optional) properties
  std::shared_ptr<::elt::mal::Mal> ddsMal = mal::loadMal("dds", mal::Mal::Properties{
    {"dds.domain", "100"}});
  ::elt::mal::CiiFactory &factory = mal::CiiFactory::getInstance();

  // register DDS MAL mapping with CiiFactory
  factory.registerMal("dds", ddsMal);

  // create URI
  mal::Uri uri("dds.ps:///m1/CabinetTelemetry");

  // create publisher instance with default QoS and no specific properties
  std::unique_ptr<mal::ps::Publisher<mal::example::Sample>> publisher = factory.getPublisher<mal::example::Sample>(
     uri,
     mal::ps::qos::QoS::DEFAULT,
     {});
  // put publisher code here...

} catch (std::exception& exc) {
  std::cerr << "EXCEPTION: " << exc.what() << std::endl;
}

2.4.3. Python

In Python, all generated entities are placed in a module whose name is “Mod” followed by the name of the ICD file in camelCase. The module also loads the appropriate C++ binding classes and libraries. Its submodule structure follows the ICD package structure, with each package name capitalized.

The Python MAL API supports the context manager protocol to release resources automatically after use. Closing resources explicitly with close() is also supported.

# Import MAL API.
import elt.pymal as mal

# Import Data Entity class from binding module.
from ModManualExamples.Elt.Mal import Example

uri = 'dds.ps:///m1/CabinetTelemetry'
factory = mal.CiiFactory.getInstance()

# Load DDS MAL mapping.
ddsMal = mal.loadMal("dds", {'dds.domain': '100'})
factory.registerMal('dds', ddsMal)

# Use context manager interface of the publisher
# to ensure underlying instance of the publisher is
# closed when context is destroyed.
# Create publisher instance with default QoS and no
# specific properties.
with factory.getPublisher(uri,
                          Example.Sample,
                          qos=mal.ps.qos.DEFAULT) as publisher:
    # Put publish code here...
    pass

# Publisher is automatically closed at this point.

2.5. Publish-subscribe

Publish-subscribe is a data-centric, decoupled communication pattern. Publishers (data producers) publish data to a topic, and subscribers (data consumers) subscribe to the topics they are interested in to receive the published data. This is not a coupled (connected) peer-to-peer communication: there can be many subscribers per topic, while MAL prescribes a limit of only one publisher per topic. The order of instantiation/startup does not matter. Subscribers can subscribe to a topic even if no publisher is available; once one becomes available, they start receiving the newly published data. A subscriber misses any data sent to the topic before it subscribed. The paradigm does not guarantee that the first n messages will be received by the subscribers, even if the subscribers are already running when the publisher starts: it takes some time to establish the communication mechanism when publishers and subscribers start up, so subscriptions to topics are not instantaneous and data may be missed. In addition, MAL does not guarantee reliable communication (or queueing): data might be lost during transmission or dropped if send/receive queues are full. MAL-mapping-specific properties can be used to set a reliability QoS if the underlying communication middleware supports it.

MAL does, however, support two QoS settings:

  • Deadline

The deadline QoS defines the maximum age of a sample, i.e. t(now) - t(sample) < deadline_time, where the sample time is the time the sample was created. The sample time is given as a parameter to the publish method of the publisher; if it is not given, the current time is used. Note that this also implies that a sample left too long in a subscriber's receive queue may expire and be removed from the queue.

If deadline QoS is not defined, it is assumed to be infinite.

  • Latency

The latency QoS defines the maximum time a sample may remain in transit between the publisher and the subscriber, i.e. t(arrival) - t(sample) <= latency_time. If the latency QoS is not defined, it is assumed to be infinite.

If either QoS is violated, the corresponding sample is discarded.

MAL assumes that the clocks of the network nodes on which publishers and subscribers run are synchronized.
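Written out as a predicate, the two rules look as follows. passes_qos is a hypothetical helper, not part of the MAL API; times are float seconds (the timestamp representation used by MAL), and None stands for an undefined, i.e. infinite, QoS. Like MAL itself, the check is only meaningful if the clocks involved are synchronized.

```python
def passes_qos(sample_time, arrival_time, now, deadline=None, latency=None):
    """Hypothetical check of the deadline and latency rules (float seconds).

    A deadline or latency of None means "not defined", i.e. infinite.
    """
    # Deadline: maximum age of the sample, t(now) - t(sample) < deadline.
    if deadline is not None and not (now - sample_time < deadline):
        return False
    # Latency: maximum time in transit, t(arrival) - t(sample) <= latency.
    if latency is not None and arrival_time - sample_time > latency:
        return False
    return True


# Sample created at t = 100.0 s, deadline 1 s, latency 100 ms.
print(passes_qos(100.0, 100.05, 100.5, deadline=1.0, latency=0.1))  # True
print(passes_qos(100.0, 100.20, 100.5, deadline=1.0, latency=0.1))  # False: too long in transit
print(passes_qos(100.0, 100.05, 101.5, deadline=1.0, latency=0.1))  # False: sample too old
```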

MAL also supports the concept of instances. Every structure (defined in the ICD) can be assigned a key: a set of structure fields (of primitive types) that uniquely identifies one instance. This allows programmers to publish, retrieve and filter data per instance.
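Conceptually, an instance is identified by the tuple of its key-field values. The sketch below uses a hypothetical Sample dataclass whose key is daqId (mirroring this manual's examples) and, purely for illustration, keeps the most recent sample per instance; it does not describe how any middleware stores instances.

```python
from dataclasses import dataclass


@dataclass
class Sample:
    """Hypothetical stand-in for an ICD structure keyed by daqId."""
    daqId: int
    value: float


KEY_FIELDS = ("daqId",)  # assumption: the ICD declares daqId as the key


def instance_key(sample):
    """Return the tuple of key-field values that identifies the instance."""
    return tuple(getattr(sample, name) for name in KEY_FIELDS)


# For illustration, keep only the most recent sample of each instance.
latest = {}
for sample in (Sample(0, 12.8), Sample(1, 3.14), Sample(0, 13.0)):
    latest[instance_key(sample)] = sample

print(sorted(latest))      # [(0,), (1,)]
print(latest[(0,)].value)  # 13.0
```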

To create data entities, use the createDataEntity method available on both the publisher and the subscriber.

2.5.1. Publisher

A publisher has only one task: to publish data on one topic. Two publish methods are available, one with and one without a sample timestamp. The method without a timestamp uses the current time as the sample timestamp.

A timestamp is a double value representing seconds since 1970-01-01 00:00:00 UTC (with microsecond precision). Utility classes are available to help convert from/to standardized timestamp values and structures.
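For illustration only, the same representation can be produced with the Python standard library. The helper functions below are hypothetical and are not the MAL utility classes (the Java publisher example in this section mentions MalUtil.timestampToDouble from mal-common for that purpose).

```python
from datetime import datetime, timezone


def millis_to_timestamp(millis: int) -> float:
    """Milliseconds since the Unix epoch -> float seconds."""
    return millis / 1000.0


def datetime_to_timestamp(dt: datetime) -> float:
    """Timezone-aware datetime -> float seconds, rounded to microseconds."""
    return round(dt.timestamp(), 6)


def timestamp_to_datetime(ts: float) -> datetime:
    """Float seconds -> timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ts, tz=timezone.utc)


print(millis_to_timestamp(1550791384100))               # 1550791384.1
print(timestamp_to_datetime(1550791384.1).isoformat())  # 2019-02-21T23:23:04.100000+00:00
```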

Publish methods accept a timeout parameter. The method blocks until the message is sent; if its execution time exceeds the timeout, a TimeoutException is thrown. Non-blocking behavior can be achieved by using a timeout value of zero.
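The following sketch models this contract with a bounded queue standing in for the middleware send buffer. BoundedSender is hypothetical, Python's built-in TimeoutError stands in for TimeoutException, and raising immediately when a zero-timeout publish cannot be completed is an assumption made for the model.

```python
import queue
from datetime import timedelta


class BoundedSender:
    """Hypothetical model of publish() timeout semantics."""

    def __init__(self, capacity):
        # The bounded queue stands in for the middleware send buffer.
        self._buffer = queue.Queue(maxsize=capacity)

    def publish(self, sample, timeout: timedelta):
        seconds = timeout.total_seconds()
        try:
            if seconds == 0:
                # Zero timeout: never block.
                self._buffer.put_nowait(sample)
            else:
                # Block for at most `seconds` until the sample is accepted.
                self._buffer.put(sample, timeout=seconds)
        except queue.Full:
            raise TimeoutError(f"publish not completed within {timeout}") from None


sender = BoundedSender(capacity=1)
sender.publish("first", timedelta(seconds=1))  # room available: returns at once
try:
    sender.publish("second", timedelta(0))     # buffer full, zero timeout
except TimeoutError as exc:
    print("timed out:", exc)
```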

Depending on whether the data structure contains a key, there are two ways of publishing, as shown in the following examples.

This class is thread-safe.

2.5.1.1. Java

  // install DDS MAL mapping
  CiiFactory.installMal("dds");

  try (CiiFactory factory = CiiFactory.getInstance()) {

    URI uri = URI.create("dds.ps:///m1/CabinetTelemetry");
    // create publisher with deadline and latency QoS
    try (Publisher<Sample> publisher =
        factory.getPublisher(uri,
            new QoS[] {
                new Deadline(1, TimeUnit.SECONDS),
                new Latency(100, TimeUnit.MILLISECONDS)
            }, null, Sample.class)) {

      //
      // publish code for keyless topic
      // and user provided source timestamp
      //
      Sample sample = publisher.createDataEntity();
      sample.setDaqId(0);
      sample.setValue(12.8);
      // MalUtil.timestampToDouble(long millis) from mal-common
      // might be helpful to convert Java timestamp from millis
      double timestamp = 1550791384.1d;
      publisher.publish(timestamp, sample,
          DEFAULT_TIMEOUT_SEC, TimeUnit.SECONDS);

      //
      // publish code for keyed topic
      //
      Sample keySample = publisher.createDataEntity();
      // set key field(s)
      keySample.setDaqId(1);

      try (InstancePublisher<Sample> daq1Publisher =
          publisher.createInstancePublisher(keySample)) {

        Sample daq1Sample = daq1Publisher.createDataEntity();
        // key is already set when createDataEntity is
        // called on InstancePublisher
        daq1Sample.setValue(Math.PI);
        daq1Publisher.publish(daq1Sample, DEFAULT_TIMEOUT_SEC, TimeUnit.SECONDS);

      }
      // daq1Publisher is destroyed here,
      // since we are out of try-with-resource scope

    }

  }

2.5.1.2. C++

try {
   std::shared_ptr<::elt::mal::Mal> ddsMal = mal::loadMal("dds", mal::Mal::Properties{
     {"dds.domain", "100"}});
   ::elt::mal::CiiFactory &factory = mal::CiiFactory::getInstance();
   factory.registerMal("dds", ddsMal);

   mal::Uri uri("dds.ps:///m1/CabinetTelemetry");
   // Create publisher with deadline and latency QoS
   std::unique_ptr<mal::ps::Publisher<mal::example::Sample>> publisher = factory.getPublisher<mal::example::Sample>(
     uri,
     { std::make_shared<mal::ps::qos::Latency>(
         std::chrono::milliseconds(100)),
       std::make_shared<mal::ps::qos::Deadline>(
         std::chrono::seconds(1)) }, {});

   //
   // publish code for keyless topic
   // and user provided source timestamp
   //
   std::shared_ptr<mal::example::Sample> sample = publisher->createDataEntity();
   sample->setDaqId(0);
   sample->setValue(12.8);
   double timestamp = 1550791384.1;
   publisher->publish(timestamp, *sample, std::chrono::seconds(3));

   //
   // publish code for keyed topic
   //
   std::shared_ptr<mal::example::Sample> keySample = publisher->createDataEntity();
   // set key field(s)
   keySample->setDaqId(1);
   {
     std::unique_ptr<::elt::mal::ps::InstancePublisher<mal::example::Sample>> daq1Publisher =
        publisher->createInstancePublisher(*keySample);
     std::shared_ptr<mal::example::Sample> daq1Sample = daq1Publisher->createDataEntity();
     // daqId is already set as it is created by InstancePublisher
     daq1Sample->setValue(3);
     daq1Publisher->publish(*daq1Sample, std::chrono::seconds(3));
   }
   // daq1Publisher is destroyed here, since its enclosing scope ended
} catch (std::exception& exc) {
  std::cerr << "EXCEPTION: " << exc.what() << std::endl;
}

2.5.1.3. Python

Python does not support method overloading, so the publisher provides two methods: publish() and publishWithTimestamp(). publishWithTimestamp() takes the timestamp value as its first argument; the timestamp must be a float (as returned by time.time()).

Durations and timeouts must be specified as instances of the standard datetime.timedelta class.

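import datetime
import time

import elt.pymal as mal
from ModManualExamples.Elt.Mal import Example

DEFAULT_TIMEOUT = datetime.timedelta(seconds=3)

uri = 'dds.ps:///m1/CabinetTelemetry'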

ddsMal = mal.loadMal('dds', {'dds.domain': '100'})
factory = mal.CiiFactory.getInstance()
factory.registerMal('dds', ddsMal)

# Create publisher with deadline and latency QoS.
qosList = [mal.ps.qos.Deadline(datetime.timedelta(seconds=1)),
           mal.ps.qos.Latency(datetime.timedelta(milliseconds=100))]
with factory.getPublisher(uri, Example.Sample,
                          qos=qosList) as publisher:
    #
    # Publish code for keyless topic
    # and user provided source timestamp.
    #

    sample = publisher.createDataEntity()
    sample.setDaqId(0)
    sample.setValue(12.8)
    timestamp = time.time()
    publisher.publishWithTimestamp(timestamp,
                                   sample,
                                   datetime.timedelta(seconds=1))

    #
    # Publish code for keyed topic.
    #

    keySample = publisher.createDataEntity()

    # Set key field(s).
    keySample.setDaqId(1)

    # Use Publisher as context manager.
    with publisher.createInstancePublisher(keySample) as daq1Publisher:
        daq1Sample = daq1Publisher.createDataEntity()
        daq1Sample.setValue(3.14)
        daq1Publisher.publish(daq1Sample, DEFAULT_TIMEOUT)
    # daq1Publisher is closed here.

2.5.2. Subscriber

Each subscriber instance has its own queue. The queue holds the events related to the subscribed topic, ordered by the time of insertion into the queue (i.e. data arrival). Events are usually data events, which simply contain the published data, but there are also events that indicate, for example, that no publisher is available for a given instance. Events are represented by the DataEvent interface.

Through its methods, the interface provides access to the data (if available), its source timestamp, and the InstanceState and InstanceEvent enumerations. InstanceState always reports the current state of an instance (not its state when the event was recorded): ALIVE or NOT_ALIVE. InstanceEvent describes why the event was created: CREATED (the instance has been created/revived), UPDATED (updated data event) or REMOVED (the instance has been destroyed, i.e. no more publishers are available for it). A programmer should never assume that data is always available; for example, an event with the REMOVED InstanceEvent carries no data. Always call the hasValidData method to check whether the event holds (valid) data.

NOTE: A REMOVED instance event is not guaranteed to be generated when an instance is destroyed (due to differences in, or lack of, notifications in the middleware communication software); therefore, software should not base its logic on the arrival of this event.
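The Python model below shows the recommended handling pattern: check hasValidData before touching the data, and do not branch on REMOVED. The DataEvent dataclass and the enumerations are hypothetical stand-ins that only reuse the names from the text; the real DataEvent exposes this information through its own accessor methods.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class InstanceState(Enum):
    ALIVE = "ALIVE"
    NOT_ALIVE = "NOT_ALIVE"


class InstanceEvent(Enum):
    CREATED = "CREATED"
    UPDATED = "UPDATED"
    REMOVED = "REMOVED"


@dataclass
class DataEvent:
    """Hypothetical stand-in mirroring the accessors described above."""
    data: Optional[float]
    instanceState: InstanceState
    instanceEvent: InstanceEvent

    def hasValidData(self) -> bool:
        return self.data is not None


def handle(event: DataEvent) -> Optional[float]:
    # Always check for valid data first; e.g. REMOVED events carry none.
    if not event.hasValidData():
        return None
    return event.data


events = [
    DataEvent(12.8, InstanceState.ALIVE, InstanceEvent.CREATED),
    DataEvent(None, InstanceState.NOT_ALIVE, InstanceEvent.REMOVED),
]
values = [v for v in map(handle, events) if v is not None]
print(values)  # [12.8]
```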

All data received by a subscriber instance (registered to one topic) is first processed by the subscriptions on that subscriber. If the data matches a subscription, the subscription callback is called and the data is provided to it; otherwise the data is put into the queue. The programmer has to take care of emptying the queue; otherwise it might become full and no new data can be received. To empty the queue, invoke one of the read methods, or set the deadline QoS so that (too) old data is removed from the queue.
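The routing described above can be modelled as follows. SubscriberModel is a hypothetical illustration, not the MAL subscriber; the assumption that every matching subscription is called, and the drain method standing in for the read methods, are simplifications made for the sketch.

```python
from collections import deque


class SubscriberModel:
    """Hypothetical model of how received data is routed (not the MAL API)."""

    def __init__(self, queue_capacity):
        self.queue = deque()
        self.capacity = queue_capacity
        self.subscriptions = []  # list of (predicate, callback) pairs
        self.dropped = 0

    def on_data(self, data):
        # 1. Subscriptions see the data first (assumption: every match is called).
        matched = [cb for pred, cb in self.subscriptions if pred(data)]
        for callback in matched:
            callback(data)
        if matched:
            return
        # 2. Otherwise the data goes to the queue, unless the queue is full.
        if len(self.queue) >= self.capacity:
            self.dropped += 1
        else:
            self.queue.append(data)

    def drain(self):
        # Stand-in for any read method: empties the queue and frees room.
        items = list(self.queue)
        self.queue.clear()
        return items


seen = []
sub = SubscriberModel(queue_capacity=2)
sub.subscriptions.append((lambda value: value > 100, seen.append))
for value in (1, 500, 2, 3):
    sub.on_data(value)
print(seen, list(sub.queue), sub.dropped)  # [500] [1, 2] 1
print(sub.drain(), list(sub.queue))        # [1, 2] []
```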

NOTE: Never close a subscription from within its callback, as that may cause a deadlock, depending on the implementation. If you want the callback to stop processing but do not want to close the subscription yet, use an atomic flag and return early from the callback depending on its state.
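In Python, a threading.Event can serve as the thread-safe flag. In the sketch below, a plain thread stands in for the middleware thread that would invoke the callback; StoppableCallback and middleware_thread are hypothetical names, and the subscription itself would still be closed later, outside the callback.

```python
import threading


class StoppableCallback:
    """Hypothetical callback that can be switched off without closing anything."""

    def __init__(self):
        self._stopped = threading.Event()  # thread-safe flag
        self.processed = []

    def stop(self):
        # Safe to call from any thread.
        self._stopped.set()

    def __call__(self, data):
        if self._stopped.is_set():
            return  # early return instead of closing the subscription
        self.processed.append(data)


def middleware_thread(callback):
    # Stand-in for a middleware thread delivering three samples.
    for value in range(3):
        callback(value)


callback = StoppableCallback()
worker = threading.Thread(target=middleware_thread, args=(callback,))
worker.start()
worker.join()
callback.stop()
callback(99)  # ignored: the flag is set
print(callback.processed)  # [0, 1, 2]
```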

The following text describes some of the use-cases shown in the example code below:

  1. Read data from all instances. The maximum number of samples returned is limited by the maxSamples parameter. To read all the data (and clear the queue), use 0 as the maxSamples parameter.

  2. If there is no data, an empty (not null) list is returned. This method creates new entities and copies their data; avoid using it when dealing with large arrays.

  3. Read data from one instance. The instance to be read is given as a parameter: to create it, call createDataEntity and set the fields that form the key. If the queue holds no data for the specified instance, a null reference is returned. This method creates a new instance (except for the C++ variant that takes the result by reference) and copies its data (in all variants of the method); avoid using it when dealing with large arrays.
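The read semantics in the list above can be modelled with a single queue of (instance key, sample) pairs in arrival order. ReceiveQueue is a hypothetical Python stand-in whose method names merely echo read and readInstance; it ignores copying and loaning entirely.

```python
from collections import deque


class ReceiveQueue:
    """Hypothetical receive queue of (instance key, sample) in arrival order."""

    def __init__(self):
        self._items = deque()

    def push(self, key, sample):
        self._items.append((key, sample))

    def read(self, max_samples=0):
        # 0 = no limit: return (and remove) everything that is queued.
        # An empty list, never None, is returned when there is no data.
        count = len(self._items)
        if max_samples:
            count = min(count, max_samples)
        return [self._items.popleft()[1] for _ in range(count)]

    def readInstance(self, key):
        # Return (and remove) the oldest sample of one instance, or None.
        for index, (item_key, sample) in enumerate(self._items):
            if item_key == key:
                del self._items[index]
                return sample
        return None


rq = ReceiveQueue()
rq.push((0,), 12.8)
rq.push((1,), 3.14)
rq.push((0,), 13.0)
print(rq.readInstance((1,)))  # 3.14
print(rq.readInstance((1,)))  # None
print(rq.read(1))             # [12.8]
print(rq.read(0))             # [13.0]
print(rq.read(0))             # []
```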

Whenever copies of the data must be avoided (e.g. for large arrays), loaned-read methods are available. If the underlying middleware and mapping implementation support no-copy behavior, these methods “loan” the data from the middleware and pass a reference to it to the programmer. This is not always possible; when it is not supported by the implementation, a copy is made. Also note that some middleware might support this only for certain data types: if the middleware data type maps onto the MAL data type without conversion, no-copy behavior is possible; otherwise a copy is made during conversion.

Events (not just data!) can be read synchronously using the readDataEvents method. The method adds events to the list provided as an argument and returns the number of events added. It blocks until one of the following conditions is met: the number of events added to the list reaches eventCountLimit, or collectTime has passed. A zero eventCountLimit imposes no limit on the number of events added, and a zero collectTime makes the method non-blocking, so that it only reads what is currently available in the queue (still limited by eventCountLimit). Remember that events are not always data events; check whether the data is valid by calling hasValidData first.
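The blocking rules of readDataEvents can be sketched over a standard queue.Queue. read_data_events is a hypothetical function that follows the rules stated above (a limit of 0 means unlimited, a collect time of 0 means non-blocking) and appends to the caller's list like the Java and C++ examples; it is not the MAL implementation.

```python
import queue
import time


def read_data_events(source, out, event_count_limit, collect_time):
    """Hypothetical model of readDataEvents(); returns the number of events added.

    event_count_limit == 0 means no limit; collect_time (seconds) == 0 means
    non-blocking: only what is already in `source` is read.
    """
    added = 0
    deadline = time.monotonic() + collect_time
    while event_count_limit == 0 or added < event_count_limit:
        try:
            if collect_time == 0:
                event = source.get_nowait()
            else:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break  # collect time has passed
                event = source.get(timeout=remaining)
        except queue.Empty:
            break  # nothing (more) arrived in time
        out.append(event)
        added += 1
    return added


incoming = queue.Queue()
for i in range(5):
    incoming.put(i)
events = []
print(read_data_events(incoming, events, 3, 1.0))  # 3: limit reached, no waiting
print(read_data_events(incoming, events, 0, 0))    # 2: non-blocking, reads the rest
print(events)                                      # [0, 1, 2, 3, 4]
```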

For asynchronous subscriptions there are two methods: one that provides the data in the callback and one that does not. The former guarantees only that the data is valid for the duration of the callback, which allows the implementation to avoid a copy. The latter only notifies about the presence of new data, which can be read later using one of the synchronous read methods; however, read methods must not be invoked inside the callback. If you want to get the data in the callback, use the former method. Callbacks are executed by MAL mapping implementation threads or even by middleware threads. Creating an asynchronous subscription returns a Subscription instance that is used to cancel the subscription.

The lifetime of subscriptions is limited by the lifetime of their subscriber: when a subscriber is closed, its subscriptions are also closed.

In addition, there is a method for event-pump based subscriptions. Once such a subscription is created, the poll method needs to be called to get the callback invoked. The callback is invoked by the same thread that calls poll.
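The essential property of an event pump is that callbacks run on the polling thread, whichever thread delivered the event. The Python sketch below demonstrates this; PollingSubscription and its deliver and poll methods are hypothetical names chosen for the model, with max_events set to 0 meaning no limit and timeout expected to be a positive number of seconds.

```python
import queue
import threading
import time


class PollingSubscription:
    """Hypothetical event-pump model: callbacks run on the thread calling poll()."""

    def __init__(self, callback):
        self._pending = queue.Queue()  # filled by delivering threads
        self._callback = callback

    def deliver(self, event):
        # May be called from any thread; it only enqueues the event.
        self._pending.put(event)

    def poll(self, max_events, timeout):
        """Dispatch up to max_events (0 = no limit) within timeout seconds."""
        dispatched = 0
        deadline = time.monotonic() + timeout
        while max_events == 0 or dispatched < max_events:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                event = self._pending.get(timeout=remaining)
            except queue.Empty:
                break
            self._callback(event)  # runs on the caller's (polling) thread
            dispatched += 1
        return dispatched


def record(event):
    print(event, "handled by", threading.current_thread().name)


pump = PollingSubscription(record)
producer = threading.Thread(target=pump.deliver, args=("sample-1",), name="middleware")
producer.start()
producer.join()
pump.poll(max_events=10, timeout=0.2)  # prints the name of the polling thread
```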

Subscriptions and the readDataEvents method support filtering on the instance, InstanceState and InstanceEvent fields. Setting a filter field limits the events to those whose corresponding value matches the set value; when several fields are set, all of them must match. The default filter allows all events; a cached (shared) instance of it is accessed via the DataEventFilter::all() method.
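The filter rules (unset fields match anything, set fields are combined with a logical AND, one shared match-all instance) can be sketched as follows. FilterModel, Event and the enumerations are hypothetical Python stand-ins; the InstanceEvent field is modelled as a set, as in the Java and C++ examples that pass CREATED and UPDATED together.

```python
from dataclasses import dataclass
from enum import Enum
from typing import FrozenSet, Optional, Tuple


class InstanceState(Enum):
    ALIVE = 1
    NOT_ALIVE = 2


class InstanceEvent(Enum):
    CREATED = 1
    UPDATED = 2
    REMOVED = 3


@dataclass(frozen=True)
class Event:
    instance: Tuple
    state: InstanceState
    kind: InstanceEvent


@dataclass(frozen=True)
class FilterModel:
    """Hypothetical filter: fields left as None match anything."""
    instance: Optional[Tuple] = None
    state: Optional[InstanceState] = None
    events: Optional[FrozenSet[InstanceEvent]] = None

    @classmethod
    def all(cls):
        # Cached shared match-all instance, like DataEventFilter.all().
        return ALL_EVENTS

    def matches(self, e):
        # Every field that is set must match (logical AND).
        return ((self.instance is None or e.instance == self.instance)
                and (self.state is None or e.state == self.state)
                and (self.events is None or e.kind in self.events))


ALL_EVENTS = FilterModel()

daq1_alive = FilterModel(instance=(1,), state=InstanceState.ALIVE,
                         events=frozenset({InstanceEvent.CREATED, InstanceEvent.UPDATED}))
print(daq1_alive.matches(Event((1,), InstanceState.ALIVE, InstanceEvent.UPDATED)))  # True
print(daq1_alive.matches(Event((2,), InstanceState.ALIVE, InstanceEvent.UPDATED)))  # False
```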

This class is thread-safe.

2.5.2.1. Java

public class SubscriberExample {

  // user data event callback function
  static void dataEventFunction(Subscriber<Sample> subscriber, DataEvent<Sample> event) {
    // NOTE: you should never close the subscription within the callback
    // as that may cause deadlock.
    if (event.hasValidData()) {
      System.out.println(event.getData());
    }
  }

  public static void main(String[] args) {

    // install DDS MAL mapping
    CiiFactory.installMal("dds");

    try (CiiFactory factory = CiiFactory.getInstance()) {
      URI uri = URI.create("dds.ps:///m1/CabinetTelemetry");
      try (Subscriber<Sample> subscriber =
          factory.getSubscriber(uri, QoS.DEFAULT, null, Sample.class)) {

        //
        // Read all alive instances w/o sample count limit.
        //
        List<Sample> allAliveInstances = subscriber.read(0);

        //
        // Read one instance, i.e. daqId == 1.
        //
        Sample keySample = subscriber.createDataEntity();
        keySample.setDaqId(1);
        Sample daq1Sample = subscriber.readInstance(keySample);

        //
        // Loaned (no-copy) example.
        // Loan is returned when close() method is called on LoanedDataEntity
        // or automatically by try-with-resources (as shown below).
        //
        try (LoanedDataEntity<Sample> loanedSample = subscriber.loanedRead()) {
          Sample sampleData = loanedSample.getData();
          if (sampleData != null) {
            // use data here
          }
        }

        //
        // Read data events, filtered by instance and instance state.
        // Block until maxEvents (1000) are read or timeout (3s) occurs.
        //
        final int maxEvents = 1000;
        List<DataEvent<Sample>> events = new ArrayList<DataEvent<Sample>>(maxEvents);
        DataEventFilter<Sample> filter = new DataEventFilter<Sample>();
        filter.setInstance(keySample);
        filter.setInstanceState(InstanceState.ALIVE);
        filter.setInstanceEvents(
            EnumSet.of(InstanceEvent.CREATED, InstanceEvent.UPDATED)
        );
        int samplesRead = subscriber.readDataEvents(filter, events,
            maxEvents, 3, TimeUnit.SECONDS);

        //
        // Asynchronous data subscription, no filtering.
        // 1 second sleep added to simulate some work.
        //
        try (Subscription subs = subscriber.subscribeAsync(DataEventFilter.all(),
            SubscriberExample::dataEventFunction)) {
          Thread.sleep(TimeUnit.SECONDS.toMillis(1));
        } catch (InterruptedException ie) {
          // noop
        }
        // note that subscription is destroyed when leaving try-with-resources scope.

        //
        // Asynchronous notification subscription, no filtering,
        // lambda callback implementation.
        // DO NOT CALL read methods in the callback!
        // 1 second sleep added to simulate some work.
        //
        try (Subscription subs =
            subscriber.subscribeAsync(DataEventFilter.all(),
                (Subscriber<Sample> sub) -> System.out.println("new data"))
            ) {
          Thread.sleep(TimeUnit.SECONDS.toMillis(1));
        } catch (InterruptedException ie) {
          // noop
        }

        //
        // Synchronous (aka event pump) subscription, no filtering.
        // Poll with 1000 events / 1 second limit, endless loop.
        //
        try (Subscription subs = subscriber.subscribe(
                DataEventFilter.all(),
                SubscriberExample::dataEventFunction)) {
          while (true) {
            subscriber.poll(1000, 1, TimeUnit.SECONDS);
            // the call above blocks for max. 1 second or
            // until 1000 events are provided to the callback
          }
        }

      }
    }
  }
}

2.5.2.2. C++

std::shared_ptr<::elt::mal::Mal> ddsMal = mal::loadMal("dds", mal::Mal::Properties{
  {"dds.domain", "100"}});
::elt::mal::CiiFactory &factory = mal::CiiFactory::getInstance();
factory.registerMal("dds", ddsMal);

mal::Uri uri("dds.ps:///m1/CabinetTelemetry");

try {
  std::unique_ptr<mal::ps::Subscriber<mal::example::Sample>> subscriber = factory.getSubscriber<mal::example::Sample>(
      uri, mal::ps::qos::QoS::DEFAULT, {});

  //
  // Read all alive instances w/o sample count limit.
  //
  std::vector<std::shared_ptr<mal::example::Sample>> allAliveInstances = subscriber->read(0);

  //
  // Read one instance, i.e. daqId == 1.
  //
  std::shared_ptr<mal::example::Sample> keySample = subscriber->createDataEntity();
  keySample->setDaqId(1);
  std::shared_ptr<mal::example::Sample> daq1Sample = subscriber->readInstance(*keySample);

  // ... or by providing sample by reference
  std::shared_ptr<mal::example::Sample> sample = subscriber->createDataEntity();
  bool sampleRead = subscriber->readInstance(*keySample, *sample);

  //
  // Loaned (no-copy) example.
  // Loan is returned when close() method is called on LoanedDataEntity
  // or automatically when scope containing loaned data entity
  // terminates.

  {
    std::shared_ptr<::elt::mal::ps::LoanedDataEntity<mal::example::Sample>> loanedSample = subscriber->loanedRead();
    if (loanedSample) {
      mal::example::Sample &sampleData = loanedSample->getData();
      // use data here
    }
  }

  //
  // Read data events, filtered by instance and instance state.
  // Block until maxEvents are read or timeout occurs.
  //
  using Sample = mal::example::Sample;
  const std::size_t maxEvents = 1000;
  std::vector<std::shared_ptr<mal::ps::DataEvent<Sample>>> events;
  events.reserve(maxEvents);  // reserve capacity only; the list starts empty
  mal::ps::DataEventFilter<Sample> filter;
  filter.setInstance(std::shared_ptr<Sample>(std::move(keySample)));
  filter.setInstanceState(mal::ps::InstanceState::ALIVE);
  filter.setInstanceEvents(
   std::set<mal::ps::InstanceEvent>{mal::ps::InstanceEvent::CREATED,
                                    mal::ps::InstanceEvent::UPDATED});
  std::size_t samplesRead = subscriber->readDataEvents(
      filter,
      events, maxEvents,
      std::chrono::seconds(3));

  //
  // Synchronous (aka event pump) subscription, no filtering.
  // Poll for at most 1 second or 10 events, whichever comes first.
  //
  {
    std::unique_ptr<mal::ps::Subscription> subscription = subscriber->subscribe(
      mal::ps::DataEventFilter<Sample>::all(),
      [](mal::ps::Subscriber<Sample>& subscriber,
         const mal::ps::DataEvent<Sample>& event) -> void {
           // NOTE: you should never close subscription from
           // within a callback, since that may cause deadlock
           if (event.hasValidData()) {
             std::cout << event.getData() << std::endl;
           }
           });
    subscriber->poll(10, std::chrono::seconds(1));
  }
  // Note that subscription is destroyed when leaving the scope.

  //
  // Asynchronous subscription, no filtering.
  // 1 second sleep added to simulate some work.
  //
  {
    std::unique_ptr<mal::ps::Subscription> subscription = subscriber->subscribeAsync(
      mal::ps::DataEventFilter<Sample>::all(),
      [](mal::ps::Subscriber<Sample>& subscriber,
         const mal::ps::DataEvent<Sample>& event) -> void {
           // NOTE: you should never close subscription from
           // within a callback, since that may cause deadlock
           if (event.hasValidData()) {
             std::cout << event.getData() << std::endl;
           }
           });
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }

  //
  // Asynchronous notification subscription, no filtering.
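  // The callback only signals the main thread, which then performs
  // a synchronous read of all alive samples.
  //
  {
    std::promise<void> eventPromise;
    std::future<void> eventFuture = eventPromise.get_future();
    // The notifier may fire more than once, but a promise can be
    // satisfied only once (a second set_value() throws), hence call_once.
    std::once_flag notified;  // requires <mutex>

    std::unique_ptr<mal::ps::Subscription> subscription = subscriber->subscribeAsync(
      mal::ps::DataEventFilter<Sample>::all(),
      [&eventPromise, &notified](mal::ps::Subscriber<Sample>& subscriber) -> void {
        // NOTE: you should never close subscription from
        // within a callback, since that may cause deadlock
        std::call_once(notified, [&eventPromise] { eventPromise.set_value(); });
        });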

    std::future_status status = eventFuture.wait_for(std::chrono::seconds(1));
    if (status == std::future_status::ready) {
      std::cout << subscriber->read(0).size() << std::endl;
    } else {
      std::cout << "No event was received\n";
    }
  }
} catch (std::exception& exc) {
  std::cerr << "EXCEPTION: " << exc.what() << std::endl;
}

2.5.2.3. Python

The Python subscriber offers (as described above) three versions of the subscribe method:

  • subscribe()

    • Creates a synchronous subscription. Received data events are passed synchronously to the provided callback when poll() is called on the subscriber object.

  • subscribeAsync()

    • Creates an asynchronous subscription. The provided callback is called from another thread as new data events arrive.

  • subscribeAsyncNotifier()

    • Creates an asynchronous subscription. The callback receives change notifications rather than data events and is called from another thread.

When creating a data event filter, the data entity class (not an instance) must be passed as the argument to the factory function mal.ps.DataEventFilter.all().
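The three variants differ mainly in which thread runs the callback. The following is a minimal, self-contained model of that difference, assuming nothing about the real elt.pymal implementation: ToySubscriber, deliver() and the snake_case method names are hypothetical and exist only for illustration. A synchronous callback runs inside poll() on the caller's thread, an asynchronous callback runs on a "middleware" thread with the event, and a notifier runs on that thread without any data.

```python
import queue
import threading
import time


class ToySubscriber:
    """Hypothetical stand-in for a MAL subscriber; NOT the elt.pymal API.

    It only models on which thread each kind of callback runs.
    """

    def __init__(self):
        self._pending = queue.Queue()   # events waiting for poll()
        self._sync_callbacks = []       # subscribe(): run inside poll()
        self._async_callbacks = []      # subscribeAsync(): run on another thread
        self._notifiers = []            # subscribeAsyncNotifier(): no data passed

    def subscribe(self, callback):
        self._sync_callbacks.append(callback)

    def subscribe_async(self, callback):
        self._async_callbacks.append(callback)

    def subscribe_async_notifier(self, callback):
        self._notifiers.append(callback)

    def deliver(self, event):
        """Simulate the middleware receiving an event on its own thread."""
        self._pending.put(event)

        def middleware_thread():
            for callback in self._async_callbacks:
                callback(self, event)
            for callback in self._notifiers:
                callback(self)          # notification only, no event data

        worker = threading.Thread(target=middleware_thread)
        worker.start()
        return worker

    def poll(self, max_events, timeout_s):
        """Event pump: run sync callbacks on the caller's thread.

        Returns after max_events events or once timeout_s has elapsed.
        """
        deadline = time.monotonic() + timeout_s
        handled = 0
        while handled < max_events:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                event = self._pending.get(timeout=remaining)
            except queue.Empty:
                break
            for callback in self._sync_callbacks:
                callback(self, event)
            handled += 1
        return handled


main_thread = threading.current_thread()
log = {"sync": [], "async": [], "notified": 0}


def on_sync(subscriber, event):
    log["sync"].append((event, threading.current_thread() is main_thread))


def on_async(subscriber, event):
    log["async"].append((event, threading.current_thread() is main_thread))


def on_notify(subscriber):
    log["notified"] += 1


sub = ToySubscriber()
sub.subscribe(on_sync)
sub.subscribe_async(on_async)
sub.subscribe_async_notifier(on_notify)
sub.deliver("sample-1").join()
sub.deliver("sample-2").join()
handled = sub.poll(max_events=10, timeout_s=0.1)
```

The flags recorded in log show that only the synchronous callback ran on the main thread, which is why a synchronous subscription does nothing until poll() is called.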

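import datetime
import time

import elt.pymal as mal
# Example.Sample is the ICD-generated data entity class; import it from
# the Python module generated for your ICD.

DEFAULT_TIMEOUT = datetime.timedelta(seconds=3)
# Poll interval.
POLL_INTERVAL = datetime.timedelta(seconds=1)
uri = 'dds.ps:///m1/CabinetTelemetry'
ddsMal = mal.loadMal('dds', {'dds.domain': '100'})
factory = mal.CiiFactory.getInstance()
factory.registerMal('dds', ddsMal)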
with factory.getSubscriber(uri,
                           Example.Sample,
                           qos=mal.ps.qos.DEFAULT) as subscriber:

    # Read all alive instances w/o sample count limit.

    allAliveInstances = subscriber.read(0)

    # Read one instance i.e. daqId == 1.
    keySample = subscriber.createDataEntity()
    keySample.setDaqId(1)
    daq1sample = subscriber.readInstance(keySample)


    # Loaned (no-copy) example.
    # Loan is returned when close() method is
    # called on LoanedDataEntity. Note: there is
    # no context management protocol available for
    # loaned samples. Use try, finally block.
    loanedSample = subscriber.loanedRead()
    try:
        sampleData = loanedSample.getData()
    finally:
        # Return loan
        loanedSample.close()

    # Read data events, filtered by instance and instance state.
    # Block until maxEvents (1000) are read or timeout (3s) occurs.

    maxEvents = 1000

    # Create a new filter for the Sample data entity class.
    eventFilter = mal.ps.DataEventFilter.all(Example.Sample)
    eventFilter.setInstance(keySample)
    eventFilter.setInstanceState(mal.ps.InstanceState.ALIVE)
    eventFilter.setInstanceEvents({mal.ps.InstanceEvent.CREATED,
                                   mal.ps.InstanceEvent.UPDATED})
    # List of samples is returned.
    samples = subscriber.readDataEvents(eventFilter,
                                        maxEvents,
                                        timeout=DEFAULT_TIMEOUT)

    # Asynchronous data subscription, no filtering,
    # 1 second sleep added to simulate some work.

    def dataEventFunction(subscriber, dataEvent):
        # NOTE: you should never close subscription from within
        # a callback, since that may cause a deadlock

        # Data event processing method
        if dataEvent.hasValidData():
            sample = dataEvent.getData()
            # use sample

    with subscriber.subscribeAsync(mal.ps.DataEventFilter.all(Example.Sample),
                                   dataEventFunction) as subscription:
        # Suspend main thread, dataEventFunction is
        # called from another thread.
        time.sleep(1)
    # Note that the subscription is destroyed when the with block exits.

    # Asynchronous data subscription, no filtering,
    # using lambda callback.
    # DO NOT CALL read methods in the callback!
    # 1 second sleep added to simulate some work.
    with subscriber.subscribeAsync(mal.ps.DataEventFilter.all(Example.Sample),
                                   lambda subscriber, dataEvent:
                                   print(dataEvent.getData())) as subscription:
        # Suspend main thread, data is printed from another thread.
        time.sleep(1)

    # Synchronous (aka event pump) subscription, no filtering.
    # Poll with a limit of maxEvents (1000) events and POLL_INTERVAL (1 s)
    # per call, in an endless loop.
    with subscriber.subscribe(mal.ps.DataEventFilter.all(Example.Sample),
                              dataEventFunction) as subscription:
        while True:
            # dataEventFunction is called synchronously, from within
            # poll(), when a new event arrives.
            subscriber.poll(maxEvents, POLL_INTERVAL)

2.5.3. MrvSubscriber

MrvSubscriber is a convenience variant of a subscriber intended for use by GUIs. It has only one method, which always blocks for a given amount of time; this can be used to implement a refresh loop. Use a zero time to read the samples already in the queue without blocking any longer. The method always returns a list of the most-recent values of all instances, not only of the instances that changed during the specified time. An instance that becomes unavailable is no longer returned in the list. The method creates new entities and copies their data, so avoid it when dealing with large arrays.

This class is thread-safe.
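The most-recent-value semantics described above can be modelled as a lock-protected map from instance key to the latest sample. The sketch below is a hypothetical illustration, not the MAL class: ToyMrvCache, on_sample() and on_instance_gone() are invented names, and the blocking wait of the real readMostRecent() is left out. It shows that a newer sample replaces the older one, that unavailable instances disappear from the result, that unchanged instances are still returned, and that callers receive copies.

```python
import copy
import threading


class ToyMrvCache:
    """Hypothetical model of most-recent-value semantics; NOT the MAL class."""

    def __init__(self):
        self._lock = threading.Lock()   # the real MrvSubscriber is thread-safe
        self._latest = {}               # instance key -> most recent sample

    def on_sample(self, key, sample):
        """A new sample for an instance replaces the previous one."""
        with self._lock:
            self._latest[key] = sample

    def on_instance_gone(self, key):
        """An instance that becomes unavailable is no longer reported."""
        with self._lock:
            self._latest.pop(key, None)

    def read_most_recent(self):
        """Return every alive instance (changed or not) as independent copies."""
        with self._lock:
            return [copy.deepcopy(sample) for sample in self._latest.values()]


mrv = ToyMrvCache()
mrv.on_sample(1, {"daqId": 1, "value": 0.5})
mrv.on_sample(2, {"daqId": 2, "value": 1.0})
mrv.on_sample(1, {"daqId": 1, "value": 0.7})   # newer value for instance 1
first = mrv.read_most_recent()
mrv.on_instance_gone(2)
second = mrv.read_most_recent()
first[0]["value"] = 99.0                        # mutating a returned copy...
third = mrv.read_most_recent()                  # ...does not affect the cache
```

The deep copy is what makes the method expensive for large arrays, which is the reason for the warning above.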

2.5.3.1. Java

// install DDS MAL mapping
CiiFactory.installMal("dds");

try (CiiFactory factory = CiiFactory.getInstance()) {

  URI uri = URI.create("dds.ps:///m1/CabinetTelemetry");
  try (MrvSubscriber<Sample> mrvSubscriber =
      factory.getMrvSubscriber(uri, QoS.DEFAULT,
          null, Sample.class)) {

    //
    // Every 1 second get most-recent-values of alive instances.
    //
    while (true) {
      List<Sample> mrvSamples = mrvSubscriber.readMostRecent(1, TimeUnit.SECONDS);
      System.out.println(Arrays.toString(mrvSamples.toArray()));
    }

  }
}

2.5.3.2. C++

std::shared_ptr<::elt::mal::Mal> ddsMal = mal::loadMal("dds", {{"dds.domain", "100"}});
::elt::mal::CiiFactory &factory = mal::CiiFactory::getInstance();
factory.registerMal("dds", ddsMal);

mal::Uri uri("dds.ps:///m1/CabinetTelemetry");

try {
  std::unique_ptr<mal::ps::MrvSubscriber<mal::example::Sample>> mrvSubscriber =
     factory.getMrvSubscriber<mal::example::Sample>(uri, mal::ps::qos::QoS::DEFAULT, {});

  //
  // Every 1 second get most-recent-values of alive instances.
  //
  while (true) {
    std::vector<std::shared_ptr<mal::example::Sample>> mrvSamples =
      mrvSubscriber->readMostRecent(std::chrono::seconds(1));
    for (const auto &sample : mrvSamples) {
      std::cout << "Sample " << sample->getDaqId() << " / " <<
        sample->getValue() << std::endl;
    }
  }
} catch (std::exception &exc) {
  std::cerr << "EXCEPTION: " << exc.what() << std::endl;
}

2.5.3.3. Python

import datetime

import elt.pymal as mal

uri = 'dds.ps:///m1/CabinetTelemetry'
# Load DDS MAL mapping.
ddsMal = mal.loadMal('dds', {'dds.domain': '100'})
factory = mal.CiiFactory.getInstance()
factory.registerMal('dds', ddsMal)

with factory.getMrvSubscriber(uri,
                              Example.Sample,
                              qos=mal.ps.qos.DEFAULT) as mrvSubscriber:

    # Every second get most-recent-value of alive instances.

    timeout = datetime.timedelta(seconds=1)
    while True:
        mrvSamples = mrvSubscriber.readMostRecent(timeout)
        if mrvSamples:
            print(mrvSamples)

2.6. Request-response

Request-response is a coupled peer-to-peer client-server communication pattern. A server exposes one or more instances of ICD-defined interfaces (services) to the network. A client connects to the server and invokes one or more methods on the services. MAL guarantees reliable QoS (i.e. a request cannot be lost) and exactly-once delivery (i.e. for each method invocation exactly one request is made on the server side; the request can neither be lost nor duplicated). MAL takes care of connection management completely: there is no need to explicitly issue a connect request, monitor the connection status or reconnect, since MAL does this in the background. A programmer only needs to specify a timeout limit on the request; (re-)connection time is included in the total execution time of the request. If the connection cannot be (re-)established within that time, or the response is not delivered in time, a TimeoutException is thrown. A response delivered after the timeout has expired is ignored. Any connection loss during the invocation of a method results in a DisconnectedException, i.e. MAL does not try to invoke the method on the server side again after reconnecting.
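Because reconnection time counts against the request timeout, a slow reconnect leaves less time for the reply itself. The sketch below models that single-deadline budget under stated assumptions: call_with_budget(), ToyTimeoutError and the connect/send_and_wait hooks are hypothetical and are not MAL functions, and a fake clock replaces real time so the behaviour is deterministic.

```python
import time


class ToyTimeoutError(Exception):
    """Hypothetical stand-in for MAL's TimeoutException."""


def call_with_budget(connect, send_and_wait, timeout_s, clock=time.monotonic):
    """Run one request under a single deadline covering (re)connection and reply.

    connect(remaining_s) -> bool and send_and_wait(remaining_s) -> (ok, value)
    are hypothetical hooks, not MAL functions.
    """
    deadline = clock() + timeout_s
    if not connect(max(0.0, deadline - clock())):
        raise ToyTimeoutError("connection not (re-)established in time")
    remaining = deadline - clock()
    if remaining <= 0:
        raise ToyTimeoutError("no time left to send the request")
    ok, value = send_and_wait(remaining)
    if not ok:
        # A reply that still arrives later is simply ignored.
        raise ToyTimeoutError("reply not received in time")
    return value


class FakeClock:
    """Deterministic clock so the example does not depend on real time."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()


def reconnect(remaining_s):
    if remaining_s < 2.0:       # reconnecting needs 2 s of the budget
        clock.now += remaining_s
        return False
    clock.now += 2.0
    return True


def reply_after(delay_s):
    def send_and_wait(remaining_s):
        if delay_s > remaining_s:
            clock.now += remaining_s
            return False, None
        clock.now += delay_s
        return True, "pong"
    return send_and_wait


fast = call_with_budget(reconnect, reply_after(0.5), 3.0, clock)
try:
    call_with_budget(reconnect, reply_after(1.5), 3.0, clock)
    slow_timed_out = False
except ToyTimeoutError:
    slow_timed_out = True
```

The second call times out although its 1.5 s reply is well under the 3 s limit, because the reconnect already consumed 2 s of that limit.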

To wait explicitly for the connection to be established, call the connect method (asyncConnect in the examples below); it returns a future object that completes when the connection is established for the first time.

A programmer interested in the connection state can register a connection listener on a client instance. The listener callbacks are invoked from a MAL mapping or middleware thread.

When multiple responses to a single request are needed, MAL's asynchronous method invocation (AMI) support can be used. An AMI is an asynchronous request with one or more asynchronous responses grouped under that request. There is no limit on the number of responses and no time limit on the lifetime of the request; its lifetime is bounded only by the connection. Once the connection is lost, the AMI request is canceled (reported to the client as a cancellation exception).
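From the client's point of view an AMI behaves like an iterator of futures that ends after the last response, or after a cancellation when the connection is lost. The sketch below is a simplified, hypothetical model built on concurrent.futures: ToyAmi, push() and connection_lost() are invented names, and unlike a real AMI each future is already resolved when the iterator yields it.

```python
import queue
from concurrent.futures import CancelledError, Future


class ToyAmi:
    """Hypothetical AMI handle: iterating it yields one future per response."""

    def __init__(self):
        self._events = queue.Queue()

    # Producer side: what the middleware would do as responses arrive.
    def push(self, value, last=False):
        self._events.put(("value", value, last))

    def connection_lost(self):
        self._events.put(("cancel", None, True))

    # Consumer side: what client code iterates over.
    def __iter__(self):
        while True:
            kind, value, last = self._events.get()   # blocks for the next event
            future = Future()
            if kind == "cancel":
                future.cancel()        # result() now raises CancelledError
            else:
                future.set_result(value)
            yield future
            if last:
                return                 # iterator runs out of elements


ami = ToyAmi()
ami.push("Battery OK")
ami.push("Drivetrain OK")
ami.push("Engine OK", last=True)
received = [future.result() for future in ami]

lost = ToyAmi()
lost.push("Battery OK")
lost.connection_lost()
outcomes = []
for future in lost:
    try:
        outcomes.append(future.result())
    except CancelledError:
        outcomes.append("cancelled")
```

After a disconnect the iteration stops with a cancelled future, so the client sees the responses that arrived before the loss followed by a cancellation.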

NOTE: Since the migration to FastDDS, request-response communication has become unavailable for MAL DDS in C++, Java and Python.

2.6.1. Client

When an interface is defined in the ICD, synchronous and asynchronous client variants of the interface are generated. At client instantiation, a programmer chooses which interface to use. Invoking a method on the synchronous interface blocks and either returns the value via the method's return mechanism or throws an exception. A request timeout can be set via the ReplyTime QoS or on a client instance created with the timeout method; a TimeoutException is thrown when the timeout expires. Asynchronous methods always return a future object, through which the programmer obtains the response or the exception. More than one asynchronous request can be issued and pending concurrently on one client instance.

The type of interface used on the client side does not affect in any way how a request is processed on the server side.
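The difference between the two client flavours can be sketched with a thread pool standing in for the middleware. Everything here is hypothetical (ToyRobotService, ToySyncClient, ToyAsyncClient and get_speed() are illustrative names, not generated MAL code): the same service backs both clients, the asynchronous client returns futures so several requests can be pending at once, and timeout() returns a new synchronous client while leaving the original one usable.

```python
import concurrent.futures as cf
import time


class ToyRobotService:
    """Hypothetical server-side service; the client flavour does not affect it."""

    def get_speed(self):
        time.sleep(0.2)          # simulated processing time
        return 42.0


class ToyAsyncClient:
    """Every call returns a future immediately; many may be pending at once."""

    def __init__(self, service, executor):
        self._service = service
        self._executor = executor

    def get_speed(self):
        return self._executor.submit(self._service.get_speed)


class ToySyncClient:
    """Every call blocks until the reply arrives or the timeout expires."""

    def __init__(self, service, executor, timeout_s):
        self._service = service
        self._executor = executor
        self._timeout_s = timeout_s

    def timeout(self, timeout_s):
        # A new client with its own timeout; this instance stays valid.
        return ToySyncClient(self._service, self._executor, timeout_s)

    def get_speed(self):
        future = self._executor.submit(self._service.get_speed)
        # Raises concurrent.futures.TimeoutError when the timeout expires.
        return future.result(timeout=self._timeout_s)


with cf.ThreadPoolExecutor(max_workers=4) as pool:
    service = ToyRobotService()
    sync_robot = ToySyncClient(service, pool, timeout_s=2.0)
    speed = sync_robot.get_speed()

    async_robot = ToyAsyncClient(service, pool)
    pending = [async_robot.get_speed() for _ in range(3)]   # concurrent requests
    speeds = [future.result() for future in pending]

    try:
        sync_robot.timeout(0.01).get_speed()
        timed_out = False
    except cf.TimeoutError:
        timed_out = True
```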

AMI methods can be recognized by their return type, Ami<returnType>. The returned object is essentially an iterator over future objects of type returnType. These futures behave exactly like those returned by asynchronous methods: each future is a promise waiting for the next response. An AMI call is completed when the iterator runs out of elements or when an exception is thrown (which also happens on disconnection).

The example code below illustrates how to use a client interface with all three types of methods (synchronous, asynchronous and AMI).

2.6.1.1. Java

public class ClientExample {

  public static void main(String[] args) {

    // install ZPB MAL mapping
    CiiFactory.installMal("zpb");

    try (CiiFactory factory = CiiFactory.getInstance()) {

      URI uri = URI.create("zpb.rr:///m1/Robot1");

      //
      // Synchronous client example w/ ReplyTime QoS set.
      // RobotControlSync interface is requested.
      //
      try (RobotControlSync robot =
          factory.getClient(uri,
              new QoS[] {
                  new ReplyTime(3, TimeUnit.SECONDS)
              },
              null, RobotControlSync.class)) {

        //
        // Explicitly wait for connection to be established.
        // (This is usually not needed.)
        //
        CompletableFuture<Void> connectionFuture = robot.asyncConnect();
        connectionFuture.get(10, TimeUnit.SECONDS);

        //
        // Invoke synchronous method.
        // TimeoutException is thrown if the reply time (3 s) is exceeded.
        //
        robot.command(Command.START_COMMAND);

        //
        // Some methods throw user-defined exceptions.
        // Handle them just like standard Java exceptions.
        //
        try {
          float speed = robot.getSpeed();

          List<Float> speedValues = new ArrayList<Float>();
          speedValues.add(66.6f);
          speedValues.add(41.0f);
          speedValues.add(90.5f);
          boolean checker = robot.validSpeedValues(speedValues);
          if (checker) {
            speed += 30.0f;
          }
          else {
            speed += 3.0f;
          }

          robot.setSpeed(speed);
        }
        catch (TooFast tf) {
          // handle too fast
        }

        //
        // Create a new client instance w/ specific timeout.
        // New instance is connected to the same service instance.
        // Old instance is still valid.
        //
        RobotControlSync robotSlowCall = robot.timeout(1, TimeUnit.MINUTES);
        robotSlowCall.command(Command.STOP_COMMAND);

        //
        // Multiple responses use-case using
        // asynchronous method invocation (AMI).
        //
        try (Ami<String> systemCheckAmi = robot.systemCheck()) {
          for (CompletableFuture<String> future : systemCheckAmi) {
            String msg = future.get();
            System.out.println(msg);
          }
        }

      } catch (Throwable th) {
        // handle exceptions ...
      }


      //
      // Asynchronous client example w/ reply timeout set.
      // RobotControlAsync interface is requested.
      //
      try (RobotControlAsync robot =
          factory.getClient(uri,
              new QoS[] {
                  new ReplyTime(3, TimeUnit.SECONDS)
              },
              null, RobotControlAsync.class)) {

        //
        // Basic case, invoke and then wait, if necessary
        //
        CompletableFuture<Void> cfuture = robot.command(Command.START_COMMAND);
        // ... do some work here ...,
        // and get the response (Void in this case); get() blocks if
        // the response has not been received yet, and fails with a
        // TimeoutException if the reply time is exceeded
        cfuture.get();

        //
        // Chained CompletableFuture<> example:
        // async getSpeed() is called and, on return, setSpeed()
        // is requested with the new value; exceptions are handled.
        // thenCompose() makes the chain wait for setSpeed() as well.
        // Wait for at most 2 seconds to complete.
        //
        robot.getSpeed()
            .thenCompose((Float speed) -> robot.setSpeed(speed + 10))
            .exceptionally((th) -> {
              /* handle exception */ return null;
            }).get(2, TimeUnit.SECONDS);

        //
        // Multiple responses use-case using
        // asynchronous method invocation (AMI).
        // Handling is the same as for sync. client.
        //
        try (Ami<String> systemCheckAmi = robot.systemCheck()) {
          for (CompletableFuture<String> future : systemCheckAmi) {
            String msg = future.get();
            System.out.println(msg);
          }
        }

      } catch (Throwable th) {
        // handle exceptions
      }
    }
  }

}

2.6.1.2. C++

std::shared_ptr<::elt::mal::Mal> zpbMal = mal::loadMal("zpb", {{"zpb.domain", "100"}});
::elt::mal::CiiFactory &factory = mal::CiiFactory::getInstance();

factory.registerMal("zpb", zpbMal);

mal::Uri uri("zpb.rr:///m1/Robot1");

//
// Synchronous client example w/ ReplyTime QoS set.
// RobotControlSync interface is requested.
//
try {
  std::unique_ptr<mal::example::RobotControlSync> robot = factory.getClient<mal::example::RobotControlSync>(
    uri,
    {std::make_shared<mal::rr::qos::ReplyTime>(std::chrono::seconds(3))},
    {});

  //
  // Explicitly wait for connection to be established.
  // (This is usually not needed.)
  //
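  ::elt::mal::future<void> connectionFuture = robot->asyncConnect();
  // As elsewhere in this example, wait_for() on a mal::future takes a
  // boost::chrono duration and returns a boost::future_status.
  ::boost::future_status futureStatus =
    connectionFuture.wait_for(::boost::chrono::seconds(10));
  bool connected = (futureStatus == ::boost::future_status::ready);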

  //
  // Invoke synchronous methods.
  // TimeoutException is thrown if the reply time (3 s) is exceeded.
  //

  robot->command(mal::example::RobotControl::Command::START_COMMAND);

  try {
    float speed = robot->getSpeed();

    mal::shared_vector<const float> speedArray = {66.6f, 41.0f, 90.5f};
    bool checker = robot->validSpeedValues(speedArray);
    if (checker) {
      speed += 30;
    } else {
      speed += 3;
    }

    robot->setSpeed(speed);
  } catch (mal::example::RobotControl::TooFast& tf) {
    // handle too fast
  }

  //
  // Multiple responses use-case using
  // asynchronous method invocation (AMI).
  //
  std::shared_ptr<StrAmi> ami = robot->systemCheck();
  for (auto future : *ami) {
      std::cout << future.get() << std::endl;
  }

  //
  // Create a new client instance w/ specific timeout.
  //
  robot->timeout(std::chrono::seconds(10))
    ->command(mal::example::RobotControl::Command::STOP_COMMAND);
} catch (std::exception& exc) {
  std::cerr << "EXCEPTION: " << exc.what() << std::endl;
}

//
// Asynchronous client example w/ reply timeout set.
// RobotControlAsync interface is requested.
//
try {
  std::unique_ptr<mal::example::RobotControlAsync> robot = factory.getClient<mal::example::RobotControlAsync>(
      uri,
      {std::make_shared<mal::rr::qos::ReplyTime>
         (std::chrono::seconds(3))},
      {});

  //
  // Basic case, invoke and then wait, if necessary
  //
  mal::future<void> cfuture =
    robot->command(mal::example::RobotControl::Command::START_COMMAND);
  // ... do some work here ...,
  // and get the response (void in this case); get() blocks if
  // the response has not been received yet, or a TimeoutException is
  // thrown if the reply time is exceeded
  cfuture.get();

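  // Chained future<> example:
  // async getSpeed() is called and, on return, setSpeed()
  // is requested with the new value; exceptions are handled.
  // The outer get() waits for the continuation and returns the
  // setSpeed() future, which is then given 2 seconds to complete.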
  try {
    mal::future<float> future = robot->
     getSpeed().then([&](::mal::future<float> f)
       { return robot->setSpeed(f.get() + 0.1); }).get();

    if (future.wait_for(boost::chrono::seconds(2)) !=
        ::boost::future_status::ready) {
        // handle timeout
    }

  } catch (mal::example::RobotControl::TooFast& tf) {
    // handle too fast
  }

  //
  // Multiple responses use-case using
  // asynchronous method invocation (AMI).
  //
  std::shared_ptr<StrAmi> ami = robot->systemCheck();
  for (auto future : *ami) {
      std::cout << future.get() << std::endl;
  }
} catch (std::exception& exc) {
  std::cerr << "EXCEPTION: " << exc.what() << std::endl;
}

2.6.1.3. Python

When using the asynchronous client, the result of a method call on the client reference is always a Future object. The actual result value must be obtained by calling the get method on that object.

To check the state of a Future object, use its check method. The method takes no arguments and returns a value of the mal.rr.FutureStatus enumeration, which can be one of the following:

  • DEFERRED: the requested method has not been started yet.

  • READY: the requested method has completed and the result can be retrieved with a call to the get method.

  • TIMEOUT: a wait was performed on the Future, but the timeout occurred.

import datetime

import elt.pymal as mal
# Import synchronous client implementation
from ModManualExamples.Elt.Mal.Example.RobotControl import RobotControlSync
# Import asynchronous client implementation
from ModManualExamples.Elt.Mal.Example.RobotControl import RobotControlAsync
from ModManualExamples.Elt.Mal.Example import Command, TooFast

# 3 seconds.
THREE_SECONDS = datetime.timedelta(seconds=3)

# Minute
MINUTE = datetime.timedelta(seconds=60)

def _main():
    """Main function implementation"""

    # sync example
    try:
        # Load ZPB MAL mapping.
        uri = 'zpb.rr:///m1/Robot1'
        zpbMal = mal.loadMal('zpb', {'zpb.domain': '100'})
        factory = mal.CiiFactory.getInstance()
        factory.registerMal('zpb', zpbMal)

        # Synchronous client example w/ ReplyTime QoS timeout set.
        # RobotControlSync interface is requested.
        with factory.getClient(uri,
                               RobotControlSync,
                               qos=mal.rr.qos.ReplyTime(THREE_SECONDS)) as robot:

            # Explicitly wait for connection to be established.
            # (This is usually not needed).
            connectionFuture = robot.asyncConnect()
            connectionFuture.wait_for(THREE_SECONDS)

            # Invoke synchronous methods
            # TimeoutError is thrown if default timeout is exceeded.

            robot.command(Command.START_COMMAND)
            try:
                speed = robot.getSpeed()

                speedArray = mal.makeSharedVector('f', 3, 90.5)
                speedArray = speedArray.freeze()
                checker = robot.validSpeedValues(speedArray)
                if checker:
                    speed += 30
                else:
                    speed += 3

                robot.setSpeed(speed)
            except TooFast as e:
                # Handle too fast
                pass

            # Multiple responses use-case using
            # asynchronous method invocation (AMI).

            systemCheckAmi = robot.systemCheck()
            # The context manager protocol is not used here, so
            # the AMI must be closed explicitly once we are done with it.
            try:
                # The AMI is iterable; each element is a future
                # holding the next response.
                for future in systemCheckAmi:
                    print(future.get())
            finally:
                systemCheckAmi.close()

        # Asynchronous example w/ reply timeout set.
        # RobotControlAsync interface is requested.
        with factory.getClient(uri,
                               RobotControlAsync,
                               qos=mal.rr.qos.ReplyTime(THREE_SECONDS)) as robot:

            # Basic case, invoke and then wait, if necessary
            future = robot.command(Command.START_COMMAND)

            # ... do some work here ...
            # and get the response (None in this case). get() blocks
            # if the response is not yet received, or TimeoutError is
            # thrown when the reply time is exceeded.

            future.get()

            # Multiple responses use-case using
            # asynchronous method invocation (AMI).
            # Handling is the same as for sync. client.

            with robot.systemCheck() as systemCheckAmi:
                for future in systemCheckAmi:
                    print('System Check: ', future.get())
    except Exception as e:
        # Handle exceptions here, or re-raise them
        raise


if __name__ == '__main__':
    _main()

2.6.2. Server

Creating a server instance only creates a container in which services can be registered. A service is a uniquely named instance reachable over the network. Multiple services of the same or of different interface types can be registered to the server, and the same service instance can also be registered multiple times under different names. The server simply dispatches each request to a service by its name. Once a service is unregistered, it is no longer reachable over the network; any pending requests on the unregistered service complete normally.
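A server of this kind is essentially a name-to-service dispatch table. The following hypothetical sketch (ToyServer, register_service(), unregister_service() and dispatch() are illustrative names, not the MAL server API) shows one instance registered under two names, a second independent instance, and an unregistered name becoming unreachable while the service object itself keeps working. Rejecting a duplicate name with ValueError is an assumption of the sketch.

```python
class ToyServer:
    """Hypothetical model of a server as a name -> service container."""

    def __init__(self):
        self._services = {}

    def register_service(self, name, service):
        # Names must be unique; the same object may appear under several names.
        if name in self._services:
            raise ValueError(f"service name already registered: {name}")
        self._services[name] = service

    def unregister_service(self, name):
        # Only the name is forgotten; the service object itself is untouched.
        del self._services[name]

    def dispatch(self, name, method, *args):
        """Forward a request to the service registered under `name`."""
        service = self._services.get(name)
        if service is None:
            raise LookupError(f"no service registered as {name}")
        return getattr(service, method)(*args)


class PingCounter:
    def __init__(self):
        self.calls = 0

    def ping(self):
        self.calls += 1
        return self.calls


server = ToyServer()
shared = PingCounter()
server.register_service("Robot1", shared)
server.register_service("RobotAlias", shared)   # same instance, second name
server.register_service("Robot2", PingCounter())
results = [server.dispatch("Robot1", "ping"),
           server.dispatch("RobotAlias", "ping"),
           server.dispatch("Robot2", "ping")]
server.unregister_service("Robot1")
try:
    server.dispatch("Robot1", "ping")
    reachable = True
except LookupError:
    reachable = False
```

Keeping only a reference in the table mirrors the statement that the server is not responsible for the lifecycle of the services it hosts.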

When an interface is defined in the ICD, synchronous and asynchronous service variants of the interface are generated. A programmer needs to implement one of these interfaces; only implementations of these interfaces can be registered as services. The interface type dictates how requests are processed on the server side. The server processes the requests of all its services using a single thread. When a request is received, the server decodes it and calls the method on the specified service. If the service implements the synchronous interface, the invocation of its method blocks, and so does the processing of other requests to the same and to other services. In the case of an asynchronous service, the server blocks only until the method returns a future object and then continues to process other pending requests. The response of an asynchronous service is sent back to the client as soon as the return value, or an exception, is set on the future object. Consequently, responses of asynchronous services might come back in a different order than their requests were issued.
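The out-of-order effect follows directly from the single dispatch thread. The sketch below is a hypothetical, deterministic model using concurrent.futures.Future (serve() and the handlers are illustrative, not MAL code, and exceptional completions are not modelled): request A is asynchronous and still in progress, B is asynchronous and finishes at once, and C is synchronous, so the responses leave the server as B, C and finally A.

```python
from concurrent.futures import Future


def serve(requests):
    """Hypothetical single-threaded dispatcher, NOT MAL code.

    requests: iterable of (request_id, handler, is_async).
    - sync handler: returns the value; the loop is blocked while it runs.
    - async handler: returns a Future quickly; the response is "sent"
      whenever that future completes, possibly after later requests.
    Returns the list recording the order in which responses are sent.
    """
    sent = []
    for request_id, handler, is_async in requests:
        if is_async:
            future = handler()
            future.add_done_callback(
                lambda f, rid=request_id: sent.append((rid, f.result())))
        else:
            sent.append((request_id, handler()))
    return sent


slow = Future()                      # async work that is still in progress


def slow_async():
    return slow


def fast_async():
    done = Future()
    done.set_result("fast done")
    return done


def blocking_sync():
    return "sync done"


sent = serve([("A", slow_async, True),
              ("B", fast_async, True),
              ("C", blocking_sync, False)])
before = list(sent)                  # A is still pending here
slow.set_result("slow done")         # e.g. completed later by a worker thread
```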

When a C++ MAL service throws a known exception (i.e. one declared in the MAL-ICD), it is passed through the network to the client, where it is re-raised. However, if the service throws any other exception (a MAL exception of a type that is not declared for this particular method, a standard C++ exception, or a C++ object that is not derived from std::exception), MAL behaves as shown in Table 1.

ICD declaration | Exception type thrown                         | Server behaviour                       | Client behaviour
----------------|-----------------------------------------------|----------------------------------------|----------------------------------
throws          | MAL exception, but other than declared        | catch and propagate UnhandledException | receive UnhandledRemoteException
throws          | non-MAL C++ exception, e.g. std::out_of_range | terminate                              | MalException: Client disconnected
throws          | non-MAL, non-C++ exception (std::string)      | terminate                              | MalException: Client disconnected
no throws       | MalException, although none was declared      | catch and propagate UnhandledException | receive UnhandledRemoteException
no throws       | non-MAL C++ exception, e.g. std::out_of_range | terminate                              | MalException: Client disconnected
no throws       | non-MAL, non-C++ exception (std::string)      | terminate                              | MalException: Client disconnected

Table 1 MAL behaviour when an interface method throws an undeclared exception
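Table 1, together with the preceding paragraph, reduces to two questions: is the thrown object a MAL exception, and is it declared for this method? The "throws" and "no throws" rows give the same outcomes. The following Python sketch merely encodes those C++ rules as a lookup for quick reference; ThrownKind and service_exception_outcome() are hypothetical names and the returned strings paraphrase the table.

```python
from enum import Enum


class ThrownKind(Enum):
    MAL = "MAL (ICD-generated) exception"
    STD = "C++ exception derived from std::exception"
    OTHER = "object not derived from std::exception"


def service_exception_outcome(kind, declared_for_method):
    """Return (server behaviour, client behaviour) per the text and Table 1.

    Whether the ICD method declares other exceptions ("throws" vs
    "no throws") does not change the outcome in Table 1.
    """
    if kind is ThrownKind.MAL and declared_for_method:
        return ("pass to client", "re-raise the declared exception")
    if kind is ThrownKind.MAL:
        return ("catch and propagate UnhandledException",
                "receive UnhandledRemoteException")
    return ("terminate", "MalException: Client disconnected")
```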

The implementation of server-side AMI methods is the same for synchronous and asynchronous service interfaces. AMI method declarations return the same Ami<returnType> type as on the client side; however, the service needs to provide the implementation of Ami<returnType>. It is created via the ServerContextProvider singleton, implemented as a thread-safe storage instance that provides access to server-side helper methods. Its createAmi method creates a ServerAmi<returnType> instance (which implements the Ami<returnType> interface). The implementation of the AMI method only needs to set up asynchronous execution of the work (the method should not block!) and return the ServerAmi<returnType> instance. Responses are sent to the client by calling one of three methods on the ServerAmi<returnType> instance: complete (a non-last response), completeExceptionally (an exception response, which is also the last response) and completed (the last response). Once the last response is sent, the instance is released automatically.
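The pattern above (create the AMI object, start background work, return immediately, finish with a final response) can be sketched in Python as follows. This is a hypothetical model, not the MAL class: ToyServerAmi takes a send callable instead of being obtained from ServerContextProvider, its complete(), completed() and complete_exceptionally() mirror complete, completed and completeExceptionally, and raising RuntimeError after the final response is an assumption standing in for the automatic release. system_check() also returns the worker thread, but only so the example can wait for it.

```python
import threading


class ToyServerAmi:
    """Hypothetical model of the server-side AMI helper; NOT the MAL class."""

    def __init__(self, send):
        self._send = send              # ships one response to the client
        self._lock = threading.Lock()
        self._finished = False

    def _emit(self, kind, payload, last):
        with self._lock:
            if self._finished:
                raise RuntimeError("AMI already sent its last response")
            self._send((kind, payload, last))
            self._finished = last

    def complete(self, value):              # non-last response
        self._emit("value", value, last=False)

    def completed(self, value):             # last response
        self._emit("value", value, last=True)

    def complete_exceptionally(self, exc):  # exception, also the last response
        self._emit("error", exc, last=True)


def system_check(send):
    """AMI method body: set up background work and return without blocking."""
    ami = ToyServerAmi(send)

    def work():
        ami.complete("Battery OK")
        ami.complete("Drivetrain OK")
        ami.completed("Engine OK")       # note: completed, not complete

    worker = threading.Thread(target=work)
    worker.start()
    return ami, worker


responses = []
ami, worker = system_check(responses.append)
worker.join()
```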

See the examples below for how to implement synchronous and asynchronous services.

The server is not responsible for the lifecycle of registered services.

2.6.2.1. Java

public class ServerExample {

  private static void simulateWork() {
    try {
      Thread.sleep(100);
    } catch (InterruptedException ex) {
      // noop
    }
  }

  /**
   * Synchronous RobotControl implementation example,
   * must implement &lt;ICD interface name&gt; interface.
   */
  static class RobotControlImpl implements RobotControl {

    private final float maxSpeed;
    private List<Float> allSpeeds;
    private List<Float> tempSpeeds;

    public RobotControlImpl(float maxSpeed, List<Float> speeds) {
      this.maxSpeed = maxSpeed;
      this.tempSpeeds = speeds;
      this.allSpeeds = new ArrayList<Float>();
      this.allSpeeds.add(0.0f);
    }

    @Override
    public void command(Command com) {
      switch (com) {
        case START_COMMAND:
          this.allSpeeds = new ArrayList<Float>(this.tempSpeeds);
          break;

        case STOP_COMMAND:
          this.allSpeeds = new ArrayList<Float>();
          this.allSpeeds.add(0.0f);
          break;

        default:
          // undefined exceptions, i.e. exception not defined in ICD,
          // translate to UnhandledRemoteException on the remote side
          throw new IllegalArgumentException();
      }
    }

    @Override
    public float setSpeed(float speed) throws TooFast {
      if (speed > this.maxSpeed) {
        throw new TooFast();
      }

      this.allSpeeds.add(speed);
      return this.allSpeeds.get(this.allSpeeds.size() - 1);
    }

    @Override
    public float getSpeed() {
      return this.allSpeeds.get(this.allSpeeds.size() - 1);
    }

    @Override
    public boolean validSpeedValues(List<Float> values) {
      boolean valid = true;
      for (float val : values) {
        if (val < 0 || val > this.maxSpeed) {
          valid = false;
          break;
        }
      }
      return valid;
    }

    @Override
    public Ami<String> systemCheck() {

      // request for ServerAmi instance
      // to be returned by this method
      ServerAmi<String> ami = ServerContextProvider.getInstance().createAmi(String.class);

      // setup asynchronous execution
      Executors.newCachedThreadPool().execute(() -> {

        simulateWork();
        // first response
        ami.complete("Battery OK");

        simulateWork();
        // second response
        ami.complete("Drivetrain OK");

        simulateWork();
        // final response
        // NOTE: completed vs complete method name
        ami.completed("Engine OK");
      });

      // return AMI instance
      return ami;
    }

  }

  /**
   * Asynchronous RobotControl implementation example,
   * must implement Async&lt;ICD interface name&gt;.
   */
  static class AsyncRobotControlImpl implements AsyncRobotControl {

    private final float maxSpeed;
    private List<Float> allSpeeds;
    private List<Float> tempSpeeds;

    public AsyncRobotControlImpl(float maxSpeed, List<Float> speeds) {
      this.maxSpeed = maxSpeed;
      this.tempSpeeds = speeds;
      this.allSpeeds = new ArrayList<Float>();
      this.allSpeeds.add(0.0f);
    }

    @Override
    public CompletableFuture<Void> command(Command com) {
      // create future
      CompletableFuture<Void> future = new CompletableFuture<>();

      // setup asynchronous execution
      Executors.newCachedThreadPool().execute(() -> {
        switch (com) {
          case START_COMMAND:
            this.allSpeeds = new ArrayList<Float>(this.tempSpeeds);
            // notify the completion w/ result
            future.complete(null);
            break;

          case STOP_COMMAND:
            this.allSpeeds = new ArrayList<Float>();
            this.allSpeeds.add(0.0f);
            // notify the completion w/ result
            future.complete(null);
            break;

          default:
            // undefined exceptions, i.e. exception not defined in ICD,
            // translate to UnhandledRemoteException on the remote side
            // notify the completion w/ exception
            future.completeExceptionally(new IllegalArgumentException());
        }
      });

      // return future
      return future;
    }

    @Override
    public CompletableFuture<Float> setSpeed(float speed) {

      // create future
      CompletableFuture<Float> future = new CompletableFuture<>();

      // setup asynchronous execution
      Executors.newCachedThreadPool().execute(() -> {
        if (speed > this.maxSpeed) {
          // notify the completion w/ exception and stop here,
          // so the rejected speed is not recorded
          future.completeExceptionally(new TooFast());
          return;
        }

        this.allSpeeds.add(speed);
        // notify the completion w/ result
        future.complete(speed);
      });

      // return future
      return future;
    }

    @Override
    public CompletableFuture<Float> getSpeed() {
      // if the execution of the method is short
      // an already completed future can be returned
      return CompletableFuture.completedFuture(this.allSpeeds.get(this.allSpeeds.size() - 1));
    }

    @Override
    public CompletableFuture<Boolean> validSpeedValues(List<Float> values) {

      // create future
      CompletableFuture<Boolean> future = new CompletableFuture<>();

      // setup asynchronous execution
      Executors.newCachedThreadPool().execute(() -> {
        boolean valid = true;
        for (float val : values) {
          if (val < 0 || val > this.maxSpeed) {
            valid = false;
            break;
          }
        }

        // notify the completion w/ result
        future.complete(valid);
      });

      // return future
      return future;
    }

    @Override
    public Ami<String> systemCheck() {

      // request for ServerAmi instance
      // to be returned by this method
      ServerAmi<String> ami = ServerContextProvider.getInstance().createAmi(String.class);

      // setup asynchronous execution
      Executors.newCachedThreadPool().execute(() -> {

        simulateWork();
        // first response
        ami.complete("Battery OK");

        simulateWork();
        // second response
        ami.complete("Drivetrain OK");

        simulateWork();
        // final response
        // NOTE: completed vs complete method name
        ami.completed("Engine OK");
      });

      // return AMI instance
      return ami;
    }

  }

  /**
   * Main entry-point.
   *
   * @param args command-line arguments.
   */
  public static void main(String[] args) {

    URI uri = URI.create("zpb.rr:///m1");

    // install ZPB MAL mapping
    CiiFactory.installMal("zpb");

    try (CiiFactory factory = CiiFactory.getInstance()) {

      //
      // Create server, default QoS.
      // Currently there are no MAL QoS relevant for the Server.
      //
      try (Server server = factory.createServer(uri, QoS.DEFAULT, null)) {

        List<Float> speedsArray = new ArrayList<Float>();
        speedsArray.add(0.1f);
        speedsArray.add(49.4f);
        speedsArray.add(74.4f);

        //
        // Register three Robot services.
        // They are all separate instances.
        // First two instances have synchronous implementation,
        // the third has asynchronous.
        server.registerService("Robot1", new RobotControlImpl(120.0f, speedsArray));
        server.registerService("Robot2", new RobotControlImpl(80.0f, speedsArray));
        server.registerService("Robot3", new AsyncRobotControlImpl(20.0f, speedsArray));

        // Process commands until server.shutdown() is called.
        server.run();
      }
    }
  }

}
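In the asynchronous Java implementation every method hands back a CompletableFuture that must be completed exactly once, either with a result (complete) or with an exception (completeExceptionally). After completeExceptionally the task must return; otherwise it keeps running and changes state for a request that has already failed. The Python asynchronous implementation further below follows the same contract. The following is a minimal Python sketch using only the standard library; TooFast, MAX_SPEED and set_speed are illustrative stand-ins, not the generated MAL binding.

```python
from concurrent.futures import ThreadPoolExecutor


class TooFast(Exception):
    """Hypothetical stand-in for the ICD-defined TooFast exception."""


MAX_SPEED = 20.0
all_speeds = [0.0]


def set_speed(executor, speed):
    """Return a future that ends with either the new speed or TooFast."""

    def work():
        if speed > MAX_SPEED:
            # raising completes the future exceptionally AND stops the work,
            # so the rejected speed is never recorded
            raise TooFast(f"{speed} exceeds {MAX_SPEED}")
        all_speeds.append(speed)
        return speed  # completes the future with a result

    return executor.submit(work)


with ThreadPoolExecutor(max_workers=2) as pool:
    accepted = set_speed(pool, 10.0)
    rejected = set_speed(pool, 99.0)

print(accepted.result())                    # 10.0
print(type(rejected.exception()).__name__)  # TooFast
print(all_speeds)                           # [0.0, 10.0]
```

In Python, raising inside the submitted callable gives the same effect as calling completeExceptionally followed by return in Java.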

2.6.2.2. C++

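#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>

#include <boost/thread/future.hpp>

// Also include the MAL API headers and the headers generated from the
// RobotControl ICD (exact names depend on the installation and ICD module).

namespace mal = ::elt::mal;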

/**
 * Simulate some work
 */
static void simulateWork() {
  std::this_thread::sleep_for(std::chrono::seconds(2));
}

/**
 * Synchronous RobotControl implementation example,
 * must implement RobotControl interface.
 */
class RobotControlImpl: public virtual mal::example::RobotControl {
 public:

  RobotControlImpl(float max, mal::shared_vector<const float> speeds)
    : maxSpeed(max), tempSpeeds(speeds) {
    allSpeeds = {0.0f};
  }

  void command(RobotControl::Command command) override {
    switch (command) {
      case RobotControl::Command::START_COMMAND:
        allSpeeds = tempSpeeds;
        break;

      case RobotControl::Command::STOP_COMMAND:
        allSpeeds = {0.0};
        break;

      default:
        throw std::invalid_argument("RobotControlImpl: Unknown command");
    }
  }

  float setSpeed(float newSpeed) override {
    if (newSpeed > maxSpeed) {
      throw RobotControl::TooFast(
          "RobotControlImpl::setSpeed, speed too high");
    }

    // allSpeeds is read-only: thaw() yields a mutable vector,
    // freeze() makes it read-only again after the append
    ::elt::mal::shared_vector<float> mutableSpeeds = thaw(allSpeeds);
    mutableSpeeds.push_back(newSpeed);
    allSpeeds = freeze(mutableSpeeds);

    return allSpeeds.back();
  }

  float getSpeed() const override { return allSpeeds.back(); }

  bool validSpeedValues(const ::elt::mal::shared_vector<const float>& values) override {
    bool valid = true;
    for (const auto& val : values) {
      if (val < 0 || val > maxSpeed) {
        valid = false;
        break;
      }
    }
    return valid;
  }

  std::shared_ptr<RobotControl::StrAmi> systemCheck() override {
    // Obtain ptr to ServerAmi
    std::shared_ptr<RobotControl::StrAmi> ami = mal::rr::ServerContextProvider
      <mal::example::ServerContextImpl<std::string>>::
      getInstance().createAmi();

    auto future = boost::async(boost::launch::async, [=]() {
      simulateWork();
      ami->complete("Battery OK");
      simulateWork();
      ami->complete("Drivetrain OK");
      simulateWork();
      // final response: completed(), not complete()
      ami->completed("Engine OK");
    });

    // Return ptr to Ami
    return ami;
  }

 private:
  float maxSpeed;
  mal::shared_vector<const float> allSpeeds;
  mal::shared_vector<const float> tempSpeeds;
};

/**
 * Asynchronous RobotControl implementation example,
 * must implement AsyncRobotControl interface.
 */
class AsyncRobotControlImpl: public virtual mal::example::AsyncRobotControl {
 public:

  AsyncRobotControlImpl(float max, mal::shared_vector<const float> speeds)
    : maxSpeed(max), tempSpeeds(speeds) {
    allSpeeds = {0.0f};
  }

  mal::future<void> command(mal::example::RobotControl::Command command)
   override {
    std::function<void()> f = [this, command]() -> void {
      std::lock_guard<std::mutex> guard(speedMutex);
      switch (command) {
        case mal::example::RobotControl::Command::START_COMMAND:
          allSpeeds = tempSpeeds;
          break;
        case mal::example::RobotControl::Command::STOP_COMMAND:
          allSpeeds = {0.0};
          break;

        default:
          throw std::invalid_argument("AsyncRobotControlImpl: Unknown command");
      }
    };
    return boost::async(boost::launch::async, f);
  }

  mal::future<float> setSpeed(float newSpeed) override {
    return boost::async(boost::launch::async, [this, newSpeed]() -> float {
      std::lock_guard<std::mutex> guard(speedMutex);
      if (newSpeed > maxSpeed) {
        throw mal::example::RobotControl::TooFast(
         "RobotControlImpl::setSpeed, speed too high");
      }

      ::elt::mal::shared_vector<float> mutableSpeeds = thaw(allSpeeds);
      mutableSpeeds.push_back(newSpeed);
      allSpeeds = freeze(mutableSpeeds);

      return allSpeeds.back();
    });
  }

  mal::future<float> getSpeed() const override {
    // short operation: return an already satisfied future
    boost::promise<float> promise;
    promise.set_value(allSpeeds.back());
    return promise.get_future();
  }

  mal::future<bool> validSpeedValues(const ::elt::mal::shared_vector<const float>& values) override {
    boost::promise<bool> promise;

    bool valid = true;
    for (const auto& val : values) {
      if (val < 0 || val > maxSpeed) {
        valid = false;
        break;
      }
    }

    promise.set_value(valid);
    return promise.get_future();
  }

  std::shared_ptr<AsyncRobotControl::StrAmi> systemCheck() override {
    // Obtain ptr to ServerAmi
    std::shared_ptr<AsyncRobotControl::StrAmi> ami = mal::rr::ServerContextProvider
      <mal::example::ServerContextImpl<std::string>>::
      getInstance().createAmi();

    auto future = boost::async(boost::launch::async, [=]() {
      simulateWork();
      ami->complete("Battery OK");
      simulateWork();
      ami->complete("Drivetrain OK");
      simulateWork();
      // final response: completed(), not complete()
      ami->completed("Engine OK");
    });

    // Return ptr to Ami
    return ami;
  }

 private:
  float maxSpeed;
  std::mutex speedMutex;
  mal::shared_vector<const float> allSpeeds;
  mal::shared_vector<const float> tempSpeeds;
};

/**
 * Server example.
 *
 * Main entry point.
 * @param argc number of command line arguments
 * @param argv command line arguments
 */
int main(int argc, char** argv) {

  // Load ZPB MAL mapping
  std::shared_ptr<::elt::mal::Mal> zpbMal = mal::loadMal("zpb", {{"zpb.domain", "100"}});
  // Register mapping with CiiFactory
  mal::CiiFactory &factory = mal::CiiFactory::getInstance();
  factory.registerMal("zpb", zpbMal);

  mal::Uri uri("zpb.rr:///m1");

  try {
    //
    // Create server, default QoS.
    // Currently there are no MAL QoS relevant for the Server.
    //
    std::unique_ptr<mal::rr::Server> server = factory.createServer(uri, mal::rr::qos::QoS::DEFAULT, {});

    mal::shared_vector<const float> speedsArray = {0.1f, 49.4f, 74.4f};

    // Register three Robot services.
    // They are all separate instances.
    // First two instances have synchronous implementation,
    // the third has asynchronous.
    std::shared_ptr<mal::rr::RrEntity> robot1(new RobotControlImpl(120.0f, speedsArray));
    std::shared_ptr<mal::rr::RrEntity> robot2(new RobotControlImpl(80.0f, speedsArray));
    std::shared_ptr<mal::rr::RrEntity> robot3(new AsyncRobotControlImpl(20.0f, speedsArray));

    server->registerService<mal::example::RobotControl, true>
     ("Robot1", robot1);
    server->registerService<mal::example::RobotControl, true>
     ("Robot2", robot2);
    server->registerService<mal::example::RobotControl, false>
     ("Robot3", robot3);

    // Process commands until server->shutdown() is called.
    server->run();
  } catch (const std::exception& exc) {
    std::cerr << "EXCEPTION: " << exc.what() << std::endl;
    return 1;
  }
  return 0;
}
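The C++ asynchronous implementation serializes every access to allSpeeds through speedMutex, because several requests may run on different threads at the same time. The Python asynchronous implementation below runs its handlers on a ThreadPoolExecutor with four workers and does not guard its state. If such handlers do more than single list operations, a lock can be used in the same way. Here is a minimal sketch, assuming a hypothetical SpeedLog helper that is not part of the MAL API:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SpeedLog:
    """Hypothetical helper: speed history shared by worker threads."""

    def __init__(self, max_speed):
        self._lock = threading.Lock()  # plays the role of speedMutex
        self._max_speed = max_speed
        self._speeds = [0.0]

    def set_speed(self, speed):
        # 'with' holds the lock for the whole block, like std::lock_guard
        with self._lock:
            if speed > self._max_speed:
                raise ValueError(f"speed {speed} too high")
            self._speeds.append(speed)
            return self._speeds[-1]  # still our value: the lock is held

    def snapshot(self):
        with self._lock:
            return list(self._speeds)  # copy, so callers never see later changes


log = SpeedLog(max_speed=100.0)
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(log.set_speed, float(i)) for i in range(1, 51)]

results = sorted(f.result() for f in futures)
print(len(log.snapshot()))  # 51
```

Returning the last element while the lock is still held guarantees that each caller gets the value it appended, which is what the C++ setSpeed relies on when it returns allSpeeds.back() inside the guarded lambda.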

2.6.2.3. Python

import time
import threading
from concurrent.futures import ThreadPoolExecutor

# import top level Python MAL API module
import elt.pymal as mal
# import RobotControl binding
from ModManualExamples.Elt.Mal.Example import RobotControl
# import Command enumeration and TooFast exception
from ModManualExamples.Elt.Mal.Example import Command, TooFast


def simulateWork():
    time.sleep(1.0)

class RobotControlSyncImpl:
    """Synchronous RobotControl implementation example, must
       implement the methods defined in the ICD definition file:
       command, setSpeed, getSpeed, validSpeedValues, systemCheck
    """
    def __init__(self, maxSpeed, speeds):
        """constructor"""
        self._allSpeeds = [0.0]
        self._maxSpeed = maxSpeed
        self._tempSpeeds = speeds

    def command(self, com):
        """Handle command"""
        if com == Command.START_COMMAND:
            # copy, so that later setSpeed calls do not modify the
            # list shared by all services
            self._allSpeeds = list(self._tempSpeeds)
        elif com == Command.STOP_COMMAND:
            self._allSpeeds = [0.0]
        else:
            raise ValueError('Invalid command value')

    def setSpeed(self, speed):
        """Set new speed.
           Throws TooFast when speed exceeds maximum speed.
        """
        if speed > self._maxSpeed:
            raise TooFast()
        self._allSpeeds.append(speed)
        return self._allSpeeds[-1]

    def getSpeed(self):
        """Return current speed"""
        return self._allSpeeds[-1]

    def validSpeedValues(self, values):
        """Return Validation of entry Values"""
        result = True
        for val in values:
            if val < 0 or val > self._maxSpeed:
                result = False
                break

        return result

    def systemCheck(self):
        """Perform system check"""
        # Obtain instance to the server context.
        # This call uses ServerContextProvider.getInstance()
        # method internally.
        context = RobotControl.ServerContextString.getInstance()
        # Obtain server AMI
        serverAmi = context.createAmi()
        # Setup asynchronous execution
        systemCheckThread = threading.Thread(target=self._systemCheck,
                                             args=(serverAmi,))
        systemCheckThread.start()
        return serverAmi

    def _systemCheck(self, ami):
        """Actual systemCheck implementation"""
        simulateWork()
        # first response
        ami.complete('Battery OK')

        simulateWork()
        # second response
        ami.complete("Drivetrain OK")

        simulateWork()
        # final response, use completed method
        ami.completed("Engine OK")

class RobotControlAsyncImpl:
    """Asynchronous RobotControl implementation example:
       all methods that do not return an Ami must return
       a concurrent.futures.Future object
    """
    def __init__(self, maxSpeed, speeds):
        """constructor"""
        self._executor = ThreadPoolExecutor(max_workers=4)
        self._allSpeeds = [0.0]
        self._maxSpeed = maxSpeed
        self._tempSpeeds = speeds

    def command(self, com):
        """Handle command"""
        return self._executor.submit(self._command, com)

    def _command(self, com):
        """Command implementation"""
        if com == Command.START_COMMAND:
            # copy, so that later setSpeed calls do not modify the
            # list shared by all services
            self._allSpeeds = list(self._tempSpeeds)
        elif com == Command.STOP_COMMAND:
            self._allSpeeds = [0.0]
        else:
            raise ValueError('Invalid command value')

    def setSpeed(self, speed):
        """Set speed"""
        return self._executor.submit(self._setSpeed, speed)

    def _setSpeed(self, speed):
        """setSpeed implementation
           Throws TooFast when speed exceeds maximum speed.
        """
        if speed > self._maxSpeed:
            raise TooFast()
        self._allSpeeds.append(speed)
        return self._allSpeeds[-1]

    def getSpeed(self):
        """Get speed"""
        return self._executor.submit(lambda x: x, self._allSpeeds[-1])

    def validSpeedValues(self, values):
        """Check if values are inside the limitations"""
        return self._executor.submit(self._validSpeedValues, values)

    def _validSpeedValues(self, values):
        """validSpeedValues implementation
           Sanity check for list of values.
        """
        result = True
        for val in values:
            if val < 0 or val > self._maxSpeed:
                result = False
                break

        return result

    def systemCheck(self):
        """Perform system check"""
        # Obtain instance to the server context.
        # This call uses ServerContextProvider.getInstance()
        # method internally.
        context = RobotControl.ServerContextString.getInstance()
        # Obtain server AMI
        serverAmi = context.createAmi()
        # Setup asynchronous execution
        systemCheckThread = threading.Thread(target=self._systemCheck,
                                             args=(serverAmi,))
        systemCheckThread.start()
        return serverAmi

    def _systemCheck(self, ami):
        """Actual systemCheck implementation"""
        simulateWork()
        # first response
        ami.complete('Battery OK')

        simulateWork()
        # second response
        ami.complete("Drivetrain OK")

        simulateWork()
        # final response, use completed method
        ami.completed("Engine OK")


def main():
    uri = 'zpb.rr:///m1'

    # Load ZPB MAL mapping
    zpbMal = mal.loadMal('zpb')
    factory = mal.CiiFactory.getInstance()
    factory.registerMal('zpb', zpbMal)

    with factory.createServer(uri, mal.rr.qos.DEFAULT, {}) as server:

        speedsArray = [0.1, 49.4, 74.4]

        # Register three Robot services.
        # They are all separate instances.
        # First two have synchronous implementation,
        # the third has asynchronous implementation.
        server.registerService('Robot1',
                               RobotControl.RobotControlSyncService,
                               RobotControlSyncImpl(120.0, speedsArray))
        server.registerService('Robot2',
                               RobotControl.RobotControlSyncService,
                               RobotControlSyncImpl(80.0, speedsArray))
        server.registerService('Robot3',
                               RobotControl.RobotControlAsyncService,
                               RobotControlAsyncImpl(20.0, speedsArray))

        # Process commands until server.shutdown() is called.
        server.run()


if __name__ == '__main__':
    main()
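Both Python implementations return the server AMI immediately and push the replies from a separate thread: every complete() call sends an intermediate reply, and the single completed() call sends the last one. The sketch below imitates that contract with a hypothetical FakeServerAmi test double (in the examples above the real object comes from ServerContextString.getInstance().createAmi()), which can be handy for exercising a systemCheck-style worker without a running MAL. Rejecting replies after completed() is an assumption made by this sketch, not documented MAL behaviour.

```python
import threading
import time


class FakeServerAmi:
    """Hypothetical test double for a server AMI (not the MAL class).

    complete() records an intermediate reply, completed() records the
    final reply; this sketch rejects any reply after the final one.
    """

    def __init__(self):
        self.replies = []
        self.finished = False

    def complete(self, value):
        self._deliver(value)

    def completed(self, value):
        self._deliver(value)
        self.finished = True

    def _deliver(self, value):
        if self.finished:
            raise RuntimeError("AMI already completed")
        self.replies.append(value)


def system_check(ami):
    """Same reply sequence as the systemCheck examples above."""
    for status in ("Battery OK", "Drivetrain OK"):
        time.sleep(0.01)      # stands in for simulateWork()
        ami.complete(status)  # intermediate reply
    time.sleep(0.01)
    ami.completed("Engine OK")  # final reply


ami = FakeServerAmi()
worker = threading.Thread(target=system_check, args=(ami,))
worker.start()  # the real systemCheck returns the AMI at this point
worker.join()
print(ami.replies)  # ['Battery OK', 'Drivetrain OK', 'Engine OK']
```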

2.7. Appendix

2.7.1. Building CII application with WAF

A CII application must be built with waf/wtools, which integrates the ICD generator (icd-gen tool) and the generators for all supported middlewares. All generated artifacts are built into properly formed libraries that the MAL mappings load dynamically. The steps to build a CII application are the following:

  • Create a project directory with a project wscript file, e.g.:

from wtools.project import declare_project

declare_project('icd-demo-folder', '0.1-dev-version',        # Project name and version
                recurse='icd-app1 icd-app2',                 # Application directories
                requires='cxx boost java python protoc cii', # Primary dependencies
                boost_libs='log log_setup thread system')    # Boost libraries to link

The project directory contains as many application directories as needed, each with its corresponding ICD XML files. The example above declares two applications, icd-app1 and icd-app2. After creating the application directories:

  • Inside each application directory, create two subdirectories: ‘icd’ (contains the ICD files of the application) and ‘app’ (contains the application implementation). Other names can be used, as long as they match the names listed in the application wscript (recurse='app icd').

  • Next to these subdirectories, add a wscript file with the following structure:

from wtools import package

# Declare external libraries to be used specifically by the respective application
def configure(conf):
    # System-installed libraries (found via pkg-config)
    conf.check_cfg(package='opentracing_api', uselib_store='OPENTRACE', args='--cflags --libs')

    # WAF libraries provided by other CII modules
    conf.check_wdep(wdep_name='cii.mal-common', uselib_store='MALCOMMON')
    conf.check_wdep(wdep_name='client-api.config', uselib_store='CLIENTCONF')

package.declare_package(recurse='app icd') # Folders inside application directory

Each declared package (‘app’ and ‘icd’) must also contain a wscript file. For the ‘app’ subdirectory, the wtools function that creates the executable depends on the implementation language: ‘declare_cprogram’ (C++), ‘declare_pyprogram’ (Python) or ‘declare_jar’ (Java):

from wtools.module import declare_cprogram

# Use the libraries configured in the project and application wscript files
declare_cprogram(target='executable-app-language', use='BOOST CLIENTCONF MALCOMMON OPENTRACE')

The ‘icd’ subdirectory needs a wscript like:

from wtools.module import declare_malicd

declare_malicd() # Enables ICD artifacts generation

When building the project, WAF only uses files located inside a ‘src’ folder, so all source files the application needs must be placed in a ‘src’ directory. The resulting structure should look like this:

root
`-- icd-demo-folder
    |-- icd-app1
    |    |-- app
    |    |   |-- src
    |    |   |   `-- main-app1-file.xxx
    |    |   `-- wscript
    |    |-- icd
    |    |   |-- src
    |    |   |   `-- icd-app1-file.xml
    |    |   `-- wscript
    |    `-- wscript
    |
    |-- icd-app2
    |    |-- app
    |    |   |-- src
    |    |   |   `-- main-app2-file.xxx
    |    |   `-- wscript
    |    |-- icd
    |    |   |-- src
    |    |   |   `-- icd-app2-file.xml
    |    |   `-- wscript
    |    `-- wscript
    |
    `-- wscript
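The layout above can be created by hand or with a small script. The following sketch uses only pathlib to create the directories and empty wscript files for the two example applications. create_skeleton is a hypothetical helper, not part of wtools, and the generated wscript files still have to be filled in as described above.

```python
import tempfile
from pathlib import Path


def create_skeleton(root, project, apps):
    """Hypothetical helper (not part of wtools): create the directory
    layout shown above with empty wscript files to be filled in."""
    project_dir = Path(root) / project
    for app in apps:
        app_dir = project_dir / app
        for sub in ("app", "icd"):
            # sources and ICD XML files must live in the 'src' folders
            (app_dir / sub / "src").mkdir(parents=True, exist_ok=True)
            (app_dir / sub / "wscript").touch()
        (app_dir / "wscript").touch()  # declares the 'app' and 'icd' packages
    (project_dir / "wscript").touch()  # declares the project
    return project_dir


with tempfile.TemporaryDirectory() as tmp:
    project = create_skeleton(tmp, "icd-demo-folder", ["icd-app1", "icd-app2"])
    layout = sorted(p.relative_to(project).as_posix() for p in project.rglob("*"))

for entry in layout:
    print(entry)
```

The example writes into a temporary directory so it can be run safely; pass a real path as root to create the skeleton in place.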

See the icd-demo project for a complete example.