""" Implements :class:`Action` and :class:`ActionInThread` nodes.
Actions are the leaf nodes in any Sequencer script. They execute Python methods or functions.
Coroutines are associated with :class:`Action` nodes, while normal callable objects must be
associated with :class:`ActionInThread` nodes.
"""
import inspect
import logging
import functools
import asyncio
from datetime import datetime
import attr
import contextvars as cv
from zope.interface import implementer
from seq.lib.contextvars_executor import ContextVarExecutor
from .state import T_STATE
from .rtflags import RTFLAG
from .interface import INode, _BaseNode
from .. import ob
from ..counter import Counter
from .utils import uniqueId, set_runtime_flag, iscoroutine_or_partial, set_runtime_flag_from_dict
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Logger named "seq.user"; used below for task start/end and error messages.
user_logger = logging.getLogger("seq.user")
@implementer(INode)
@attr.s
class Action(_BaseNode):
    """Leaf node that executes a `coroutine` in a Sequencer script.

    Args:
        f (coroutine): The coroutine the node will execute.

    Keyword Args:
        id (str): Unique id. If not provided a unique identifier is assigned.
        name (str): Node name. If not provided a name is assigned.

    Raises:
        TypeError: if `f` is not a coroutine.
    """

    f = attr.ib(default=None, repr=False)
    parent_tpl = attr.ib(default=None, repr=False)
    # Context variable set to the node being executed (see _execute_preamble).
    current_node = cv.ContextVar("current_node", default=None)

    def __attrs_post_init__(self):
        """Finish object setup.

        Validates ``f``, fills in ``name``, ``id`` and ``description`` when
        they were not supplied, and allows the node to run.
        """
        self._check_function_type()
        if self.name is None:
            try:
                self.name = self.f.__qualname__
            except AttributeError:
                # The callable has no ``__qualname__``; fall back to its
                # identity, converted to ``str`` so the name is always text.
                self.name = str(id(self.f))
        if self.id is None:
            self.id = "%s_%s" % (self.name, uniqueId())
        self.running_checkpoint.set()  # running is allowed
        desc = self.description or self.f.__doc__ or "No Description"
        # Keep only the first line of the (stripped) description.
        self.description = desc.strip().split("\n")[0]

    def _check_function_type(self):
        """Raise ``TypeError`` unless ``f`` passes ``iscoroutine_or_partial``."""
        if iscoroutine_or_partial(self.f) is False:
            raise TypeError("target function shall be coroutine")

    @property
    def context(self):
        """Get context from the running Sequence."""
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from .sequence import Sequence

        return Sequence.get_context()

    def _publish_state(self):
        """Request the controller object, if any, to publish the node's state."""
        ctrl = ob.OB.controller.get()
        if ctrl:
            ctrl.notify_state_change(self)

    @property
    def state(self):
        """Node state, as provided by the parent class."""
        return super().state

    @property
    def full_state(self):
        """Full node state, as provided by the parent class."""
        return super().full_state

    @state.setter
    def state(self, value):
        """Set the node state and publish the change."""
        # ``super()`` cannot be used for attribute assignment, so look up the
        # inherited property object and call its setter explicitly.
        super(Action, self.__class__).state.fset(self, value)
        self._publish_state()

    def make_sequence(self, parent_tpl=None):
        """Prepare the node for execution.

        Links back to the Template object (if any) and reads the node's
        runtime flags from the controller object, when one is available.

        Args:
            parent_tpl: Object stored as ``self.parent_tpl``.
        """
        # NOTE(review): ``Template`` is not used in this method; the import is
        # kept in case its side effect is relied on -- confirm and remove.
        from .template import Template

        logger.debug("ACTION: %s, (Tpl: %s)", self.name, parent_tpl)
        self.parent_tpl = parent_tpl
        ctrl = ob.OB.controller.get()
        if ctrl:
            self.runtime_flags = ctrl.runtime_flags.get(self.serial_number, 0)

    async def _execute_preamble(self):
        """Run the steps that precede the node's action.

        Registers the node as the current one, honours the PAUSE runtime flag,
        waits for permission to run, marks the node RUNNING and, if the SKIP
        runtime flag is set, marks it FINISHED without a result.
        """
        # NOTE(review): ``Sequence`` is not used in this method; the import is
        # kept in case its side effect is relied on -- confirm and remove.
        from .sequence import Sequence

        Action.current_node.set(self)
        self.in_error = False
        if self.runtime_flags & RTFLAG.PAUSE:
            self.state = T_STATE.PAUSED
            # NOTE(review): presumably clears ``running_checkpoint`` so the
            # wait below blocks until resumed -- confirm in the base class.
            self.pause()

        # check permission to execute
        await self.running_checkpoint.wait()
        # now we are running ...
        if self.running_checkpoint.is_set():
            self.state = T_STATE.RUNNING
            self.t_start = datetime.now()

        # Skip this node?
        if self.runtime_flags & RTFLAG.SKIP:
            self.state = T_STATE.FINISHED
            self.skip = True
            self.t_end = datetime.now()
            self.result = None

    async def _execute_node_task(self, task):
        """Await ``task`` and record its outcome on the node.

        Args:
            task: Awaitable exposing ``result()`` and ``exception()``.

        Returns:
            The value produced by ``task``.

        Raises:
            asyncio.CancelledError: re-raised untouched.
            Exception: any error raised by ``task``, after flagging the node
                as in error and storing the exception.
        """
        user_logger.info("Starting Task: %s", self.name)
        try:
            r = await task
            self.result = task.result()
        except asyncio.CancelledError:
            raise
        except Exception:
            user_logger.info("node in error")
            self.in_error = True
            self.exception = task.exception()
            raise  # bubble up the exception
        finally:
            # Runs on success, error and cancellation alike.
            self.t_end = datetime.now()
            self.state = T_STATE.FINISHED
            user_logger.info("End Task: %s", self.name)
        return r

    async def __call__(self, resume=False):
        """Execute the node action.

        A task is created from the coroutine and handed to the asyncio loop
        for execution.

        Args:
            resume: Not used by this method.

        Returns:
            The result of the coroutine, or ``None`` when the node was skipped.

        Raises:
            TypeError: if ``f`` is not a coroutine.
        """
        await self._execute_preamble()
        if self.state is T_STATE.FINISHED:  # node skipped in preamble
            return None

        if iscoroutine_or_partial(self.f):
            task = asyncio.create_task(self.f())
        else:
            # this should never happen
            raise TypeError("This don't belong here", self.f)
        r = await self._execute_node_task(task)
        return r
@attr.s
class ActionInThread(Action):
    """Leaf node that executes a plain python `callable` in a Sequencer script.

    Args:
        f (callable): The `callable` (function or method) the node will execute.

    Keyword Args:
        id (str): Unique id. If not provided a unique identifier is assigned.
        name (str): Node name. If not provided a name is assigned.

    Raises:
        TypeError: if `f` is not a python method or function.
    """

    # Executor shared by every ActionInThread node.
    _executor = ContextVarExecutor()

    def _check_function_type(self):
        """Raise ``TypeError`` unless ``f`` is a normal (non-coroutine) callable."""
        # Explicit check instead of ``assert``: asserts are removed under
        # ``python -O`` and would raise AssertionError, not the documented
        # TypeError.
        if not callable(self.f):
            raise TypeError("target function shall be a normal callable")
        if iscoroutine_or_partial(self.f) is True:
            raise TypeError("target function shall be a normal callable")

    async def __call__(self, resume=False):
        """Execute the node action.

        The action is a normal function, so it is submitted to the class
        executor through the running loop and its result is awaited and saved.

        Args:
            resume: Not used by this method.

        Returns:
            The result of the callable, or ``None`` when the node was skipped.

        Raises:
            TypeError: if ``f`` is a coroutine.
        """
        # NOTE(review): ``Sequence`` is not used in this method; the import is
        # kept in case its side effect is relied on -- confirm and remove.
        from .sequence import Sequence

        await self._execute_preamble()
        if self.state is T_STATE.FINISHED:  # node skipped in preamble
            return None

        if iscoroutine_or_partial(self.f):
            # This should never happen
            raise TypeError("coroutines don't belong here")
        loop = asyncio.get_running_loop()
        task = loop.run_in_executor(ActionInThread._executor, self.f)
        r = await self._execute_node_task(task)
        return r
# @attr.s
class StartNode(Action):
    """Starting node.

    Built exactly like :class:`Action`, then flagged as unskippable
    (``can_skip = False``) and hidden (``hide = True``).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.can_skip = False
        self.hide = True
class EndNode(Action):
    """Finishing node.

    Built exactly like :class:`Action`, then flagged as unskippable
    (``can_skip = False``) and hidden (``hide = True``).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.can_skip = False
        self.hide = True
def make_node(s, **kw):
    """Make an :class:`Action` node out of a coroutine.

    An object that is already a node is returned unchanged.

    Args:
        s (coroutine): target coroutine to be passed to :class:`Action` ctor,
            or an existing node.

    Keyword Arguments:
        **kw: Passed straight to :class:`Action` ctor.

    Returns:
        `s` itself when it is already a node, otherwise the :class:`Action`
        node associated to the `s` coroutine.

    Raises:
        TypeError: if `s` is neither a node nor a coroutine.
    """
    if isinstance(s, _BaseNode):
        return s
    if iscoroutine_or_partial(s):
        return Action(s, **kw)
    # Wrap ``s`` in a 1-tuple: a tuple-valued ``s`` would otherwise be unpacked
    # by ``%`` and produce a misleading string-formatting error.
    raise TypeError("Argument %s must be a node or coroutine type" % (s,))