ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
SimCtl.py
Go to the documentation of this file.
1 import json
2 import datetime
3 
4 from robot.api import logger
5 from robot.utils import DotDict
6 from robot.utils import timestr_to_secs
7 
8 from elt import pymal
9 from ModDaqsimif.Daqsimif.SimCtl import SimCtlSync
10 
11 
12 def get_rr_uri(arg):
13  if "@" in arg:
14  # Split name@rr-uri,ps-uri
15  uris = arg.split("@")[1]
16  return uris.split(",")[0]
17  return arg
18 
19 
20 def to_list(arg):
21  if isinstance(arg, list):
22  return arg
23  return arg.split(" ")
24 
25 
26 class SimCtl:
27  """Robot library providing keywords for the SimCtl interface"""
28 
29  # Global scope is necessary since MAL will shutdown protobuf library randomly
30  # otherwise, which eventually lead to segfaults. See ECII-304.
31  ROBOT_LIBRARY_SCOPE = "GLOBAL"
32 
33  def __init__(self):
34  logger.debug("Loading MAL and creating factory")
35  self.malmal = pymal.loadMal("zpb", {})
36  self.factoryfactory = pymal.CiiFactory.getInstance()
37  self.factoryfactory.registerMal("zpb", self.malmal)
38  logger.debug("Loading MAL and creating factory done")
39 
40  def simctl_create_clients(self, uris, timeout=None, replace_from=None, replace_to=None):
41  """Creates a list of clients with a given ReplyTime timeout."""
42  logger.info("Create clients raw arg: %r, type=%s" % (uris, type(uris)))
43  timeout = timestr_to_secs(timeout) if timeout is not None else 3
44  clients = []
45  # uris may be a single string in the OCM format name@rr://URI,ps://URI. This is taken care
46  # of automatically
47  for uri in to_list(uris):
48  uri = get_rr_uri(uri)
49  if replace_from and replace_to:
50  uri = uri.replace(replace_from, replace_to)
51  logger.info("Create client for uri %s with timeout %d" % (uri, timeout))
52  client = self.factoryfactory.getClient(
53  uri, SimCtlSync, pymal.rr.qos.ReplyTime(datetime.timedelta(seconds=timeout))
54  )
55  clients.append(client)
56  return clients
57 
58  def simctl_create_client(self, uri, timeout=None):
59  """Creates a client with a given ReplyTime timeout."""
60  timeout = timestr_to_secs(timeout) if timeout is not None else 3
61  uri = get_rr_uri(uri)
62  logger.info("Create client for uri %s with timeout %d" % (uri, timeout))
63  client = self.factoryfactory.getClient(
64  uri, SimCtlSync, pymal.rr.qos.ReplyTime(datetime.timedelta(seconds=timeout))
65  )
66  return client
67 
68  def __setup(self, clients, arg):
69  logger.info("Sending setup: %s to %d clients" % (arg, len(clients)))
70  replies = []
71  for client in clients:
72  reply = client.Setup(arg)
73  logger.info("Received reply: %s" % reply)
74  replies.append(json.loads(reply, object_pairs_hook=DotDict))
75  return replies
76 
77  def simctl_setup(self, clients, arg):
78  if not isinstance(clients, list):
79  return self.__setup__setup([clients], arg)[0]
80  return self.__setup__setup(clients, arg)
81 
82  def simctl_reset(self, clients):
83  logger.info("Sending reset")
84  if not isinstance(clients, list):
85  return self.__reset__reset([clients])[0]
86  return self.__reset__reset(clients)
87 
88  def __reset(self, clients):
89  logger.debug("Sending reset")
90  replies = []
91  for client in clients:
92  replies.append(client.Reset())
93  logger.debug("Received replies")
94  return replies
95 
96  def simctl_force_exit(self, clients):
97  if not isinstance(clients, list):
98  self.__force_exit__force_exit([clients])[0]
99  self.__force_exit__force_exit(clients)
100 
101  def __force_exit(self, clients):
102  logger.debug("Sending ForceExit. A reply is not expected.")
103  for client in clients:
104  client.ForceExit()
Robot library providing keywords for the SimCtl interface.
Definition: SimCtl.py:27
def simctl_create_client(self, uri, timeout=None)
Creates a client with a given ReplyTime timeout.
Definition: SimCtl.py:59
def __force_exit(self, clients)
Definition: SimCtl.py:101
def simctl_reset(self, clients)
Definition: SimCtl.py:82
def __setup(self, clients, arg)
Definition: SimCtl.py:68
def __init__(self)
Definition: SimCtl.py:33
def simctl_force_exit(self, clients)
Definition: SimCtl.py:96
def simctl_setup(self, clients, arg)
Definition: SimCtl.py:77
def __reset(self, clients)
Definition: SimCtl.py:88
def simctl_create_clients(self, uris, timeout=None, replace_from=None, replace_to=None)
Creates a list of clients with a given ReplyTime timeout.
Definition: SimCtl.py:41
def get_rr_uri(arg)
Definition: SimCtl.py:12
def to_list(arg)
Definition: SimCtl.py:20