Advanced Usage
Using Multiple Task Decorators
You can use multiple task decorators on a single function to create multiple tasks with different dependencies and settings:
from taskade import task

@task(graph_name="graph1", dependencies=("task_b",), name="task_a_1", output_names=("result_a",))
@task(graph_name="graph1", dependencies=("task_c",), name="task_a_2", output_names=("result_b",))
@task(graph_name="graph2", dependencies=("task_d",), name="task_a_3", output_names=("result_c",))
def task_a(arg):
    return arg

@task(graph_name="graph1")
def task_b():
    return "Task B"

@task(graph_name="graph1")
def task_c():
    return "Task C"

@task(graph_name="graph2")
def task_d():
    return "Task D"

@task(graph_name="graph1", dependencies=("task_a_1", "task_a_2"))
def task_e(result_a, result_b):
    return result_a, result_b
In this example, task_a takes an argument arg and returns it. The three decorators create three tasks, task_a_1, task_a_2, and task_a_3, whose outputs are named result_a, result_b, and result_c respectively. task_e depends on task_a_1 and task_a_2 and takes two arguments, result_a and result_b, which are the outputs of task_a_1 and task_a_2 respectively. Note that the number of arguments task_e accepts matches the number of outputs produced by its dependencies task_a_1 and task_a_2.
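You can then retrieve and run each graph separately. The sketch below uses get_graph, which is shown later in this section; the expected values in the comments follow from the task definitions above.

from taskade import get_graph

graph1 = get_graph("graph1")
graph2 = get_graph("graph2")

# graph1 runs task_b, task_c, task_a_1, task_a_2, and task_e.
results1 = graph1()
print(results1[task_e])  # Expected: ("Task B", "Task C")

# graph2 runs task_d and task_a_3.
results2 = graph2()
print(results2)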
Init Kwargs
You can pass initialization keyword arguments to tasks using the init_kwargs parameter:
from taskade import task

@task(graph_name="graph", init_kwargs={"foo": "bar"})
def task_a(foo):
    return foo

graph = task_a.graph
results = graph()
print(results[task_a])  # Output: "bar"
This passes the foo keyword argument to task_a when it's executed.
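init_kwargs can also carry several fixed configuration values at once. Here is a minimal sketch; the task name and values are illustrative, not part of taskade:

from taskade import task

@task(graph_name="config_graph", init_kwargs={"base_url": "https://example.com", "timeout": 30})
def build_config(base_url, timeout):
    # Both keyword arguments come from init_kwargs; the task has no dependencies.
    return {"base_url": base_url, "timeout": timeout}

graph = build_config.graph
results = graph()
print(results[build_config])  # Expected: {"base_url": "https://example.com", "timeout": 30}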
Using Output Names
You can specify output names for tasks using the output_names parameter:
from taskade import task

@task(graph_name="graph", output_names=["result_a", "result_b"])
def task_a():
    return "Result A", "Result B"

@task(graph_name="graph", dependencies=["task_a"], init_kwargs={"result_a": None, "result_b": None})
def task_b(result_a, result_b):
    return result_a, result_b

graph = task_a.graph
results = graph()
print(results[task_b])  # Output: ("Result A", "Result B")
This sets the output names of task_a to result_a and result_b, and passes these outputs as keyword arguments to task_b.
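When a task produces a single named output, the downstream task can simply declare a parameter with that name, as the concurrency examples later in this section do. A minimal sketch with illustrative task names:

from taskade import task

@task(graph_name="pipeline", output_names=("greeting",))
def make_greeting():
    return "hello"

@task(graph_name="pipeline", dependencies=("make_greeting",))
def shout(greeting):
    # Receives make_greeting's output through the matching parameter name.
    return greeting.upper()

graph = shout.graph
results = graph()
print(results[shout])  # Expected: "HELLO"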
Creation from Dictionary
You can create a graph from a dictionary of tasks:
from taskade import Graph, task

# task_a, task_b, and task_c are @task-decorated functions, as in the earlier examples.
tasks = {
    "task_a": task_a,
    "task_b": task_b,
    "task_c": task_c,
}

graph = Graph.from_dict(tasks, name="my_graph")
results = graph()
print(results)
This creates a graph with the tasks in the dictionary and executes it.
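Since the results mapping is keyed by the task objects, as in the earlier examples, individual results can also be read back directly:

# Look up individual results by task object.
for t in (task_a, task_b, task_c):
    print(t, results[t])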
Controlling Concurrency
Async Execution
You can control the concurrency of async execution by passing an async semaphore to the graph's __call__ method:
import asyncio

from taskade import task, get_graph

@task(graph_name="my_graph", output_names=["result_a"])
async def task_a():
    return "Result A"

@task(graph_name="my_graph", dependencies=(task_a,), input_names=["result_a"])
async def task_b(result_a):
    return f"Task B received {result_a}"

async def main():
    graph = get_graph("my_graph")
    # Allow at most 5 tasks to run at once.
    semaphore = asyncio.Semaphore(5)
    results = await graph(tasks_semaphore=semaphore)
    print(results)

asyncio.run(main())
Sync Execution
You can control the concurrency of sync execution by passing a pool to the graph's __call__ method:
from taskade import task, get_graph
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

@task(graph_name="my_graph", output_names=("result_a",))
def task_a():
    return "Result A"

@task(graph_name="my_graph", dependencies=(task_a,))
def task_b(result_a):
    return f"Task B received {result_a}"

graph = get_graph("my_graph")

with ThreadPoolExecutor(max_workers=5) as pool:
    results = graph(concurrency_pool=pool)
    print(results)

# or
with ProcessPoolExecutor(max_workers=5) as pool:
    results = graph(concurrency_pool=pool)
    print(results)
Alternatively, you can let taskade manage the pool for you by passing the n_jobs parameter. When executing a sync graph concurrently this way, you can also pass the pool type via the concurrency_pool parameter; it defaults to ThreadPoolExecutor.
from taskade import task, get_graph
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

@task(graph_name="my_graph", output_names=("result_a",))
def task_a():
    return "Result A"

@task(graph_name="my_graph", dependencies=(task_a,))
def task_b(result_a):
    return f"Task B received {result_a}"

graph = get_graph("my_graph")

results = graph(n_jobs=5)
print(results)

# or
results = graph(n_jobs=5, concurrency_pool=ProcessPoolExecutor)
print(results)
In both examples, the graph_name parameter sets the name of the graph and the get_graph function retrieves it. The graph is then called with the concurrency control parameters.