Issue with passing return value from a task as an argument to another task
I have a task that returns a tuple. Passing one element of that tuple to another task does not work. I can pass the entire tuple, but not a single element of the return value:
from airflow.decorators import dag, task
from pendulum import datetime

@task
def create():
    return 1, 2

@task
def consume(one):
    print('arg is', one)

@dag(
    schedule_interval='@once',
    start_date=datetime(2022, 4, 10),
)
def test_dag():
    out = create()
    consume(out[0])  # does not work: the task gets None as argument
    consume(out)  # this works

dag = test_dag()
Solution 1:[1]
Within TaskFlow, the object returned from a TaskFlow function is actually an XComArg. These XComArgs are abstractions over the classic task_instance.xcom_pull(...) retrieval of XComs. Additionally, XComArg objects implement __getitem__ for specifying an XCom key other than "return_value" (which is the default).
So with consume(out[0]), Airflow uses the XComArg object to retrieve an XCom with a key of 0. It does not retrieve the output of create() and then take its first item. Behind the scenes, the call is task_instance.xcom_pull(task_ids="create", key=0). No XCom was pushed under that key, which is why the task receives None.
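The key-swapping behaviour described above can be sketched without Airflow. This is a toy model only: ToyXComArg, RETURN_KEY, and the store dictionary are hypothetical stand-ins made up for illustration, not Airflow's actual classes or storage.

```python
# Toy model only: hypothetical stand-ins, not Airflow's real XComArg or XCom backend.
RETURN_KEY = "return_value"


class ToyXComArg:
    """Lazy reference to an XCom identified by (task_id, key)."""

    def __init__(self, task_id, key=RETURN_KEY):
        self.task_id = task_id
        self.key = key

    def __getitem__(self, item):
        # Indexing swaps the XCom key; it does not index into the stored value.
        return ToyXComArg(self.task_id, key=item)

    def resolve(self, store):
        # Mimics a pull by task id and key: None when nothing was pushed under that key.
        return store.get((self.task_id, self.key))


# The only XCom that create() pushed: its return value under the default key.
store = {("create", RETURN_KEY): (1, 2)}

out = ToyXComArg("create")
whole = out.resolve(store)     # looks up key "return_value"
first = out[0].resolve(store)  # looks up key 0, which was never pushed

print(whole)  # (1, 2)
print(first)  # None
```

In this sketch, out[0] builds a new reference with key 0 rather than reading element 0 of the tuple, which mirrors why consume(out[0]) receives None.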
Yes, this is somewhat unexpected, and it is not quite in line with the classic xcom_pull() approach. An issue has been opened to try to achieve feature parity.
In the meantime, you can of course access the whole XComArg, as you show, by just using consume(out), or you can update the TaskFlow function to return a dictionary and use multiple_outputs to have each key/value pair serialized as its own XCom.
For example:
from pendulum import datetime
from airflow.decorators import dag, task

@task(multiple_outputs=True)
def create():
    return {"one": 1, "two": 2}

@task
def consume(arg):
    print('arg is', arg)

@dag(
    schedule_interval='@once',
    start_date=datetime(2022, 4, 10),
)
def test_dag():
    out = create()
    consume(out["one"])

dag = test_dag()
With multiple_outputs=True, the create task pushes a separate XCom for each dictionary key ("one" and "two").
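As a rough illustration of what ends up stored, here is a toy model in plain Python. The toy_push function and the xcoms dictionary are hypothetical names used only for this sketch. It assumes the full dictionary is still stored under "return_value" alongside the per-key entries.

```python
# Toy model only: toy_push and xcoms are hypothetical, not part of Airflow.
def toy_push(xcoms, task_id, value, multiple_outputs=False):
    """Store a task's return value, optionally splitting a dict into one entry per key."""
    # Assumption: the whole return value is always stored under the default key.
    xcoms[(task_id, "return_value")] = value
    if multiple_outputs:
        # Each key/value pair also becomes its own entry.
        for key, item in value.items():
            xcoms[(task_id, key)] = item
    return xcoms


xcoms = toy_push({}, "create", {"one": 1, "two": 2}, multiple_outputs=True)

# Looking up the "one" key now finds a value, unlike key 0 in the tuple case.
one = xcoms.get(("create", "one"))
stored_keys = sorted(key for _, key in xcoms)

print(one)          # 1
print(stored_keys)  # ['one', 'return_value', 'two']
```

Because an entry exists under the key "one", a lookup such as out["one"] has something to find.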
Side note: multiple_outputs can also be inferred if the TaskFlow function has a dictionary return type annotation. This will set multiple_outputs=True based on the return annotation:
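from typing import Dict

from airflow.decorators import task

@task
def create() -> Dict[str, int]:
    # The Dict return annotation lets Airflow infer multiple_outputs=True
    return {"one": 1, "two": 2}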
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |