A Deeper Look

The following paragraphs take a deeper look at nodes and their attributes.

Passing Arguments to Actions

The Action and ActionInThread constructors only accept a function object; there is no way to pass parameters to the function or coroutine that the node will execute.

However, functions are first-class citizens in Python, which makes it easy to create them dynamically and pass them around. Python provides at least two strategies for passing parameters to the functions associated with action nodes:

  1. partial functions

  2. lambda functions

The following example illustrates the use of partial and lambda functions to pass parameters to action nodes. See Using partial functions and Using lambda functions for a brief explanation of their usage, and refer to the official Python documentation for details.

""" Tutorial - 04

Shows how to pass arguments to functions in Actions
"""
from functools import partial
import asyncio
from seqlib.nodes import Action, ActionInThread, Parallel


class Tpl:
    def do_a(self, msg):
        """I don't need to be a coroutine"""
        print(msg)

    async def do_b(self, value):
        await asyncio.sleep(value)

    @staticmethod
    def create(*args, **kw):
        t = Tpl()

        # create new functions that call `t` methods
        # with the given parameters
        f = partial(t.do_a, "my message")
        g = lambda v: t.do_b(99)

        return Parallel.create(
            ActionInThread(f, name="one"),
            Action(g, name="two"),
            name="Tut_02",
        )
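As a rough illustration of why both approaches work, the sketch below uses only the standard library. The helper run_step() is hypothetical and is not part of seqlib; it assumes that a node calls its function without arguments and awaits the result when it is awaitable.

```python
import asyncio
import inspect
from functools import partial


def greet(msg):
    """Plain function that needs one argument."""
    return "hello " + msg


async def wait_and_return(value):
    """Coroutine function that needs one argument."""
    await asyncio.sleep(0)
    return value


async def run_step(func):
    """Hypothetical runner (not seqlib): call func() and await if needed."""
    result = func()
    if inspect.isawaitable(result):
        result = await result
    return result


async def main():
    f = partial(greet, "world")      # argument fixed by partial
    g = lambda: wait_and_return(99)  # argument fixed by lambda
    return await run_step(f), await run_step(g)


results = asyncio.run(main())
print(results)
```

In both cases the caller only sees a callable that needs no arguments; the parameters were bound when the callable was created.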

Using partial functions

Simply put, partial functions allow us to fix a certain number of arguments of a function and generate a new function on the spot. See functools.partial() in the official documentation for details. The following example illustrates its use.

partial function example
from functools import partial

def f(a, b, c):
    return a + b + c

g = partial(f, 1, 2)  # creates new function g() with a=1 and b=2 fixed
g(3)  # equivalent to f(1, 2, 3)

# g(1, 2) would raise TypeError: f() would receive too many arguments

The partial function example shows how a new function g() is created with partial so that g(), when called, invokes f(1, 2) plus any extra arguments given.
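As a small complement, the following sketch shows that partial can also fix arguments by keyword, and that the resulting object remembers the function and arguments it wraps. It relies only on functools.partial from the standard library.

```python
from functools import partial


def f(a, b, c):
    return a + b + c


g = partial(f, 1, 2)   # fixes a=1 and b=2 by position
h = partial(f, c=10)   # fixes c by keyword

print(g(3))            # calls f(1, 2, 3)
print(h(1, 2))         # calls f(1, 2, c=10)

# A partial object keeps a reference to what it wraps.
print(g.func is f, g.args, h.keywords)
```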

Using lambda functions

Lambda functions are another tool for creating functions on the spot. A lambda function, or small anonymous function, is a self-contained block of functionality that can be passed around and used in your code.

A lambda function can take any number of arguments, but can only have one expression.

lambda function example
x = lambda a : a + 10
print(x(5))
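One point worth keeping in mind when lambdas are created in a loop, for instance to build several action functions at once: a lambda looks up the variables it uses when it is called, not when it is created. The sketch below (the report() function is only an illustration) contrasts this with binding the value as a default argument or with partial.

```python
from functools import partial


def report(i):
    return "step %d" % i


# Late binding: each lambda reads `i` when it is called,
# so all of them see the last value of the loop variable.
late = [lambda: report(i) for i in range(3)]

# Binding the current value as a default argument, or using partial,
# captures the value at creation time.
bound = [lambda i=i: report(i) for i in range(3)]
fixed = [partial(report, i) for i in range(3)]

late_results = [fn() for fn in late]
bound_results = [fn() for fn in bound]
fixed_results = [fn() for fn in fixed]
print(late_results, bound_results, fixed_results)
```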

Summary building DAGs

Some more points to note from the basic examples shown before.

Constructor Calling conventions

First of all, except for Action and ActionInThread, do not use the standard class_name() constructor to build nodes, i.e. never use a naked Parallel() to create a Parallel node; the same applies to the other node types.

Instead, use the create() method every container node implements (Loop, Sequence, Parallel).

Since container nodes hold a variable number of child nodes, their constructors (the create() method) receive the children as positional arguments. Any other attribute (mandatory or not) is passed through keyword arguments. See Node constructor calling convention.

Node constructor calling convention
# Sequence ctor
Sequence.create(child_1, child_2, ..., child_n,
                id="my_unique_id", name="my nice name")

# Parallel ctor
Parallel.create(child_1, child_2, ..., child_n,
                id="my_unique_id", name="my nice name")

# Loop ctor
Loop.create(child_1, child_2, ..., child_n,
            id="my_unique_id", name="my nice name",
            init=init_function,
            condition=condition_function)
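The calling convention above maps directly onto Python's *args and **kwargs. The following sketch uses a hypothetical MiniContainer class (a stand-in, not a seqlib class) to show how children arrive as positional arguments and attributes as keyword arguments, and how a list of children built at run time can be unpacked into the call.

```python
class MiniContainer:
    """Hypothetical stand-in for a container node (not the seqlib class)."""

    def __init__(self, children, name=None, id=None):
        self.children = list(children)
        self.name = name
        self.id = id

    @classmethod
    def create(cls, *children, **attrs):
        # children arrive as positional arguments,
        # every other attribute as a keyword argument
        return cls(children, **attrs)


def step_1():
    return 1


def step_2():
    return 2


# children given one by one
node = MiniContainer.create(step_1, step_2, name="my nice name")

# children collected in a list and unpacked with *
steps = [step_1, step_2]
same = MiniContainer.create(*steps, id="my_unique_id")
```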

Node Attributes

Node attributes are passed as keyword arguments to its create() method. Every node has at least a name and an id attribute. If they are not specified while building the DAG, the engine assigns them pseudo-randomly.

The specific attributes for each node type are detailed in the following table:

Node Class   Attribute   Description
----------   ---------   ------------------------
ALL          id          Node id (must be unique)
ALL          name        Node name
Loop         init        Initialization node
Loop         condition   Loop’s condition node

Special Variables

The sequencer uses contextvars.ContextVar to distribute special values to all tasks and threads that are part of a given Sequencer script.

The contextvars module implements context variables. This concept is similar to thread-local storage (TLS) but, unlike TLS, it also correctly keeps track of values per asynchronous task, e.g. asyncio.Task.
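The per-task behaviour can be seen with the standard library alone. In the sketch below, the variable name index is only an illustration and is not seqlib's Loop.index; each task started by asyncio.gather() runs in its own copy of the context, so a value set in one task is not visible in the others or in the caller.

```python
import asyncio
from contextvars import ContextVar

index = ContextVar("index", default=-1)


async def worker(value):
    index.set(value)         # visible only inside this task
    await asyncio.sleep(0)   # let the other tasks run
    return index.get()


async def main():
    results = await asyncio.gather(worker(1), worker(2), worker(3))
    # the caller's own value is untouched by the workers
    return results, index.get()


results, outer = asyncio.run(main())
print(results, outer)
```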

Class      Variable      Description
--------   -----------   ---------------------
Sequence   current_tpl   Current Sequence name
Sequence   root          The root node
Loop       index         Loop’s running index
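Context variables can also reach code running in a worker thread. The sketch below does not show how seqlib implements ActionInThread; it only demonstrates, with asyncio.to_thread() (Python 3.9 or later) and an illustrative variable current_name, that the current context is propagated to the thread.

```python
import asyncio
from contextvars import ContextVar

current_name = ContextVar("current_name", default=None)


def blocking_step():
    """Plain function executed in a worker thread."""
    return current_name.get()


async def main():
    current_name.set("my sequence")
    # to_thread() runs the function in a thread with a copy
    # of the current context
    return await asyncio.to_thread(blocking_step)


seen = asyncio.run(main())
print(seen)
```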

Result Handling

Each node has a result attribute where the return value from its associated step is stored.

For Action and ActionInThread, the result is simply the return value of the associated function. Since each Action is just a simple function or method, its result is kept in the result attribute of the corresponding node.

All other nodes have an empty result unless it is explicitly set by some step in the sequence. The example Set node result shows a step setting the result of the sequence that contains it. The node that contains a given action is accessed through the Sequence.current_tpl context variable, as the example shows.

Clearly, in order to check a node’s result one needs a reference to that node or a way to find it; see Finding nodes.

Note

Both the Parallel and Loop classes inherit from Sequence. Therefore they can access the Sequence.current_tpl context variable.

Set node result
from seqlib.nodes import Parallel, Sequence
from seqlib.ob import OB
# reuse some sequence ...
from . import test_a as a

async def mystep():
    """Sets current Sequence's result"""
    s = Sequence.current_tpl.get()
    s.result = 0

async def test_result():
    tpa = OB.create_sequence(a)
    sc = Parallel.create(tpa, mystep)
    await sc.start()
    assert sc.result == 0

Finding nodes

Unless one has saved a reference to a node, e.g. as a class member or global variable, the only way to find a node in the DAG is through its unique id.

The function seqlib.nodes.find_node() receives as parameters a starting node and the id of the node to look up. On success it returns the target node and its parent as the tuple (parent, node).

  1. To look up a node from the DAG’s root (that is, to search the complete sequence until a hit is found), simply pass the value of the root context variable as the initial node.

  2. To look up a node from the current sequence, use the value of the current_tpl context variable as the starting node. Lookup a node illustrates both use cases.

Lookup a node
from seqlib.nodes import Action, Sequence, find_node
...
async def do_a():
    # find node `id1` (not shown) starting at root and get its result.
    _, node = find_node(Sequence.root.get(), "id1")
    print("the other node result", node.result)
    return node.result + 1  # or something

async def do_b():
    # find do_a's result
    _, node_a = find_node(Sequence.current_tpl.get(), "id_do_a")
    return node_a.result + 1  # or do something else

# Given a Sequence s
s = Sequence.create(Action(do_a, id="id_do_a"), do_b)

Since node ids are unique inside a given Sequence, there is no risk of losing an Action’s result because it gets overwritten by some other node, as opposed to the shared context described in Nodes have context.
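To make the idea of an id-based lookup concrete, here is a minimal sketch of a depth-first search that returns (parent, node). It is not the seqlib implementation: the MiniNode class, the find() function and the choice to return (None, None) when nothing is found are all assumptions made for illustration.

```python
class MiniNode:
    """Hypothetical node with an id and optional children."""

    def __init__(self, id, children=()):
        self.id = id
        self.children = list(children)


def find(start, target_id, parent=None):
    """Depth-first search; returns (parent, node) or (None, None)."""
    if start.id == target_id:
        return parent, start
    for child in start.children:
        found_parent, found = find(child, target_id, start)
        if found is not None:
            return found_parent, found
    return None, None


# A small tree: root -> (a, inner -> (b))
b = MiniNode("b")
inner = MiniNode("inner", [b])
a = MiniNode("a")
root = MiniNode("root", [a, inner])

parent, node = find(root, "b")
print(parent.id, node.id)
```

Starting the search at inner instead of root would restrict the lookup to that subtree, which mirrors the difference between searching from the root and searching from the current sequence.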

Nodes have context

Besides the result attribute, which can be inspected in order to pass information between Sequencer scripts, there is the context dictionary, which can be freely accessed by all nodes being executed.

The context dictionary is a property shared among all Sequence nodes (including Loop and Parallel). Action and ActionInThread nodes can gain access to it through the Sequence.get_context() static method. Please see the following code excerpt, Node context example.

The functions do_a() and do_b() must access the context through the Sequence.get_context() static method. The object tpl, being a Sequence instance, can access its Sequence.context attribute directly.

Node context example
from seqlib.nodes import ActionInThread, Sequence

async def do_a():
    ctx = Sequence.get_context()
    ctx["do_a"] = 1

def do_b():
    ctx = Sequence.get_context()
    ctx["do_b"] = 1

async def test_context():
    # creates the sequence
    tpl = Sequence.create(do_a, ActionInThread(do_b))
    # Sequence.root.set(tpl)
    await tpl.start()

    assert tpl.context["do_a"] == 1
    assert tpl.context["do_b"] == 1

Warning

Note that there are no hard rules about what can go into the context dictionary or under which key. It might be simpler to use than setting results on nodes, but there is no guarantee that a given key will not be overwritten in some other part of the running script because of a name clash.
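One possible way to reduce the risk of name clashes is to prefix every key with the name of its owner. The sketch below uses a plain dict as a stand-in for the context dictionary; the ns_key() helper is hypothetical and not part of seqlib.

```python
# A plain dict stands in for the shared context dictionary.
ctx = {}


def ns_key(owner, key):
    """Hypothetical helper: build a key prefixed with its owner's name."""
    return "%s.%s" % (owner, key)


# Two unrelated steps using the same bare key: the second write wins.
ctx["status"] = "set by step A"
ctx["status"] = "set by step B"

# Prefixed keys keep both values apart.
ctx[ns_key("step_a", "status")] = "set by step A"
ctx[ns_key("step_b", "status")] = "set by step B"

print(ctx)
```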

Node Types

The sequencer node types live in the module seqlib.nodes:

Action

The simple action node. It contains a Python function or method to be executed.

# Creates a node with some properties.
node_a = Action(t.a, name="node")

Sequence

Executes nodes one after the other.

[Figure: graph seq.samples.tut.tut01: start -> A -> do_b -> end]

Parallel

Executes nodes in parallel and finishes when all its nodes are done.

[Figure: graph seq.samples.tut.tut02: start -> (one, two in parallel) -> end]
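The difference between running steps one after the other and running them in parallel can be sketched with plain asyncio. This is an analogy, not the seqlib implementation; the step() coroutine and the delays are made up for the example.

```python
import asyncio


async def step(name, delay, log):
    """Sleeps for `delay` seconds, then records that it finished."""
    await asyncio.sleep(delay)
    log.append(name)
    return name


async def run_sequence(log):
    # one after the other: "two" starts only after "one" is done
    return [await step("one", 0.05, log), await step("two", 0.01, log)]


async def run_parallel(log):
    # both start together; gather() returns when all are done
    return await asyncio.gather(step("one", 0.05, log),
                                step("two", 0.01, log))


seq_log, par_log = [], []
seq_results = asyncio.run(run_sequence(seq_log))
par_results = asyncio.run(run_parallel(par_log))
print(seq_log, par_log)
```

In the parallel case the shorter step finishes first, yet the results are still returned in the order in which the steps were given.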

Loop

A Loop consists of a condition and a block of nodes to execute while the condition remains True. It also accepts an initialization function that is called only once, before the first loop iteration.

[Figure: graph seq.samples.tut.tut03: start -> Loop init -> condition; condition -(F)-> end; condition -> one -> two -> three -> back to condition]

"""
Implements a simple loop.
The condition checks Loop's index < 3.
"""
import asyncio
import logging
import random
from seqlib.nodes import Action, Sequence, Parallel, Loop

logger = logging.getLogger(__name__)

class Tpl(Sequence):
    async def a(self):
        """sleeps up to 1 second"""
        t = random.random()  # 0..1
        # await asyncio.sleep(t)
        print(" .. done A", Loop.index.get())
        return "A%d" % (Loop.index.get())

    async def check_condition(self):
        """
        The magic of contextvars in asyncio
        Loop.index is local to each asyncio task
        """
        return Loop.index.get() < 3

    @staticmethod
    def create(**kw):
        tpl = Tpl()
        a = Action(tpl.a, id="node_a")
        l1 = Loop.create(a, condition=tpl.check_condition, **kw)
        return l1
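The semantics described for Loop can be approximated in a few lines of plain asyncio. The run_loop() function below is a hypothetical sketch, not the seqlib implementation; it assumes that the index starts at 0 and is incremented after every iteration of the block.

```python
import asyncio
from contextvars import ContextVar

index = ContextVar("index", default=0)


async def run_loop(block, condition, init=None):
    """Hypothetical loop runner: init once, repeat while condition holds."""
    if init is not None:
        await init()
    index.set(0)
    results = []
    while await condition():
        for step in block:
            results.append(await step())
        index.set(index.get() + 1)
    return results


async def a():
    return "A%d" % index.get()


async def check_condition():
    return index.get() < 3


results = asyncio.run(run_loop([a], check_condition))
print(results)
```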