Source code for seqlib.nodes.loop

# loop.py
"""Implements Loops for the sequencer.

A node Loop is composed of three parts.

    * *Initialization* (``optional``) initializes the variables used
      by the loop's body and its exit condition; it is executed
      once on entry to the loop.

    * *condition* (``callable``) Evaluates the loop's condition.
      It shall return *False* to end the Loop; otherwise it
      shall return *True*.

    * *block* (``Sequence``) The loop's body. It is the sequence of
      nodes executed repeatedly until the Loop's condition
      produces a False value.

"""
import logging
import contextvars as cv
import attr
import networkx as nx
from .action import make_node
from .sequence import Sequence
from .state import T_STATE

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


@attr.s
class Loop(Sequence):
    """Loop node definition.

    Use the :meth:`create` method to properly build a :class:`Loop` node.
    Since it inherits from :class:`Sequence` it has access to the same
    context variables.

    ============  =======================================================
    Context Variables
    ---------------------------------------------------------------------
    Name          Desc
    ============  =======================================================
    current_tpl   The parent of the current node (from :class:`Sequence`)
    root          Top level DAG's root (from :class:`Sequence`)
    index         The Loop's current index (starts at 0)
    ============  =======================================================

    Keyword Args:
        id (Optional str): Node id.
        name (Optional str): Node name. Defaults to ``"Loop"``.
        init (Optional): Initialization node. When omitted, a no-op
            coroutine is used. The value is wrapped with ``make_node``.
        condition (callable): A Python callable that returns a boolean
            value. When omitted, a coroutine that always returns
            ``False`` is used. The value is wrapped with ``make_node``.
        block_args (list): Nodes or coroutines forming the loop's body.
            When non-empty they are wrapped into a :class:`Sequence`
            stored in ``block``; otherwise ``block`` stays ``None``.
    """

    # my attributes
    block_args = attr.ib(kw_only=True, default=attr.Factory(list), repr=False)
    condition = attr.ib(kw_only=True, default=None, repr=False)
    init = attr.ib(kw_only=True, default=None, repr=False)
    # Not an init argument: built from ``block_args`` by ``set_block``.
    block = attr.ib(init=False, default=None, repr=False)
    # Loop's contextvar
    index = cv.ContextVar("index", default=0)

    def __attrs_post_init__(self):
        """Finish construction: name, body sequence, condition and init."""
        if self.name is None:
            self.name = "Loop"
        super().__attrs_post_init__()
        self.set_block(*self.block_args)
        logger.debug("LOOP block: %s", self.block)
        # Fall back to the built-in defaults, then wrap whatever we have
        # as nodes.
        if self.condition is None:
            self.condition = self._condition
        self.condition = make_node(self.condition)
        if self.init is None:
            self.init = self._init_loop
        self.init = make_node(self.init)

    async def end_step(self):
        """Standard Loop's end step.

        Marks the loop as ``T_STATE.FINISHED`` and sets ``in_error`` when
        any node registered in the loop's graph is in error.
        """
        logger.info("this is the <end> of the sequence: %s", self.name)
        logger.debug("collect states")
        self.state = T_STATE.FINISHED
        G = self.graph
        # Built as a full list (not a generator) so every node's
        # ``in_error`` is read, exactly as before.
        states = [
            G.nodes[key]["node"].in_error for key in nx.topological_sort(G)
        ]
        # grab node's state and check for ERROR
        if any(states):
            self.in_error = True

    def make_sequence(self):
        """Creates the Loop execution graph.

        Registers the start, end and condition nodes and, when the loop
        has a body, the body sequence. No edges are added here.
        """
        G = self.graph
        st = self.start_node
        end = self.end_node
        G.add_node(st.id, node=st)
        G.add_node(end.id, node=end)
        # G.add_edge(st.id, self.init.id)
        # G.add_node(self.init.id, node=self.init, label="Init")
        G.add_node(self.condition.id, node=self.condition, label="Cond")
        if self.block:
            G.add_node(self.block.id, node=self.block, label="Loop body")
        # NOTE(review): the edges below were left disabled in the original;
        # presumably the LoopTask drives the execution order -- confirm.
        # G.add_edge(st.id, self.condition.id)
        # G.add_edge(self.condition.id, self.block.id)
        # G.add_edge(self.init.id, self.condition.id)
        # G.add_edge(self.condition.id, end.id)

    def set_block(self, *args):
        """Assigns the sequence to the loop's block.

        Args:
            *args: Nodes or coroutines comprising the body. When empty,
                ``block`` is left untouched.
        """
        if args:
            self.block = Sequence.create(
                *args, name="block", id="block_" + self.id
            )
            self.block.make_sequence()

    def make_task(self, node, input_list, resume):
        """Returns the ``LoopTask`` that executes this loop."""
        # Imported here rather than at module level, as in the original
        # (presumably to avoid a circular import -- confirm).
        from ..seqtask import LoopTask

        return LoopTask(self.id, node, input_list, resume=resume)

    async def _condition(self):
        """Default condition: always ``False``."""
        return False

    async def _init_loop(self):
        """Default initialization: does nothing."""
        pass

    def nodes(self):
        """Returns the ids of the loop's direct children.

        Returns:
            list: The condition node id, followed by the body sequence id
            when the loop has a body.
        """
        node_ids = [self.condition.id]
        # A loop created without body nodes has ``block`` set to None.
        if self.block is not None:
            node_ids.append(self.block.id)
        return node_ids

    def get_node(self, node_id):
        """Looks up a node belonging to this loop.

        Args:
            node_id: Id of the condition node, of the body sequence, or
                of a node looked up through the body sequence.

        Returns:
            The matching node.

        Raises:
            KeyError: The loop has no body and ``node_id`` is not the
                condition node's id.
        """
        if node_id == self.condition.id:
            return self.condition
        if self.block is None:
            # Nothing else to search; fail explicitly instead of with an
            # AttributeError on None.
            raise KeyError(node_id)
        if node_id == self.block.id:
            return self.block
        return self.block.get_node(node_id)

    @staticmethod
    def create(*args, **kw):
        """Creates a :class:`Loop` node

        Args:
            *args: Variable length list of nodes or coroutines that
                comprises the Loop's body.

        Keyword Args:
            id: Node id
            name: node name
            init (node): initialization node :class:`Action` or
                :class:`ActionInThread`.
            condition (node): condition node :class:`Action` or
                :class:`ActionInThread`.

        Returns:
            A new :class:`Loop` object

        Example:
            Creating a loop.

            .. code-block:: python

                def eval_condition():
                    return False

                class Tpl:
                    def initialize(self, context):
                        # performs some initialization
                        pass

                    async def a(self):
                        pass

                    async def b(self):
                        pass

                    @staticmethod
                    def create():
                        t = Tpl()
                        return Loop.create(
                            t.a, t.b,
                            condition=eval_condition,
                            init=t.initialize,
                        )
        """
        return Loop(block_args=args, **kw)