Source code for seqlib.nodes.action

""" Implements Action and ActionInThread nodes.

    Actions are the leaf nodes in any Sequencer script. They execute python methods or functions.

    Coroutines are associated to :class:`Action` nodes while normal callable objects must be
    associated to :class:`ActionInThread` nodes.

"""
import inspect
import functools
import random
import asyncio
import attr
import logging
from datetime import datetime
from hashids import Hashids
from zope.interface import Interface, Attribute, implementer
from seqlib.contextvars_executor import ContextVarExecutor
from .state import T_STATE
from .interface import INode, _BaseNode

logger = logging.getLogger(__name__)
import contextvars as cv


@implementer(INode)
@attr.s
class Action(_BaseNode):
    """Leaf node that executes a `coroutine` in a Sequencer script.

    Args:
        f(coroutine): The coroutine the node will execute

    Keyword Args:
        id (str): Unique id. If not provided an unique identifier is assigned.
        name (str): Node name. If not provided a name is assigned.

    Raises:
        TypeError: if `f` is not a coroutine function (or a
            ``functools.partial`` wrapping one).
    """

    f = attr.ib(default=None, repr=False)

    def __attrs_post_init__(self):
        """Finish node setup.

        Validates `f`, fills in missing name and id, and sets the running
        checkpoint so the node is allowed to run.
        """
        self._check_function_type()

        if self.name is None:
            try:
                self.name = self.f.__qualname__
            except AttributeError:
                # Objects without __qualname__ fall back to the identity
                # of the target.
                self.name = id(self.f)

        if self.id is None:
            self.id = "%s_%s" % (self.name, uniqueId())

        self.running_checkpoint.set()  # running is allowed

    def _check_function_type(self):
        """Ensure `f` is a coroutine function.

        Raises:
            TypeError: if `f` is anything else, including non-callables.
        """
        # An explicit raise (instead of `assert`) keeps the validation alive
        # under `python -O`; iscoroutine_or_partial() is already False for
        # any non-callable object.
        if not iscoroutine_or_partial(self.f):
            raise TypeError("target function shall be coroutine")

    @property
    def context(self):
        """Get context from the running Sequence."""
        # NOTE(review): `Sequence` is not among the imports visible at the
        # top of this module -- confirm the name is in scope at call time.
        return Sequence.get_context()

    async def _execute_preamble(self):
        """Common steps performed before the node's action is started.

        Marks the node as paused, waits on the running checkpoint, then
        records the start time. If the node is flagged to be skipped it is
        marked as finished with a ``None`` result.
        """
        self.state = T_STATE.PAUSED
        # check permission to execute
        await self.running_checkpoint.wait()

        # now we are running ...
        if self.running_checkpoint.is_set():
            self.state = T_STATE.RUNNING
        self.t_start = datetime.now()

        # Skip this node?
        if self.skip:
            self.state = T_STATE.FINISHED
            self.t_end = datetime.now()
            self.result = None

    async def _execute_node_task(self, task):
        """Await `task` and record its outcome on the node.

        Args:
            task: awaitable running the node's action.

        Returns:
            The value produced by `task`.

        Raises:
            Exception: whatever `task` raised; it is stored in
                ``self.exception`` and re-raised.
        """
        try:
            r = await task
            self.result = r
        except Exception as exc:
            self.in_error = True
            # Keep the exception actually caught; no second query of the
            # task is needed.
            self.exception = exc
            raise  # bubble up the exception
        finally:
            # End time and final state are recorded on success and failure.
            self.t_end = datetime.now()
            self.state = T_STATE.FINISHED
        return r

    async def __call__(self, resume=False):
        """Executes node action.

        A task is created from the coroutine and passed to the asyncio loop
        for execution.

        Args:
            resume: not used by this implementation.

        Returns:
            The coroutine's result, or ``None`` when the node was skipped.
        """
        await self._execute_preamble()
        if self.state is T_STATE.FINISHED:
            # node skipped in preamble
            return None

        if not iscoroutine_or_partial(self.f):
            # this should never happen
            raise TypeError("This don't belong here", self.f)

        task = asyncio.create_task(self.f())
        return await self._execute_node_task(task)
@attr.s
class ActionInThread(Action):
    """Leaf node that executes a python `callable` in a Sequencer script.

    The callable (function or method) is run through an executor instead of
    being awaited directly.

    Args:
        f(callable): The `callable` the node will execute

    Keyword Args:
        id (str): Unique id. If not provided an unique identifier is assigned.
        name (str): Node name. If not provided a name is assigned.

    Raises:
        TypeError: if `f` is not a plain (non-coroutine) callable.
    """

    # One executor instance shared by every ActionInThread node.
    _executor = ContextVarExecutor()

    def _check_function_type(self):
        """Ensure `f` is a normal callable and not a coroutine function.

        Raises:
            TypeError: if `f` is not callable or is a coroutine function.
        """
        # Explicit raise instead of `assert`, so the check is not stripped
        # when Python runs with -O.
        if not callable(self.f):
            raise TypeError("target function shall be a normal callable")
        if iscoroutine_or_partial(self.f):
            raise TypeError("target function shall be a normal callable")

    async def __call__(self, resume=False):
        """Executes node action.

        The action is a normal function; it is handed to the class executor
        through the running loop and the resulting future is awaited.

        Args:
            resume: not used by this implementation.

        Returns:
            The callable's result, or ``None`` when the node was skipped.
        """
        await self._execute_preamble()
        if self.state is T_STATE.FINISHED:
            # node skipped in preamble
            return None

        if iscoroutine_or_partial(self.f):
            # This should never happen
            raise TypeError("coroutines don't belong here")

        logger.debug("run in thread: %s", self.f)
        loop = asyncio.get_running_loop()
        task = loop.run_in_executor(ActionInThread._executor, self.f)
        return await self._execute_node_task(task)
# NOTE(review): StartNode is decorated with @attr.s while EndNode is not --
# confirm whether the difference is intentional.
@attr.s
class StartNode(Action):
    """This is Starting Node.

    Does nothing special, but who knows in the future."""

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)


class EndNode(Action):
    """This is the finish Node.

    Does nothing special, but who knows in the future."""

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)


def uniqueId(value=None):
    """Returns an identifier string built from a number.

    Args:
        value (int): Number to encode. If not provided a random integer
            below one million is used.

    Returns:
        The Hashids encoding of `value`.
    """
    if value is None:
        # randrange() needs an integer bound: a float such as 1e6 is
        # deprecated since Python 3.10 and rejected from 3.12 on.
        value = random.randrange(1_000_000)
    return Hashids().encode(value)


def make_node(s, **kw):
    """Makes an :class:`Action` nodes out of a coroutines.

    Args:
        s(coroutine): target coroutine to be passed to :class:`Action` ctor.
            An existing node is returned unchanged.

    Keyword Arguments:
        **kw: Passed straight to :class:`Action` ctor.

    Returns:
        :class:`Action` node associated to `s` coroutine.

    Raises:
        TypeError: if `s` is neither a node nor a coroutine function.
    """
    if isinstance(s, _BaseNode):
        return s
    if iscoroutine_or_partial(s):
        return Action(s, **kw)
    raise TypeError("Argument must be a node or coroutine type")


def iscoroutine_or_partial(obj):
    """Support function. Identifies coroutines wrapped with partial.

    Args:
        obj: object to inspect.

    Returns:
        bool: True if `obj`, after unwrapping any ``functools.partial``
        layers, is a coroutine function.
    """
    # Peel nested partials to reach the underlying callable.
    while isinstance(obj, functools.partial):
        obj = obj.func
    return inspect.iscoroutinefunction(obj)