ifw-daq  3.0.0-pre2
IFW Data Acquisition modules
DpmCtl.py
Go to the documentation of this file.
1 import datetime
2 import time
3 from typing import List
4 
5 from robot.api import logger
6 from robot.utils import DotDict
7 from robot.utils import timestr_to_secs
8 
9 from elt import pymal
10 from ModDaqif.Daqif import (
11  DaqReply,
12  DaqStatus,
13  StorageStatus,
14  Exception,
15  DaqException,
16 )
17 
18 from ModDaqif.Daqif.DpmControl import DpmControlSync
19 from ModDaqif.Daqif.DpmDaqControl import DpmDaqControlSync
20 
21 
def get_rr_uri(arg):
    """Return the request/reply URI contained in *arg*.

    *arg* is either a plain URI, which is returned unchanged, or has the form
    ``name@rr-uri,ps-uri``, in which case only the ``rr-uri`` part is returned.
    """
    if "@" not in arg:
        return arg
    # Layout is name@rr-uri,ps-uri: take the field following the first "@"
    # and keep only what precedes its first ",".
    _name, uri_list = arg.split("@")[:2]
    rr_uri, *_others = uri_list.split(",")
    return rr_uri
28 
29 
def dict_from_daqreply(r: DaqReply):
    """Convert a DaqReply into a DotDict with the keys ``id`` and ``error``."""
    reply_fields = {"id": r.getId(), "error": r.getError()}
    return DotDict(**reply_fields)
32 
33 
def dict_from_daqstatus(r: DaqStatus):
    """Convert a DaqStatus into a DotDict.

    The result has the keys ``id``, ``state``, ``subState``, ``error``,
    ``message`` and ``timestamp``.
    """

    def stem_of(enum_value):
        # Note: for compatibility with C++ only the stem of the enum attribute
        # name is returned, i.e. the field after the first "." of str(value).
        return str(enum_value).split(".")[1]

    status_fields = {"id": r.getId()}
    status_fields["state"] = stem_of(r.getState())
    status_fields["subState"] = stem_of(r.getSubState())
    status_fields["error"] = r.getError()
    status_fields["message"] = r.getMessage()
    status_fields["timestamp"] = r.getTimestamp()
    return DotDict(**status_fields)
44 
45 
def dict_from_storagestatus(r: StorageStatus):
    """Convert a StorageStatus into a DotDict.

    The result has the keys ``free``, ``available`` and ``capacity``.
    """
    storage_fields = {}
    storage_fields["free"] = r.getFree()
    storage_fields["available"] = r.getAvailable()
    storage_fields["capacity"] = r.getCapacity()
    return DotDict(**storage_fields)
52 
53 
class Clients:
    """Holder for the two request/reply clients used by the DpmCtl keywords."""

    # Client for the ".../dpm" endpoint; None until assigned on an instance.
    dpm = None
    # Client for the ".../daq" endpoint; None until assigned on an instance.
    daq = None
57 
58 
class DpmCtl:
    """Robot library providing keywords for the daqif.DpmDaqControl and
    daqif.DpmControl interfaces.

    Keywords that return data convert the interface structures to ``DotDict``
    so that their fields are accessible with dot notation.
    """

    # Global scope is necessary since MAL will otherwise shut down the protobuf
    # library randomly, which eventually leads to segfaults. See ECII-304.
    ROBOT_LIBRARY_SCOPE = "GLOBAL"

    def __init__(self):
        """Load the "zpb" MAL and register it with the CiiFactory instance."""
        logger.debug("Loading MAL and creating factory")
        self.malmal = pymal.loadMal("zpb", {})
        self.factoryfactory = pymal.CiiFactory.getInstance()
        self.factoryfactory.registerMal("zpb", self.malmal)
        logger.debug("Loading MAL and creating factory done")

    def dpmctl_create(self, root_uri, timeout=None):
        """Creates RR clients with a given ReplyTime timeout.

        ``root_uri`` is either a plain URI or has the form
        ``name@rr-uri,ps-uri``, in which case the ``rr-uri`` part is used.
        ``timeout`` is a Robot Framework time string or number; it defaults to
        5 seconds when not given.

        Returns a ``Clients`` object holding the ``<root_uri>/daq`` and
        ``<root_uri>/dpm`` clients.
        """
        timeout = timestr_to_secs(timeout) if timeout is not None else 5
        root_uri = get_rr_uri(root_uri)
        # %s rather than %d so that fractional timeouts are not truncated in
        # the log.
        logger.info("Create client for uri %s with timeout %s" % (root_uri, timeout))
        clients = Clients()
        clients.daq = self.factoryfactory.getClient(
            "%s/daq" % root_uri,
            DpmDaqControlSync,
            pymal.rr.qos.ReplyTime(datetime.timedelta(seconds=timeout)),
        )
        clients.dpm = self.factoryfactory.getClient(
            "%s/dpm" % root_uri,
            DpmControlSync,
            pymal.rr.qos.ReplyTime(datetime.timedelta(seconds=timeout)),
        )
        return clients

    def dpmctl_exit(self, clients: Clients) -> None:
        """Sends the DpmControl.Exit() request using ``clients.dpm``."""
        logger.info("Sending DpmControl.Exit() request")
        clients.dpm.Exit()

    def dpmctl_query_storage_status(self, clients: Clients) -> DotDict:
        """Sends DpmControl.QueryStorageStatus() using ``clients.dpm``.

        Returns a ``DotDict`` with the keys ``free``, ``available`` and
        ``capacity``.
        """
        logger.info("Sending DpmControl.QueryStorageStatus() request")
        r = clients.dpm.QueryStorageStatus()
        return dict_from_storagestatus(r)

    def dpmctl_queue_daq(self, clients: Clients, dp_spec: str) -> DotDict:
        """Sends DpmDaqControl.QueueDaq(dp_spec) using ``clients.daq``.

        Returns a ``DotDict`` with the keys ``id`` and ``error``. A
        ``DaqException`` raised by the request is logged and re-raised.
        """
        # robot.api.logger does not perform %-formatting itself (the second
        # positional argument is the ``html`` flag), so format explicitly.
        # Only the first 10 characters of the specification are logged.
        logger.info(
            "Sending DpmDaqControl.QueueDaq(%s) request"
            % dp_spec[0:10].replace("\n", " ")
        )
        try:
            r = clients.daq.QueueDaq(dp_spec)
            return dict_from_daqreply(r)
        except DaqException as e:
            logger.info("QueueDaq() failed: %s" % e.getMessage())
            raise

    def dpmctl_abort_daq(self, clients: Clients, daq_id: str) -> DotDict:
        """Sends DpmDaqControl.AbortDaq(daq_id) using ``clients.daq``.

        Returns a ``DotDict`` with the keys ``id`` and ``error``.
        """
        logger.info("Sending DpmDaqControl.AbortDaq(%s) request" % daq_id)
        r = clients.daq.AbortDaq(daq_id)
        return dict_from_daqreply(r)

    def dpmctl_get_daq_status(self, clients: Clients, daq_id: str) -> DotDict:
        """Sends DpmDaqControl.GetDaqStatus(daq_id) using ``clients.daq``.

        Returns a ``DotDict`` with the keys ``id``, ``state``, ``subState``,
        ``error``, ``message`` and ``timestamp``.
        """
        logger.info("Sending DpmDaqControl.GetDaqStatus(%s) request" % daq_id)
        r = clients.daq.GetDaqStatus(daq_id)
        return dict_from_daqstatus(r)

    def dpmctl_get_active_daqs(self, clients: Clients) -> List[DotDict]:
        """Sends DpmDaqControl.GetActiveDaqs() using ``clients.daq``.

        Returns a list with one status ``DotDict`` per returned DAQ.
        """
        logger.info("Sending DpmDaqControl.GetActiveDaqs() request")
        daqs = clients.daq.GetActiveDaqs()
        return [dict_from_daqstatus(daq) for daq in daqs]

    def dpmctl_await_error(self, clients: Clients, daq_id: str, timeout: str = None) -> DotDict:
        """Polls DpmDaqControl.GetDaqStatus(daq_id) until its error flag is set.

        ``timeout`` is a Robot Framework time string or number; it defaults to
        3 seconds when not given. The status is polled every 0.25 seconds.

        Returns the status ``DotDict`` of the first reply with the error flag
        set. Raises ``TimeoutError`` if no such reply is seen before the
        timeout expires.
        """
        logger.info("Sending DpmDaqControl.GetDaqStatus() until timeout or error")
        timeout = timestr_to_secs(timeout) if timeout is not None else 3
        until = time.monotonic() + timeout
        while True:
            # Poll before checking the deadline so that the status is queried
            # at least once, and once more after the final sleep.
            r = clients.daq.GetDaqStatus(daq_id)
            if r.getError():
                return dict_from_daqstatus(r)
            if time.monotonic() > until:
                raise TimeoutError("Timeout waiting for DAQ error")
            time.sleep(0.25)
Robot library providing keywords for the daqif.DpmDaqControl and daqif.DpmControl interface.
Definition: DpmCtl.py:61
def dpmctl_create(self, root_uri, timeout=None)
Creates RR clients with a given ReplyTime timeout.
Definition: DpmCtl.py:75
DaqReply dpmctl_queue_daq(self, Clients clients, str dp_spec)
Definition: DpmCtl.py:101
StorageStatus dpmctl_query_storage_status(self, Clients clients)
Definition: DpmCtl.py:96
def __init__(self)
Definition: DpmCtl.py:67
str dpmctl_exit(self, Clients clients)
Definition: DpmCtl.py:92
List[DaqStatus] dpmctl_get_active_daqs(self, Clients clients)
Definition: DpmCtl.py:123
DaqStatus dpmctl_await_error(self, Clients clients, str daq_id, str timeout=None)
Definition: DpmCtl.py:128
DaqReply dpmctl_abort_daq(self, Clients clients, str daq_id)
Definition: DpmCtl.py:113
DaqStatus dpmctl_get_daq_status(self, Clients clients, str daq_id)
Definition: DpmCtl.py:118
def dict_from_storagestatus(StorageStatus r)
Definition: DpmCtl.py:46
def dict_from_daqreply(DaqReply r)
Definition: DpmCtl.py:30
def get_rr_uri(arg)
Definition: DpmCtl.py:22
def dict_from_daqstatus(DaqStatus r)
Definition: DpmCtl.py:34