Use list output from a PythonOperator to iterate another operator in Airflow 2

Background

We want to first get a list from one operator, then iterate over the result and run another operator for each element.

The script is as follows:

import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def hello_world(ti, execution_date, **context):
    # Do something here to generate the value final_output
    ti.xcom_push(key='whatever', value=final_output)


dag = DAG(
    "test",
    schedule_interval=None,
    start_date=datetime.datetime(2021, 5, 17),
    catchup=False,
)

with dag:
    t1 = PythonOperator(
        task_id="hello_world",
        python_callable=hello_world,
    )

    outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') }}"

    for x in outcome_list:
        t2 = PythonOperator(
            task_id=f"test_{x}",
            python_callable=do_sth,  # do_sth is defined elsewhere (not shown)
            op_kwargs={"input_param": x},
        )

        # Every t2 should run after t1
        t1 >> t2

So far we have managed to get the XCom value. The list always has around 60 elements, so performance is not a concern. However, it is returned as the string representation of a list rather than as an actual list.

To iterate over it, we want to convert it to a list and pass each element to the function that the t2 operator runs.

Current issue

outcome_list is generated via a Jinja template and saved as a str like this:

['user_A US', 'user_B BR' , ..... ] 

We tried to convert outcome_list into a proper Python list with the following code in the DAG:

outcome_list = outcome_list.strip("[]").split(", ")

It raises the following error:

jinja2.exceptions.TemplateSyntaxError: unexpected end of template, expected ','.
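For context, here is a plain-Python sketch (no Airflow involved) of why hand-splitting is fragile even when you do have the rendered text: strip/split leaves the quote characters on every element, while the standard library's ast.literal_eval parses the list literal properly. The sample string is a shortened assumption based on the example above, and this only applies inside a running task, where the template has actually been rendered.

```python
import ast

# Rendered XCom text shaped like the question's example (shortened sample)
rendered = "['user_A US', 'user_B BR']"

# Hand-splitting keeps the quote characters on every element
naive = rendered.strip("[]").split(", ")

# ast.literal_eval parses Python literals only; it does not execute arbitrary code
parsed = ast.literal_eval(rendered)

print(naive)   # ["'user_A US'", "'user_B BR'"]
print(parsed)  # ['user_A US', 'user_B BR']
```

Splitting on ", " would also break any element that itself contains a comma followed by a space; literal_eval does not have that problem.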

When we instead tried to convert the output into a list with Jinja syntax:

outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') | list }}"

we got an error in the loop saying that it is not iterable.

What's the issue here, and how should we proceed? Thank you for the help!



Solution 1:[1]

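Placing outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') }}" outside of an operator is not going to work, because this string is never templated. Airflow renders Jinja only in an operator's templated fields, and only when the task runs. When the DAG file is parsed, outcome_list is just that literal text, so the for loop iterates over its characters rather than over the XCom list.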
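A minimal plain-Python illustration of what the question's loop does at parse time: outcome_list is still the literal template text, so each iteration sees a single character. The task_ids list is only a simulation of the IDs an f-string task_id would produce; no Airflow objects are created.

```python
# At DAG-parse time nothing renders this string; it is plain text
outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') }}"

# Simulate the task_id each loop iteration would produce
task_ids = [f"test_{x}" for x in outcome_list]

print(len(task_ids) == len(outcome_list))  # True: one ID per character
print(task_ids[:5])  # ['test_{', 'test_{', 'test_ ', 'test_t', 'test_i']
```

Several characters repeat, so the simulated IDs also collide with each other, which is another sign the loop is not iterating over the intended list.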

What you are looking for is creating tasks dynamically at run time, in a map-reduce fashion.

For Airflow <2.3.0:

This is not possible. You cannot create task_ids based on the output of a previous task.

For Airflow>=2.3.0:

Airflow 2.3.0 adds a new feature, AIP-42 Dynamic Task Mapping, which allows creating tasks based on the output of previous tasks. Example:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

@task
def make_list():
    # This can also be from an API call, checking a database, -- almost anything you like, as long as the
    # resulting list/dictionary can be stored in the current XCom backend.
    return [1, 2, {"a": "b"}, "str"]

@task
def consumer(arg):
    print(repr(arg))


with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    consumer.expand(arg=make_list())
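To make the semantics concrete, here is a plain-Python simulation (not Airflow's API) of what consumer.expand(arg=make_list()) amounts to: at run time Airflow creates one consumer task instance per element of the list returned by make_list, each identified by its map_index. The dictionary below only models that fan-out.

```python
def make_list():
    # Same values as the example above
    return [1, 2, {"a": "b"}, "str"]


def consumer(arg):
    # The real task prints repr(arg); returning it lets us inspect the result
    return repr(arg)


# One simulated task instance per element, keyed by its map index (0, 1, 2, ...)
mapped = {map_index: consumer(arg) for map_index, arg in enumerate(make_list())}

for map_index, output in mapped.items():
    print(map_index, output)
```

Applied to the question, the analogous (untested) shape would be a hello_world task that returns its list and a do_sth task expanded over it, e.g. do_sth.expand(input_param=hello_world()), with both defined via @task.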

Note: at the time of writing this answer, Airflow 2.3.0 has not been released yet. However, 2.3.0b1 is available, so you can test your code with it. We expect to release the official version in the upcoming weeks.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Elad Kalif