ifw-daq  1.0.0
IFW Data Acquisition modules
OcmCtl.py
Go to the documentation of this file.
1 import datetime
2 
3 from robot.api import logger
4 from robot.utils import DotDict
5 from robot.utils import timestr_to_secs
6 
7 from elt import pymal
8 from ModOcmif.Ocmif import (
9  DaqReply,
10  DaqStatus,
11 )
12 from ModOcmif.Ocmif.OcmDaq import OcmDaqSync
13 from ModStdif.Stdif.StdCmds import StdCmdsSync
14 
15 
def get_rr_uri(arg):
    """Return the request/reply URI from a ``name@rr-uri,ps-uri`` argument.

    If ``arg`` contains no ``@`` it is returned unchanged. Otherwise the part
    after the first ``@`` is treated as a comma-separated URI list and its
    first entry is returned.

    Only the first ``@`` separates the name from the URIs, so a URI that
    itself contains ``@`` is kept intact instead of being cut short.
    """
    _name, separator, uris = arg.partition("@")
    if not separator:
        # Plain URI without a "name@" prefix.
        return arg
    # The first entry of "rr-uri,ps-uri" is the request/reply URI.
    return uris.partition(",")[0]
22 
23 
def dict_from_daqreply(r: DaqReply):
    """Convert a ``DaqReply`` into a ``DotDict`` with keys ``id`` and ``error``.

    The values are taken from ``r.getId()`` and ``r.getError()``.
    """
    daq_id = r.getId()
    error = r.getError()
    return DotDict(id=daq_id, error=error)
26 
27 
def dict_from_daqstatus(r: DaqStatus):
    """Convert a ``DaqStatus`` into a ``DotDict``.

    The result has the keys ``id``, ``state``, ``subState``, ``error``,
    ``message`` and ``timestamp``, each read from the matching getter of
    ``r``.
    """
    daq_id = r.getId()
    # For compatibility with C++, only the stem of the enum attribute name is
    # kept: the string form of the enum value is split on "." and the second
    # component is used.
    state_name = str(r.getState()).split(".")[1]
    sub_state_name = str(r.getSubState()).split(".")[1]
    error = r.getError()
    message = r.getMessage()
    timestamp = r.getTimestamp()
    return DotDict(
        id=daq_id,
        state=state_name,
        subState=sub_state_name,
        error=error,
        message=message,
        timestamp=timestamp,
    )
38 
39 
class Clients:
    """Holder for the pair of clients created by ``OcmCtl.ocmctl_create``."""

    # Client for the "<root_uri>/daq" endpoint; None until assigned.
    daq = None

    # Client for the "<root_uri>/std" endpoint; None until assigned.
    std = None
43 
44 
class OcmCtl:
    """Robot library providing keywords for the ocmif.OcmDaq and stdif interface"""

    # Global scope is necessary since MAL will shutdown protobuf library randomly
    # otherwise, which eventually lead to segfaults. See ECII-304.
    ROBOT_LIBRARY_SCOPE = "GLOBAL"

    def __init__(self):
        """Load the "zpb" MAL and register it with the CII factory."""
        logger.debug("Loading MAL and creating factory")
        self.mal = pymal.loadMal("zpb", {})
        self.factory = pymal.CiiFactory.getInstance()
        self.factory.registerMal("zpb", self.mal)
        logger.debug("Loading MAL and creating factory done")

    def ocmctl_create(self, root_uri, timeout=None):
        """Creates RR clients with a given ReplyTime timeout.

        ``root_uri`` is either a plain URI or ``name@rr-uri,ps-uri``; in the
        latter case only the request/reply URI is used. ``timeout`` is a Robot
        time string converted with ``timestr_to_secs``; when omitted, 3
        seconds is used.

        Returns a ``Clients`` object whose ``daq`` and ``std`` attributes are
        the clients for ``<root_uri>/daq`` and ``<root_uri>/std``.
        """
        timeout = timestr_to_secs(timeout) if timeout is not None else 3
        root_uri = get_rr_uri(root_uri)
        # %s rather than %d: the timeout may be fractional and %d would
        # silently truncate it in the log (e.g. 0.5 would be shown as 0).
        logger.info("Create client for uri %s with timeout %s" % (root_uri, timeout))
        reply_time = datetime.timedelta(seconds=timeout)
        clients = Clients()
        clients.daq = self.factory.getClient(
            "%s/daq" % root_uri,
            OcmDaqSync,
            pymal.rr.qos.ReplyTime(reply_time),
        )
        clients.std = self.factory.getClient(
            "%s/std" % root_uri,
            StdCmdsSync,
            pymal.rr.qos.ReplyTime(reply_time),
        )
        return clients

    def ocmctl_daq_get_status(self, clients: Clients, daq_id: str):
        """Send ``OcmDaq.GetStatus(daq_id)`` and return the reply as a ``DotDict``.

        The reply is converted with ``dict_from_daqstatus``.
        """
        # robot.api.logger does not do lazy %-formatting: a second positional
        # argument is interpreted as the ``html`` flag, so the message must be
        # interpolated before it is passed in.
        logger.info("Sending OcmDaq.GetStatus(%s) request" % daq_id)
        r = clients.daq.GetStatus(daq_id)
        return dict_from_daqstatus(r)

    def ocmctl_std_get_state(self, clients: Clients):
        """Send ``StdCmds.GetState()`` and return the reply unchanged."""
        logger.info("Sending StdCmds.GetState() request")
        return clients.std.GetState()

    def ocmctl_daq_get_active_list(self, clients: Clients):
        """Send ``OcmDaq.GetActiveList()`` and return a list of ``DotDict``.

        Each entry of the reply is converted with ``dict_from_daqstatus``.
        """
        logger.info("Sending OcmDaq.GetActiveList() request")
        daqs = clients.daq.GetActiveList()
        return [dict_from_daqstatus(daq) for daq in daqs]
OcmCtl.OcmCtl.factory
factory
Definition: OcmCtl.py:55
OcmCtl.OcmCtl.ocmctl_daq_get_active_list
def ocmctl_daq_get_active_list(self, Clients clients)
Definition: OcmCtl.py:86
OcmCtl.dict_from_daqstatus
def dict_from_daqstatus(DaqStatus r)
Definition: OcmCtl.py:28
OcmCtl.OcmCtl.ocmctl_daq_get_status
def ocmctl_daq_get_status(self, Clients clients, str daq_id)
Definition: OcmCtl.py:77
OcmCtl.OcmCtl.ocmctl_create
def ocmctl_create(self, root_uri, timeout=None)
Creates RR clients with a given ReplyTime timeout.
Definition: OcmCtl.py:60
OcmCtl.OcmCtl
Robot library providing keywords for the ocmif.OcmDaq and stdif interface.
Definition: OcmCtl.py:46
OcmCtl.get_rr_uri
def get_rr_uri(arg)
Definition: OcmCtl.py:16
OcmCtl.Clients
Definition: OcmCtl.py:40
OcmCtl.OcmCtl.__init__
def __init__(self)
Definition: OcmCtl.py:52
OcmCtl.OcmCtl.mal
mal
Definition: OcmCtl.py:54
OcmCtl.dict_from_daqreply
def dict_from_daqreply(DaqReply r)
Definition: OcmCtl.py:24
OcmCtl.OcmCtl.ocmctl_std_get_state
def ocmctl_std_get_state(self, Clients clients)
Definition: OcmCtl.py:82