A Deeper Look¶
A deeper look to nodes and its attributes is given in the following paragraphs.
Passing Arguments to Actions¶
Action
and ActionInThread
constructors only admits a
function object, no space to pass parameters to the function or
coroutine that is going to be executed by the node.
However, functions are first class citizens in python, this makes easy to create them and pass around dynamically. It is recommended to use partial functions in order to pass parameters to the functions associated to action nodes.
Using partial functions¶
The use of partial functions allows to fix a certain number of arguments of a
function and generate a new function, on the spot.
Please see functools.partial()
for the official documentation. In any
case the following example illustrates its use.
We recommend the use seq.lib.partial()
instead of
functools.partial()
since the former will keep the documentation
of the wrapped function.
from seq.lib import partial
def f(a,b,c):
return a+b+c
g = partial(f,1,2) # creates new function g()
g(3) # Equivalent to f(1,2,3)
g(9)
g(1,2) # f will complain too many arguments were passed
The example partial function example shows a new function g() created by using partial in order to specify that g() when, called will invoke f(1,2) plus any extra argument given.
From the point of view of the python interpreter g() is a normal function, meaning you can use g() as many times as you see fit.
Runtime Flags¶
It is possible to associate runtime flags to the nodes. In order to skip or pause them. The easiest way to do this is by using the _pause and _skip shortcuts, as follows:
from seq.lib import _pause, _skip, Sequence
async def a():
pass
def b():
pass
node1 = _pause(a);
node2 = _skip(b);
s = Sequence.create(node1, node2, _skip(a))
Another way is to use set_runtime_flags(node, flags) function which requires a node and sets its runtime flags from the parameter. Valid runtime flags values are PAUSE and SKIP:
from seq.lib import RTFLAG, set_runtime_flag
node = Action(f)
set_runtime_flag(node, RTFLAG.PAUSE)
...
set_runtime_flag(node, RTFLAG.SKIP, False)
Summary building DAGs¶
Some more points to note from the basic examples shown before.
Constructor Calling conventions¶
First of all, except for Action
and ActionInThread
do not use the standard class_name() constructor to build
nodes. i.e. never use naked Parallel() to create a Parallel
node, same for the other node types.
Instead, use the create() method every container node implements (Loop, Sequence, Parallel).
Important
Use the create() method to instance all container classes (Loop, Sequence, Parallel, etc).
Since they hold a variable number of children nodes, their constructors (create() method) receives its children as positional arguments. Any other attribute (mandatory or not) is passed trough keyword arguments. See Node constructor calling convention
# Sequence ctor
Sequence.create(child_1, child_2, ..., child_n,
id="my_unique_id", name="my nice name")
# Parallel ctor
Parallel.create(child_1, child_2, ..., child_n,
id="my_unique_id", name="my nice name")
# Loop ctor
Loop.create(child_1, child_2, ..., child_n,
id="my_unique_id", name="my nice name",
init = init_function,
condition= condition_function
)
Node Attributes¶
Node attributes are passed as keyword arguments in ts create method. Every node has, at least, a name and id attributes. If they are not specified while building the DAG they are assigned by the engine pseudo–randomly.
The specific attributes for each node type are detailed in the following table:
Node Class |
Attribute |
Description |
---|---|---|
ALL |
id |
Node id (must be unique) |
ALL |
name |
Node name |
|
init |
Initialization node |
|
condition |
Loop’s condition node |
Special Variables¶
Sequences uses contextvars.ContextVar
to distribute special values
around all tasks and threads that are part of a given Sequencer script.
The ContextVar module implements context variables. This concept is similar to thread-local storage (TLS), but, unlike TLS, it also allows correctly keeping track of values per asynchronous task, e.g. asyncio.Task.
Class |
Variable |
Description |
---|---|---|
Sequence |
current_seq |
Current Sequence name |
Sequence |
root |
The root node |
Loop |
index |
Loop’s running index |
Result Handling¶
Each node has a result attribute where the return value from its associated step is stored.
For Action
and ActionInThread
, the result is just the return value from its
associated function. Since each Action is just a simple function or method, their result
is kept in its corresponding node’s attribute result.
All other nodes will have an empty result unless it is explicitely set by some step, in the sequence. Example Set node result shows a step setting the result of the sequence that contains it. The node that contains a given action is accessed trough the Sequence.current_seq context variable as the example shows.
It is clear that in order to check for a node’s result one needs to have a handler to that node or a way to find it, see Finding nodes.
Note
Both Parallel
and Loop
classes inherit from Sequence
. Therefore they can access Sequence.current_seq context variable.
from seqlib.ob import OB
# reuse some sequence ...
from . import test_a as a
async def mystep():
"""Sets current Sequence's result"""
s = Sequence.current_seq.get()
s.result = 0
async def test_result():
tpa = OB.create_sequence(a)
sc = Parallel.create(tpa, mystep)
await sc.start()
assert sc.result == 0
Finding nodes¶
Unless one has saved a reference to a node, e.g. as a class member or global variable. The only way to find a node in the DAG is trough its unique id.
The func:seq.lib.nodes.find_node receives as a parameter a starting node and the id of the node being looked up. On success it returns a tuple the target node and its parent as the tuple (parent, node).
In order to lookup a node from the DAG’s root (meaning look around the complete sequence until a hit is found), simply pass the root context variable as the initial node.
In order to lookup a node from the current sequence use the current_seq context variable as the starting node. Lookup a node illustrates both use cases.
from seqlib.nodes import Sequence, find_node
...
async def do_a():
# find node `id1` (not shown) starting at root and get its result.
_, node = find_node(Sequence.root.get(), "id1")
print("the other node result", node.result)
return node.result + 1; # or something
async def do_b():
# find do_a's result
_, node_a = find_node(Sequence.current_seq.get(), "id_do_a")
return node_a.result +1; # or do something else
# Given a Sequence s
s = Sequence.create(Action(do_a, id="id_do_a"), do_b)
Since node_ids are unique inside a given Sequence there is no risk of loosing an Action’s result because it gets overwritten by some other node. As opposed to Nodes have context.
Nodes have context¶
Besides the result attribute that can be inspected in order to pass information between Sequencer scripts. There is the context dictionary which can be freely accesses throughout all nodes being executed.
The context dictionary is a property shared among all Sequence
nodes (includes Loop and Parallel). Action
and ActionInThread
nodes can gain access to it trough Sequence.get_context()
static method. Plese see the following code excerpts Node context example.
The methods do_a() and do_b() must access the context trough the Sequence.get_context() static method. The object tpl, being a Sequence
instance can access its Sequence.context
attribute.
async def do_a():
ctx = Sequence.get_context()
ctx["do_a"] = 1
def do_b():
ctx = Sequence.get_context()
ctx["do_b"] = 1
# creates the sequence
tpl = Sequence.create(do_a, ActionInThread(do_b))
# Sequence.root.set(tpl)
await tpl.start()
ctx = tpl.context
assert tpl.context["do_a"] == 1
assert tpl.context["do_b"] == 1
Warning
Notice there are no hard rules about what can go into the context dictionary and under what key. It might be simpler to use than setting results on nodes but there is no guarantee a given key might be overwritten in some other part of the running script just because of a name clash.
Node Types¶
The sequencer node types lives in the module seq.nodes
:
Action¶
The simple action node. It contains a python function or method to be executed.
# Creates a node with some properties.
node_a = Node(t.a, name="node")
Sequence¶
Executes nodes one after the other.
Parallel¶
Executes nodes in parallel, finishes when all its nodes are done.
Loop¶
A Loop consists of a condition and a block of nodes to execute while said conditions remains True. It also accepts an initialization function to be called, only once, before the first loop iteration.
"""
Implements a loop.
The condition checks Loop's index < 3.
"""
import asyncio
import logging
import random
from seq.lib.nodes import Loop
logger = logging.getLogger(__name__)
class Tpl: # Mandatory class name
async def a(self):
"""sleeps up to 1 second"""
t = random.random() # 0..1
await asyncio.sleep(t)
logger.info(".. done A: %d", Loop.index.get())
async def b(self):
"""sleeps up to 1 second"""
t = random.random() # 0..1
await asyncio.sleep(t)
logger.info(" .. done B: %d", Loop.index.get())
async def c(self):
pass
async def check_condition(self):
"""
The magic of contextvars in asyncio
Loop.index is local to each asyncio task
"""
logger.info("Loop index: %d", Loop.index.get())
return Loop.index.get() < 3
@staticmethod
def create(**kw):
t = Tpl()
l = Loop.create(t.a, t.b, t.c,
condition=t.check_condition, **kw)
return l