A Deeper Look

The following paragraphs take a deeper look at nodes and their attributes.

Passing Arguments to Actions

The Action and ActionInThread constructors only accept a function object. They offer no way to pass parameters to the function or coroutine that the node is going to execute.

However, functions are first-class citizens in Python, which makes it easy to create them and pass them around dynamically. It is recommended to use partial functions in order to pass parameters to the functions associated with action nodes.

Using partial functions

Partial functions allow you to fix a certain number of arguments of a function and generate a new function on the spot. Please see functools.partial() for the official documentation. The following example illustrates their use.

We recommend using seq.lib.partial() instead of functools.partial(), since the former keeps the documentation of the wrapped function.

partial function example
from seq.lib import partial

def f(a, b, c):
    return a + b + c

g = partial(f, 1, 2)  # creates a new function g()
g(3)     # equivalent to f(1, 2, 3)
g(9)     # equivalent to f(1, 2, 9)
g(1, 2)  # f will complain that too many arguments were passed

The partial function example above shows a new function g() created with partial: when called, g() invokes f(1, 2, ...) with any extra arguments appended.

From the point of view of the Python interpreter, g() is a normal function, meaning you can use g() as many times as you see fit.
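The remark about keeping the documentation of the wrapped function can be checked with the standard library alone. The following is a minimal sketch, not the implementation of seq.lib.partial(): it shows that a plain functools.partial object does not carry the docstring of the wrapped function, and that functools.update_wrapper() can copy it over. The helper name partial_with_doc is hypothetical.

```python
import functools


def f(a, b, c):
    """Return the sum of a, b and c."""
    return a + b + c


def partial_with_doc(func, *args, **kwargs):
    """Hypothetical helper: partial application that keeps func's metadata."""
    p = functools.partial(func, *args, **kwargs)
    # Copy __doc__, __name__ and related attributes from func onto p.
    functools.update_wrapper(p, func)
    return p


plain = functools.partial(f, 1, 2)
kept = partial_with_doc(f, 1, 2)

print(plain(3), kept(3))            # both call f(1, 2, 3)
print(plain.__doc__ == f.__doc__)   # plain carries the docstring of functools.partial
print(kept.__doc__ == f.__doc__)    # kept carries the docstring of f
```

Either object can be handed to anything that expects a callable; only the metadata seen by help() or by documentation tools differs.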

Runtime Flags

It is possible to associate runtime flags with nodes in order to skip or pause them. The easiest way to do this is by using the _pause and _skip shortcuts, as follows:

from seq.lib import _pause, _skip, Sequence

async def a():
    pass

def b():
    pass

node1 = _pause(a)
node2 = _skip(b)

s = Sequence.create(node1, node2, _skip(a))

Another way is to use the set_runtime_flag(node, flag) function, which takes a node and sets the given runtime flag on it. Valid runtime flag values are PAUSE and SKIP:

from seq.lib import RTFLAG, set_runtime_flag
from seq.lib.nodes import Action

node = Action(f)  # f is any function or coroutine
set_runtime_flag(node, RTFLAG.PAUSE)
...
set_runtime_flag(node, RTFLAG.SKIP, False)
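To make the idea of runtime flags concrete, here is a toy model written with the standard library only. Everything in it is hypothetical (ToyFlag, ToyNode, set_toy_flag, run) and it is not how seq.lib implements flags. In particular, treating a third argument of False as "clear the flag" is an assumption inferred from the call shown above.

```python
import enum


class ToyFlag(enum.Flag):
    """Hypothetical stand-in for runtime flags; not the seq.lib RTFLAG type."""
    PAUSE = 1
    SKIP = 2


class ToyNode:
    """Hypothetical node holding a callable and its runtime flags."""

    def __init__(self, func):
        self.func = func
        self.flags = ToyFlag(0)  # no flag set


def set_toy_flag(node, flag, value=True):
    """Set the flag when value is true, clear it otherwise (assumed semantics)."""
    if value:
        node.flags |= flag
    else:
        node.flags &= ~flag


def run(nodes):
    """Run every node whose SKIP flag is not set and collect the results."""
    return [n.func() for n in nodes if ToyFlag.SKIP not in n.flags]


a, b = ToyNode(lambda: "a"), ToyNode(lambda: "b")
set_toy_flag(b, ToyFlag.SKIP)
first = run([a, b])    # b is skipped
set_toy_flag(b, ToyFlag.SKIP, False)
second = run([a, b])   # b runs again
print(first, second)
```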

Summary building DAGs

Here are some more points to note about the basic examples shown before.

Constructor Calling conventions

First of all, except for Action and ActionInThread, do not use the standard class_name() constructor to build nodes, i.e. never use a naked Parallel() to create a Parallel node. The same goes for the other node types.

Instead, use the create() method every container node implements (Loop, Sequence, Parallel).

Important

Use the create() method to instantiate all container classes (Loop, Sequence, Parallel, etc).

Since container nodes hold a variable number of child nodes, their constructors (create() methods) receive the children as positional arguments. Any other attribute (mandatory or not) is passed through keyword arguments. See Node constructor calling convention.

Node constructor calling convention
# Sequence ctor
Sequence.create(child_1, child_2, ..., child_n,
                id="my_unique_id", name="my nice name")

# Parallel ctor
Parallel.create(child_1, child_2, ..., child_n,
                id="my_unique_id", name="my nice name")

# Loop ctor
Loop.create(child_1, child_2, ..., child_n,
            id="my_unique_id", name="my nice name",
            init=init_function,
            condition=condition_function)

Node Attributes

Node attributes are passed as keyword arguments to the create() method. Every node has at least a name and an id attribute. If they are not specified while building the DAG, the engine assigns them pseudo-randomly.
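The calling convention (children as positional arguments, attributes as keywords, generated defaults for id and name) can be sketched with a small stand-in class. ToyContainer is hypothetical and not one of the seq.lib classes. The way the default id is generated here, and reusing it as the default name, are assumptions made for illustration.

```python
import uuid


class ToyContainer:
    """Hypothetical container node; not one of the seq.lib classes."""

    def __init__(self, children, id=None, name=None):
        self.children = list(children)
        # Fall back to a generated token when no id is given (assumed behaviour).
        self.id = id if id is not None else uuid.uuid4().hex[:8]
        # Assumption: an unnamed node reuses its id as its name.
        self.name = name if name is not None else self.id

    @classmethod
    def create(cls, *children, **attrs):
        """Children are positional; every other attribute is a keyword."""
        return cls(children, **attrs)


node = ToyContainer.create("child_1", "child_2",
                           id="my_unique_id", name="my nice name")
auto = ToyContainer.create("child_1")
print(node.id, node.name, len(node.children))
print(auto.id, auto.name)
```

Taking the children through *children is what lets create() accept any number of them without wrapping them in a list at the call site.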

The specific attributes for each node type are detailed in the following table:

Node Class   Attribute   Description
ALL          id          Node id (must be unique)
ALL          name        Node name
Loop         init        Initialization node
Loop         condition   Loop’s condition node

Special Variables

Sequences use contextvars.ContextVar to distribute special values among all tasks and threads that are part of a given Sequencer script.

The contextvars module implements context variables. This concept is similar to thread-local storage (TLS) but, unlike TLS, it also correctly keeps track of values per asynchronous task, e.g. asyncio.Task.

Class      Variable      Description
Sequence   current_seq   Current Sequence name
Sequence   root          The root node
Loop       index         Loop’s running index
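The per-task behaviour of context variables described above can be seen with the standard library alone. In this sketch the variable index is a hypothetical stand-in for a value such as Loop.index. Each asyncio task works on its own copy of the context, so the tasks do not see each other's values.

```python
import asyncio
import contextvars

# Hypothetical variable, playing the role of a per-task value such as Loop.index.
index = contextvars.ContextVar("index", default=-1)


async def worker(value):
    # Each asyncio task runs in a copy of the context, so this set() is private.
    index.set(value)
    await asyncio.sleep(0)  # let the other tasks run in between
    return index.get()


async def main():
    index.set(100)
    # gather() wraps each coroutine in its own task.
    results = await asyncio.gather(worker(1), worker(2), worker(3))
    # The value set in main() is untouched by the workers.
    return results, index.get()


results, outer = asyncio.run(main())
print(results, outer)
```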

Result Handling

Each node has a result attribute where the return value from its associated step is stored.

For Action and ActionInThread, the result is just the return value of the associated function: since each Action is just a simple function or method, its return value is kept in the result attribute of the corresponding node.

All other nodes have an empty result unless it is explicitly set by some step in the sequence. The example Set node result shows a step setting the result of the sequence that contains it. The node that contains a given action is accessed through the Sequence.current_seq context variable, as the example shows.

Clearly, in order to check a node’s result one needs a reference to that node or a way to find it; see Finding nodes.

Note

Both the Parallel and Loop classes inherit from Sequence. Therefore they can access the Sequence.current_seq context variable.

Set node result
from seq.lib.nodes import Parallel, Sequence
from seq.lib.ob import OB
# reuse some sequence ...
from . import test_a as a

async def mystep():
    """Sets current Sequence's result"""
    s = Sequence.current_seq.get()
    s.result = 0

async def test_result():
    tpa = OB.create_sequence(a)
    sc = Parallel.create(tpa, mystep)
    await sc.start()
    assert sc.result == 0

Finding nodes

Unless one has saved a reference to a node, e.g. as a class member or global variable, the only way to find a node in the DAG is through its unique id.

The function seq.lib.nodes.find_node() receives a starting node and the id of the node being looked up as parameters. On success it returns the target node and its parent as the tuple (parent, node).

  1. To look up a node from the DAG’s root (meaning the complete sequence is searched until a hit is found), simply pass the root context variable as the initial node.

  2. To look up a node from the current sequence, use the current_seq context variable as the starting node.

The example Lookup a node illustrates both use cases.

Lookup a node
from seq.lib.nodes import Action, Sequence, find_node
...
async def do_a():
    # find node `id1` (not shown) starting at root and get its result.
    _, node = find_node(Sequence.root.get(), "id1")
    print("the other node result", node.result)
    return node.result + 1  # or something

async def do_b():
    # find do_a's result
    _, node_a = find_node(Sequence.current_seq.get(), "id_do_a")
    return node_a.result + 1  # or do something else

# Given a Sequence s
s = Sequence.create(Action(do_a, id="id_do_a"), do_b)

Since node ids are unique inside a given Sequence, there is no risk of losing an Action’s result because it gets overwritten by some other node. This is not the case for the context dictionary described in Nodes have context.
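The lookup described in this section, a search from a starting node that returns the pair (parent, node), can be pictured with a toy tree. ToyNode and toy_find_node are hypothetical and do not reproduce the library's find_node. Returning (None, None) when nothing is found is an assumption, since the behaviour on failure is not described here.

```python
class ToyNode:
    """Hypothetical tree node with an id, a result and optional children."""

    def __init__(self, id, children=(), result=None):
        self.id = id
        self.children = list(children)
        self.result = result


def toy_find_node(start, node_id, parent=None):
    """Depth-first search from start; returns (parent, node) or (None, None)."""
    if start.id == node_id:
        return parent, start
    for child in start.children:
        found_parent, found = toy_find_node(child, node_id, start)
        if found is not None:
            return found_parent, found
    return None, None


root = ToyNode("root", [
    ToyNode("seq1", [ToyNode("id1", result=41), ToyNode("id_do_a")]),
    ToyNode("id2"),
])

# Search the whole tree, starting at the root.
parent, node = toy_find_node(root, "id1")
print(parent.id, node.result + 1)

# Search only inside one sub-sequence.
_, sibling = toy_find_node(root.children[0], "id_do_a")
print(sibling.id)
```

Starting the search at a sub-sequence instead of the root limits it to that part of the tree, which mirrors the two use cases listed above.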

Nodes have context

Besides the result attribute, which can be inspected in order to pass information between Sequencer scripts, there is the context dictionary, which can be freely accessed from all nodes being executed.

The context dictionary is a property shared among all Sequence nodes (including Loop and Parallel). Action and ActionInThread nodes can gain access to it through the Sequence.get_context() static method. Please see the code excerpt Node context example.

The functions do_a() and do_b() must access the context through the Sequence.get_context() static method. The object tpl, being a Sequence instance, can access its Sequence.context attribute directly.

Node context example
from seq.lib.nodes import ActionInThread, Sequence

async def do_a():
    ctx = Sequence.get_context()
    ctx["do_a"] = 1

def do_b():
    ctx = Sequence.get_context()
    ctx["do_b"] = 1

async def test_context():
    # creates the sequence
    tpl = Sequence.create(do_a, ActionInThread(do_b))
    # Sequence.root.set(tpl)
    await tpl.start()
    ctx = tpl.context

    assert ctx["do_a"] == 1
    assert ctx["do_b"] == 1

Warning

Notice that there are no hard rules about what can go into the context dictionary or under what key. It might be simpler to use than setting results on nodes, but there is no guarantee that a given key will not be overwritten in some other part of the running script just because of a name clash.
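The name clash mentioned in the warning is easy to reproduce with a plain dictionary. In this sketch ctx is a hypothetical stand-in for the shared context and the step functions are invented for illustration. Prefixing keys with the step name is only one possible convention, not something the library prescribes.

```python
# Hypothetical shared context, standing in for the Sequence context dictionary.
ctx = {}


def step_one():
    ctx["status"] = "step_one done"


def step_two():
    # Same key by accident: silently replaces the value written by step_one().
    ctx["status"] = "step_two done"


def step_two_prefixed():
    # Prefixing the key with the step name avoids the clash.
    ctx["step_two.status"] = "step_two done"


step_one()
step_two()
clashed = dict(ctx)    # only one "status" entry survives

ctx.clear()
step_one()
step_two_prefixed()
separate = dict(ctx)   # both values are kept
print(clashed)
print(separate)
```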

Node Types

The sequencer node types live in the module seq.lib.nodes:

Action

The simple action node. It contains a Python function or method to be executed.

# Creates an Action node with some properties.
node_a = Action(t.a, name="node")

Sequence

Executes nodes one after the other.

[Figure: graph seq.samples.a: start -> a -> b -> end]

Parallel

Executes nodes in parallel, finishes when all its nodes are done.

[Figure: graph seq.samples.b: start -> a and b in parallel -> end]

Loop

A Loop consists of a condition and a block of nodes to execute while said condition remains True. It also accepts an initialization function that is called only once, before the first loop iteration.

[Figure: graph seq.samples.loop1: start -> Loop init -> condition; while the condition holds, the block a -> b -> c runs and returns to condition; on F (false) the loop goes to end]

"""
Implements a loop.
The condition checks Loop's index < 3.
"""
import asyncio
import logging
import random
from seq.lib.nodes import Loop

logger = logging.getLogger(__name__)


class Tpl:  # Mandatory class name
    async def a(self):
        """sleeps up to 1 second"""
        t = random.random()  # 0..1
        await asyncio.sleep(t)
        logger.info(".. done A: %d", Loop.index.get())

    async def b(self):
        """sleeps up to 1 second"""
        t = random.random()  # 0..1
        await asyncio.sleep(t)
        logger.info(" .. done B: %d", Loop.index.get())

    async def c(self):
        pass

    async def check_condition(self):
        """
        The magic of contextvars in asyncio
        Loop.index is local to each asyncio task
        """
        logger.info("Loop index: %d", Loop.index.get())
        return Loop.index.get() < 3

    @staticmethod
    def create(**kw):
        t = Tpl()
        l = Loop.create(t.a, t.b, t.c,
                        condition=t.check_condition, **kw)
        return l
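The control flow of a Loop (initialization once, then the condition checked before every pass through the block) can be mimicked in plain asyncio. This is a toy model under stated assumptions, not the Loop class itself: run_loop and index are hypothetical, and starting the index at 0 and incrementing it after each pass is assumed rather than taken from the library.

```python
import asyncio
import contextvars

# Hypothetical stand-in for Loop.index.
index = contextvars.ContextVar("index", default=0)


async def run_loop(block, condition, init=None):
    """Toy loop: optional init once, then run block while condition() is true."""
    if init is not None:
        await init()
    index.set(0)
    while await condition():
        for step in block:
            await step()
        index.set(index.get() + 1)


async def main():
    trace = []

    async def init():
        trace.append("init")

    async def a():
        trace.append(f"a{index.get()}")

    async def b():
        trace.append(f"b{index.get()}")

    async def condition():
        return index.get() < 3

    await run_loop([a, b], condition, init=init)
    return trace


trace = asyncio.run(main())
print(trace)
```

With the condition index < 3, the block runs three times in this model, for index values 0, 1 and 2.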