""" Implements Action and ActionInThread nodes.
Actions are the leaf nodes in any Sequencer script. They execute python methods or functions.
Coroutines are associated to :class:`Action` nodes while normal callable objects must be
associated to :class:`ActionInThread` nodes.
"""
import inspect
import functools
import random
import asyncio
import attr
import logging
from datetime import datetime
from hashids import Hashids
from zope.interface import Interface, Attribute, implementer
from seqlib.contextvars_executor import ContextVarExecutor
from .state import T_STATE
from .interface import INode, _BaseNode
logger = logging.getLogger(__name__)
import contextvars as cv
@implementer(INode)
@attr.s
class Action(_BaseNode):
    """Leaf node that executes a coroutine in a Sequencer script.

    Args:
        f (coroutine function): The coroutine function the node will execute.
            A ``functools.partial`` wrapping a coroutine function is accepted
            as well (see :func:`iscoroutine_or_partial`).

    Keyword Args:
        id (str): Unique id. If not provided, one is built from the node name
            plus a generated suffix.
        name (str): Node name. If not provided, ``f.__qualname__`` is used.

    Raises:
        AssertionError: if `f` is not callable.
        TypeError: if `f` is not a coroutine function.
    """

    f = attr.ib(default=None, repr=False)

    def __attrs_post_init__(self):
        """Finish object setup.

        Validates `f`, fills in default ``name`` and ``id`` values and allows
        the node to run.
        """
        # validate 'f'
        self._check_function_type()
        if self.name is None:
            try:
                self.name = self.f.__qualname__
            except AttributeError:
                # Objects without ``__qualname__`` fall back to their identity.
                # NOTE(review): this makes ``name`` an int, not a str.
                self.name = id(self.f)
        if self.id is None:
            self.id = "%s_%s" % (self.name, uniqueId())
        self.running_checkpoint.set()  # running is allowed

    def _check_function_type(self):
        """Ensure `f` is a coroutine function (possibly wrapped in partial).

        Raises:
            AssertionError: if `f` is not callable.
            TypeError: if `f` is callable but not a coroutine function.
        """
        # NOTE(review): ``assert`` is stripped when Python runs with -O.
        assert callable(self.f)
        if iscoroutine_or_partial(self.f) is False:
            raise TypeError("target function shall be coroutine")

    @property
    def context(self):
        """Get context from the running Sequence."""
        # NOTE(review): ``Sequence`` is not imported in the visible part of
        # this module -- confirm where the name is expected to come from.
        return Sequence.get_context()

    async def _execute_preamble(self):
        """Wait for permission to run and handle the ``skip`` flag.

        On return the node state is ``T_STATE.FINISHED`` when the node was
        skipped; callers use that to decide whether to execute `f`.
        """
        self.state = T_STATE.PAUSED
        # check permission to execute
        await self.running_checkpoint.wait()
        # now we are running ...
        if self.running_checkpoint.is_set():
            self.state = T_STATE.RUNNING
        self.t_start = datetime.now()
        # Skip this node?
        if self.skip:
            self.state = T_STATE.FINISHED
            self.t_end = datetime.now()
            self.result = None

    async def _execute_node_task(self, task):
        """Await `task` and record its outcome on the node.

        Args:
            task: awaitable exposing ``result()`` and ``exception()``
                (an asyncio task or future).

        Returns:
            The value produced by awaiting `task`.

        Raises:
            Exception: whatever `task` raised; it is recorded in
                ``self.exception`` and then re-raised.
        """
        try:
            r = await task
            self.result = task.result()
        except Exception:
            self.in_error = True
            self.exception = task.exception()
            raise  # bubble up the exception
        finally:
            # Runs on success and on failure alike.
            self.t_end = datetime.now()
            self.state = T_STATE.FINISHED
        return r

    async def __call__(self, resume=False):
        """Execute the node action.

        A task is created from the coroutine and handed to the running
        asyncio loop for execution.

        Args:
            resume (bool): accepted but not used by this implementation.

        Returns:
            The coroutine's result, or ``None`` when the node was skipped.

        Raises:
            TypeError: if `f` is not a coroutine function.
        """
        await self._execute_preamble()
        if self.state is T_STATE.FINISHED:  # node skipped in preamble
            return None
        if not iscoroutine_or_partial(self.f):
            # this should never happen
            raise TypeError("This don't belong here", self.f)
        task = asyncio.create_task(self.f())
        return await self._execute_node_task(task)
@attr.s
class ActionInThread(Action):
    """Leaf node that executes a plain callable in a Sequencer script.

    ActionInThread nodes run Python `callables` (functions or methods) through
    an executor instead of directly on the event loop.

    Args:
        f (callable): The `callable` the node will execute.

    Keyword Args:
        id (str): Unique id. If not provided, a unique identifier is assigned.
        name (str): Node name. If not provided, a name is assigned.

    Raises:
        AssertionError: if `f` is not callable.
        TypeError: if `f` is a coroutine function.
    """

    # Class attribute: one executor instance shared by all ActionInThread nodes.
    _executor = ContextVarExecutor()

    def _check_function_type(self):
        """Ensure `f` is a normal callable, not a coroutine function.

        Raises:
            AssertionError: if `f` is not callable.
            TypeError: if `f` is a coroutine function (or a partial of one).
        """
        # NOTE(review): ``assert`` is stripped when Python runs with -O, in
        # which case a non-callable `f` would pass this check.
        assert callable(self.f)
        if iscoroutine_or_partial(self.f) is True:
            raise TypeError("target function shall be a normal callable")

    async def __call__(self, resume=False):
        """Execute the node action.

        The callable is submitted, without arguments, to the class-level
        executor through the running loop, and its result is saved on the node.

        Args:
            resume (bool): accepted but not used by this implementation.

        Returns:
            The callable's result, or ``None`` when the node was skipped.

        Raises:
            TypeError: if `f` is a coroutine function.
        """
        await self._execute_preamble()
        if self.state is T_STATE.FINISHED:  # node skipped in preamble
            return None
        if iscoroutine_or_partial(self.f):
            # This should never happen
            raise TypeError("coroutines don't belong here")
        logger.debug("run in thread: %s", self.f)
        loop = asyncio.get_running_loop()
        task = loop.run_in_executor(ActionInThread._executor, self.f)
        return await self._execute_node_task(task)
@attr.s
class StartNode(Action):
    """Starting node.

    Adds no behaviour of its own on top of :class:`Action`; it exists as a
    distinct type for possible future use.
    """

    def __init__(self, *args, **kwargs):
        # Pure pass-through to the parent initializer.
        # NOTE(review): ``attr.s`` generates an ``__init__`` by default, which
        # presumably replaces this one -- confirm whether it is ever called.
        super().__init__(*args, **kwargs)
class EndNode(Action):
    """Finish node.

    Adds no behaviour of its own on top of :class:`Action`; it exists as a
    distinct type for possible future use.
    """

    def __init__(self, *args, **kwargs):
        # Pure pass-through to the parent initializer.
        super().__init__(*args, **kwargs)
def uniqueId(value=None):
    """Return a short identifier string.

    Args:
        value (int, optional): number to encode. When omitted, a random
            integer in ``[0, 1000000)`` is drawn.

    Returns:
        The ``Hashids`` encoding of `value`.

    Note:
        Uniqueness is not guaranteed: random values are drawn from only one
        million candidates, so two calls can return the same identifier.
    """
    if value is None:
        # Integer bound: ``randrange`` rejects float arguments such as ``1e6``
        # on recent Python versions.
        value = random.randrange(1_000_000)
    return Hashids().encode(value)
def make_node(s, **kw):
    """Turn a coroutine function into an :class:`Action` node.

    Objects that already are nodes are returned untouched.

    Args:
        s (coroutine function or node): target to wrap; a coroutine function
            is passed to the :class:`Action` ctor.

    Keyword Arguments:
        **kw: Passed straight to the :class:`Action` ctor.

    Returns:
        `s` itself when it is already a node, otherwise a new :class:`Action`
        node associated to the `s` coroutine.

    Raises:
        TypeError: if `s` is neither a node nor a coroutine function.
    """
    node = s
    if not isinstance(node, _BaseNode):
        if not iscoroutine_or_partial(s):
            raise TypeError("Argument must be a node or coroutine type")
        node = Action(s, **kw)
    return node
def iscoroutine_or_partial(obj):
    """Tell whether `obj` is a coroutine function, looking through partials.

    Args:
        obj: any object; ``functools.partial`` wrappers are peeled off until
            the underlying callable is reached.

    Returns:
        bool: ``True`` when the innermost object is a coroutine function.
    """
    if isinstance(obj, functools.partial):
        # Peel one layer of partial and test what it wraps.
        return iscoroutine_or_partial(obj.func)
    return inspect.iscoroutinefunction(obj)