Source code for seq.lib.nodes.action

""" Implements :class:`Action` and :class:`ActionInThread` nodes.

    Actions are the leaf nodes in any Sequencer script. They execute Python methods or functions.

    Coroutines are associated with :class:`Action` nodes, while normal callable objects must be
    associated with :class:`ActionInThread` nodes.

"""
import inspect
import logging
import functools
import asyncio
from datetime import datetime
import attr

import contextvars as cv
from zope.interface import implementer
from seq.lib.contextvars_executor import ContextVarExecutor
from .state import T_STATE
from .rtflags import RTFLAG
from .interface import INode, _BaseNode
from .. import ob
from ..counter import Counter
from .utils import uniqueId, set_runtime_flag, iscoroutine_or_partial, set_runtime_flag_from_dict


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Logger named "seq.user"; the node classes in this module use it for the
# task start/end/error messages.
user_logger = logging.getLogger("seq.user")


@implementer(INode)
@attr.s
class Action(_BaseNode):
    """Action ctor.

    Action nodes execute `coroutines` in a Sequencer script.

    Args:
        f(coroutine): The coroutine the node will execute

    Keyword Args:
        id (str): Unique id. If not provided an unique identifier is assigned.
        name (str): Node name. If not provided a name is assigned.

    Raises:
        TypeError: if `f` is not a coroutine.
    """

    f = attr.ib(default=None, repr=False)
    parent_tpl = attr.ib(default=None, repr=False)
    # Context variable that `_execute_preamble` sets to the node being executed.
    current_node = cv.ContextVar("current_node", default=None)

    def __attrs_post_init__(self):
        """Finish object setup.

        Validates `f`, fills in `name`, `id` and `description` when they were
        not supplied, and sets `running_checkpoint` so the node may run.
        """
        self._check_function_type()
        if self.name is None:
            try:
                self.name = self.f.__qualname__
            except AttributeError:
                # Some callables (e.g. functools.partial objects) have no
                # __qualname__; fall back to the object id, kept as a string
                # because `name` is documented as str.
                self.name = str(id(self.f))
        if self.id is None:
            self.id = "%s_%s" % (self.name, uniqueId())
        self.running_checkpoint.set()  # running is allowed
        desc = self.description or self.f.__doc__ or "No Description"
        # Only the first line of the description is kept.
        self.description = desc.strip().split("\n")[0]

    def _check_function_type(self):
        """Raise TypeError unless `f` passes `iscoroutine_or_partial`."""
        if iscoroutine_or_partial(self.f) is False:
            raise TypeError("target function shall be coroutine")

    @property
    def context(self):
        """Get context from the running Sequence"""
        from .sequence import Sequence

        return Sequence.get_context()

    def _publish_state(self):
        """Requests controller object to publish node's state.

        Does nothing when no controller is available.
        """
        ctrl = ob.OB.controller.get()
        if ctrl:
            ctrl.notify_state_change(self)

    @property
    def state(self):
        """Node state, as held by the base class."""
        return super().state

    @state.setter
    def state(self, value):
        """Sets the node state and publishes the change."""
        # Delegate to the parent's property setter, then notify the controller.
        super(Action, self.__class__).state.fset(self, value)
        self._publish_state()

    @property
    def full_state(self):
        """Full node state, as held by the base class."""
        return super().full_state

    def make_sequence(self, parent_tpl=None):
        """Link the node to its parent template and load its runtime flags.

        Not much to do here. Links back to the Template object (if any) and
        gets the runtime flags from the controller object, when there is one.

        Args:
            parent_tpl: Parent template object, or None.
        """
        # NOTE(review): Template is not used in this method; the import is
        # kept in case it is relied on for side effects -- confirm.
        from .template import Template

        logger.debug("ACTION: %s, (Tpl: %s)", self.name, parent_tpl)
        self.parent_tpl = parent_tpl
        ctrl = ob.OB.controller.get()
        if ctrl:
            self.runtime_flags = ctrl.runtime_flags.get(self.serial_number, 0)

    async def _execute_preamble(self):
        """Prepare the node for execution.

        Registers the node as `current_node`, honours the PAUSE runtime flag,
        waits on `running_checkpoint`, marks the node RUNNING and, when the
        SKIP runtime flag is set, marks it FINISHED with a `None` result.
        """
        # NOTE(review): Sequence is not used in this method; the import is
        # kept in case it is relied on for side effects -- confirm.
        from .sequence import Sequence

        Action.current_node.set(self)
        self.in_error = False

        if self.runtime_flags & RTFLAG.PAUSE:
            self.state = T_STATE.PAUSED
            self.pause()

        # check permission to execute
        await self.running_checkpoint.wait()
        # now we are running ...
        # NOTE(review): statement grouping was reconstructed from a flattened
        # listing; confirm that t_start belongs inside this check.
        if self.running_checkpoint.is_set():
            self.state = T_STATE.RUNNING
            self.t_start = datetime.now()

        # Skip this node?
        if self.runtime_flags & RTFLAG.SKIP:
            self.state = T_STATE.FINISHED
            self.skip = True
            self.t_end = datetime.now()
            self.result = None

    async def _execute_node_task(self, task):
        """Await `task` and record its outcome on the node.

        Args:
            task: Awaitable future/task wrapping the node's action.

        Returns:
            The value produced by `task`.

        Raises:
            asyncio.CancelledError: re-raised untouched on cancellation.
            Exception: any error raised by `task`, after it has been stored
                in `self.exception` and `self.in_error` has been set.
        """
        user_logger.info("Starting Task: %s", self.name)
        try:
            r = await task
            self.result = task.result()
        except asyncio.CancelledError:
            raise
        except Exception:
            user_logger.info("node in error")
            self.in_error = True
            self.exception = task.exception()
            raise  # bubble up the exception
        finally:
            # The node is marked FINISHED whatever the outcome.
            # NOTE(review): grouping reconstructed from a flattened listing;
            # confirm the "End Task" message belongs in this clause.
            self.t_end = datetime.now()
            self.state = T_STATE.FINISHED
            user_logger.info("End Task: %s", self.name)
        return r

    async def __call__(self, resume=False):
        """Executes node action.

        A task is created from the coroutine and awaited.

        Args:
            resume: Not used by this method.

        Returns:
            The coroutine's result, or None when the node was skipped.

        Raises:
            TypeError: if `f` is not a coroutine (should never happen).
        """
        await self._execute_preamble()
        if self.state is T_STATE.FINISHED:
            # node skipped in preamble
            return None

        if iscoroutine_or_partial(self.f):
            task = asyncio.create_task(self.f())
        else:
            # this should never happen
            raise TypeError("This don't belong here", self.f)
        r = await self._execute_node_task(task)
        return r
@attr.s
class ActionInThread(Action):
    """ActionInThread ctor.

    ActionInThread nodes execute python `callables` (functions or methods)
    in a Sequencer script.

    Args:
        f(callable): The `callable` the node will execute

    Keyword Args:
        id (str): Unique id. If not provided an unique identifier is assigned.
        name (str): Node name. If not provided a name is assigned.

    Raises:
        TypeError: if `f` is not a python method or function.
    """

    # Executor shared by all ActionInThread instances.
    _executor = ContextVarExecutor()

    def _check_function_type(self):
        """Raise TypeError unless `f` is a plain (non-coroutine) callable."""
        # Explicit check instead of `assert`: asserts are stripped under -O
        # and raise AssertionError rather than the documented TypeError.
        if not callable(self.f):
            raise TypeError("target function shall be callable")
        if iscoroutine_or_partial(self.f) is True:
            raise TypeError("target function shall be a normal callable")

    async def __call__(self, resume=False):
        """Executes node action.

        The action is a normal function, so it is submitted to the class
        executor through the running loop's ``run_in_executor`` and awaited.
        The callable is invoked without arguments and its result is saved
        on the node.

        Args:
            resume: Not used by this method.

        Returns:
            The callable's result, or None when the node was skipped.

        Raises:
            TypeError: if `f` is a coroutine (should never happen).
        """
        # NOTE(review): Sequence is not used in this method; the import is
        # kept in case it is relied on for side effects -- confirm.
        from .sequence import Sequence

        await self._execute_preamble()
        if self.state is T_STATE.FINISHED:
            # node skipped in preamble
            return None

        if iscoroutine_or_partial(self.f):
            # This should never happen
            raise TypeError("coroutines don't belong here")
        loop = asyncio.get_running_loop()
        task = loop.run_in_executor(ActionInThread._executor, self.f)
        r = await self._execute_node_task(task)
        return r
# @attr.s
class StartNode(Action):
    """Starting node.

    Built like any :class:`Action`, then flagged as unskippable
    (``can_skip = False``) and with ``hide = True``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.can_skip = False
        self.hide = True
class EndNode(Action):
    """Finish node.

    Built like any :class:`Action`, then flagged as unskippable
    (``can_skip = False``) and with ``hide = True``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.can_skip = False
        self.hide = True
def make_node(s, **kw):
    """Makes an :class:`Action` node out of a coroutine.

    Args:
        s(coroutine): target coroutine to be passed to :class:`Action` ctor.
            An object that is already a node is returned unchanged.

    Keyword Arguments:
        **kw: Passed straight to :class:`Action` ctor.

    Returns:
        :class:`Action` node associated to `s` coroutine, or `s` itself when
        it is already a node.

    Raises:
        TypeError: if `s` is neither a node nor a coroutine.
    """
    if isinstance(s, _BaseNode):
        return s
    if iscoroutine_or_partial(s):
        return Action(s, **kw)
    # Wrap `s` in a one-element tuple so that a tuple-valued argument is
    # formatted as a whole instead of being unpacked as format arguments.
    raise TypeError("Argument %s must be a node or coroutine type" % (s,))