A Deeper Look¶
A deeper look to nodes and its attributes is given in the following paragraphs.
Passing Arguments to Actions¶
Action
and ActionInThread
constructors only admits a function object, no space to pass parameters to the function or coroutine that is going to be executed by the node.
However, functions are first class citizens in python, this makes easy to create them and pass around dynamically. Python provides, at least, two strategies to pass parameters to the functions associated to the action nodes.
partial functions
lambda functions
The following example illustrates the use of partial and lambda functions to pass parameters to action nodes. See Using partial functions and Using lambda functions for a brief explanation on its usage and refer to the official Python documentation about them.
""" Tutorial - 04
Shows how to pass arguments to functions in Actions
"""
from functools import partial
import asyncio
from seqlib.nodes import Action, ActionInThread, Parallel
class Tpl:
def do_a(self, msg):
"""I don't need to be a coroutine"""
print(msg)
async def do_b(self, value):
await asyncio.sleep(value)
@staticmethod
def create(*args, **kw):
t = Tpl()
# create new functions that calls `t` methods
# with given parameters
f = partial(t.do_a, "my message")
g = lambda v: t.do_b(99)
return Parallel.create(
ActionInThread(f, name="one"),
Action(g, name="two"),
name="Tut_02",
)
Using partial functions¶
In simple, partial functions allow us to fix a certain number of arguments of a function and generate a new function, on the spot. Please see functools.partial()
for the official documentation. In any case the following example illustrates its use.
from functools import partial
def f(a,b,c):
return a+b+c
g = f(1,2) # creates new function g()
g(3) # Equivalent to f(1,2,3)
g(1,2) # f will complain too many arguments were passed
The example partial function example shows a new partial function g() is created using partial in order to specify that g() when, called will invoke f(1,2) plus any extra argument given.
Using lambda functions¶
Another tool to create functions on the spot are lambda functions. A lambda function , or a Small Anonymous Function, is a self-contained block of functionality that can be passed around and used in your code.
A lambda function can take any number of arguments, but can only have one expression.
x = lambda a : a + 10
print(x(5))
Summary building DAGs¶
Some more points to note from the basic examples shown before.
Constructor Calling conventions¶
First of all, except for Action
and ActionInThread
do not use the standard class_name() constructor to build nodes. i.e. never use naked Parallel() to create a Parallel
node, same for the other node types.
Instead, use the create() method every container node implements (Loop, Sequence, Parallel).
Since they hold a variable number of children nodes, their constructors (create() method) receives its children as positional arguments. Any other attribute (mandatory or not) is passed trough keyword arguments. See Node constructor calling convention
# Sequence ctor
Sequence.create(child_1, child_2, ..., child_n,
id="my_unique_id", name="my nice name")
# Parallel ctor
Parallel.create(child_1, child_2, ..., child_n,
id="my_unique_id", name="my nice name")
# Loop ctor
Loop.create(child_1, child_2, ..., child_n,
id="my_unique_id", name="my nice name",
init = init_function,
condition= condition_function
)
Node Attributes¶
Node attributes are passed as keyword arguments iin ts create method. Every node has, at least, a name and id attributes. If they are not specified while building the DAG they are assigned by the engine pseudo–randomly.
The specific attributes for each node type are detailed in the following table:
Node Class |
Attribute |
Description |
---|---|---|
ALL |
id |
Node id (must be unique) |
ALL |
name |
Node name |
|
init |
Initialization node |
|
condition |
Loop’s condition node |
Special Variables¶
Sequences uses contextvars.ContextVar
to distribute special values
around all tasks and threads that are part of a given Sequencer script.
The ContextVar module implements context variables. This concept is similar to thread-local storage (TLS), but, unlike TLS, it also allows correctly keeping track of values per asynchronous task, e.g. asyncio.Task.
Class |
Variable |
Description |
---|---|---|
Sequence |
current_tpl |
Current Sequence name |
Sequence |
root |
The root node |
Loop |
index |
Loop’s running index |
Result Handling¶
Each node has a result attribute where the return value from its associated step is stored.
For Action
and ActionInThread
, the result is just the return value from its
associated function. Since each Action is just a simple function or method, their result
is kept in its corresponding node’s attribute result.
All other nodes will have an empty result unless it is explicitely set by some step, in the sequence. Example Set node result shows a step setting the result of the sequence that contains it. The node that contains a given action is accessed trough the Sequence.current_tpl context variable as the example shows.
It is clear that in order to check for a node’s result one needs to have a handler to that node or a way to find it, see Finding nodes.
Note
Both Parallel
and Loop
classes inherit from Sequence
. Therefore they can access Sequence.current_tpl context variable.
from seqlib.ob import OB
# reuse some sequence ...
from . import test_a as a
async def mystep():
"""Sets current Sequence's result"""
s = Sequence.current_tpl.get()
s.result = 0
async def test_result():
tpa = OB.create_sequence(a)
sc = Parallel.create(tpa, mystep)
await sc.start()
assert sc.result == 0
Finding nodes¶
Unless one has saved a reference to a node, e.g. as a class member or global variable. The only way to find a node in the DAG is trough its unique id.
The func:seqlib.nodes.find_node receives as a parameter a starting node and the id of the node being looked up. On success it returns a tuple the target node and its parent as the tuple (parent, node).
In order to lookup a node from the DAG’s root (meaning look around the complete sequence until a hit is found), simply pass the root context variable as the initial node.
In order to lookup a node from the current sequence use the current_tpl context variable as the starting node. Lookup a node illustrates both use cases.
from seqlib.nodes import Sequence, find_node
...
async def do_a():
# find node `id1` (not shown) starting at root and get its result.
_, node = find_node(Sequence.root.get(), "id1")
print("the other node result", node.result)
return node.result + 1; # or something
async def do_b():
# find do_a's result
_, node_a = find_node(Sequence.current_tpl.get(), "id_do_a")
return node_a.result +1; # or do something else
# Given a Sequence s
s = Sequence.create(Action(do_a, id="id_do_a"), do_b)
Since node_ids are unique inside a given Sequence there is no risk of loosing an Action’s result because it gets overwritten by some other node. As opposed to Nodes have context.
Nodes have context¶
Besides the result attribute that can be inspected in order to pass information between Sequencer scripts. There is the context dictionary which can be freely accesses throughout all nodes being executed.
The context dictionary is a property shared among all Sequence
nodes (includes Loop and Parallel). Action
and ActionInThread
nodes can gain access to it trough Sequence.get_context()
static method. Plese see the following code excerpts Node context example.
The methods do_a() and do_b() must access the context trough the Sequence.get_context() static method. The object tpl, being a Sequence
instance can access its Sequence.context
attribute.
async def do_a():
ctx = Sequence.get_context()
ctx["do_a"] = 1
def do_b():
ctx = Sequence.get_context()
ctx["do_b"] = 1
# creates the sequence
tpl = Sequence.create(do_a, ActionInThread(do_b))
# Sequence.root.set(tpl)
await tpl.start()
ctx = tpl.context
assert tpl.context["do_a"] == 1
assert tpl.context["do_b"] == 1
Warning
Notice there are no hard rules about what can go into the context dictionary and under what key. It might be simpler to use than setting results on nodes but there is no guarantee a given key might be overwritten in some other part of the running script just because of a name clash.
Node Types¶
The sequencer node types lives in the module seq.nodes
:
Action¶
The simple action node. It contains a python function or method to be executed.
# Creates a node with some properties.
node_a = Node(t.a, name="node")
Sequence¶
Executes nodes one after the other.
Parallel¶
Executes nodes in parallel, finishes when all its nodes are done.
Loop¶
A Loop consists of a condition and a block of nodes to execute while said conditions remains True. It also accepts an initialization function to be called, only once, before the first loop iteration.
"""
Implements a simple loop.
The condition checks Loop's index < 3.
"""
import asyncio
import logging
import random
from seqlib.nodes import Action, Sequence, Parallel, Loop
logger = logging.getLogger(__name__)
class Tpl(Sequence):
async def a(self):
"""sleeps up to 1 second"""
t = random.random() # 0..1
# await asyncio.sleep(t)
print(" .. done A", Loop.index.get())
return "A%d" % (Loop.index.get())
async def check_condition(self):
"""
The magic of contextvars in asyncio
Loop.index is local to each asyncio task
"""
return Loop.index.get() < 3
@staticmethod
def create(**kw):
tpl = Tpl()
a = Action(tpl.a, id="node_a")
l1 = Loop.create(a, condition=tpl.check_condition, **kw)
return l1