Airflow 2 XCom in Task Groups
I have two tasks inside a TaskGroup that need to pull xcom values to supply the job_flow_id and step_id. Here's the code:
  with TaskGroup('execute_my_steps') as execute_my_steps: 
    config = {some dictionary}
    dependencies = {another dictionary}
    task_id = 'execute_spark_job_step'
    task_name = 'spark_job'
    add_step = EmrAddStepsOperator(
        task_id=task_id,
        job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
        steps=create_emr_step(args=config, d=dependencies),
        aws_conn_id='aws_default',
        retries=3,
        dag=dag
    )
    wait_for_step = EmrStepSensor(
        task_id='wait_for_' + task_name + '_step',
        job_flow_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='emr', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='" + task_id + "', key='return_value') }}",
        retries=3,
        dag=dag,
        mode='reschedule'
    )
    add_step >> wait_for_step
The problem is that step_id does not render correctly. In the UI's rendered template for wait_for_step, step_id shows as 'None', even though the XCom return_value for execute_spark_job_step exists (it holds the EMR step ID).
When I remove the TaskGroup, it renders fine and the step waits until the job enters the completed state.
I need this to be in a task group because I will be looping through a larger config file and creating multiple steps.
Why doesn't this work? Do I need a nested TaskGroup? I tried using a TaskGroup without the context manager and still no luck.
Solution 1:[1]
TL;DR:
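The issue is that, inside a TaskGroup, the task's ID is not task_id; it is group_id.task_id.
So the task_ids argument needs the group prefix:
task_ids='execute_my_steps.execute_spark_job_step'
=>
    step_id="{{ task_instance.xcom_pull(dag_id='my_dag', task_ids='execute_my_steps." + task_id + "', key='return_value') }}",
Build the prefixed ID with string concatenation, as in the original code. An f-string cannot be placed inside the double-quoted template string: its quotes would end the string early.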
Why this happens:
When a task is assigned to a TaskGroup, its ID is no longer the bare task_id; it becomes group_id.task_id to reflect that relationship.
Task IDs must be unique within a DAG, and this prefix is what lets you reuse the same task_id in different TaskGroups.
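If you build several of these template strings, a small helper can keep the prefixing in one place. The following is a minimal sketch in plain Python; `xcom_pull_template` is a hypothetical helper name, not part of Airflow, and it only produces the string that Airflow would later render.

```python
from typing import Optional


def xcom_pull_template(task_id: str,
                       group_id: Optional[str] = None,
                       key: str = "return_value") -> str:
    """Build a Jinja template string that pulls an XCom from a task.

    Hypothetical helper: if group_id is given, the task ID is
    qualified as 'group_id.task_id', matching the prefixing rule
    described above.
    """
    qualified_id = f"{group_id}.{task_id}" if group_id else task_id
    # Quadruple braces in an f-string emit the literal '{{' and '}}'
    # that Jinja expects.
    return (
        f"{{{{ task_instance.xcom_pull(task_ids='{qualified_id}', "
        f"key='{key}') }}}}"
    )


step_id_template = xcom_pull_template(
    "execute_spark_job_step", group_id="execute_my_steps"
)
print(step_id_template)
```

The resulting string could then be passed as step_id to the sensor, while a task outside any group (such as 'emr' in the question) would be referenced without a group_id.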
If this behavior is not something that you want, you can disable it by setting prefix_group_id=False in your TaskGroup:
with TaskGroup(
    group_id='execute_my_steps',
    prefix_group_id=False
) as execute_my_steps:
With this setting your original code works unchanged, because each task's ID is just its task_id with no group_id prefix. It also means it is up to you to make sure there are no duplicate task_ids in your DAG.
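Since the question mentions looping over a config to create multiple steps, duplicate IDs are the likely pitfall with prefix_group_id=False. Here is a rough sketch in plain Python of checking a planned set of task IDs before building the DAG; the step names and the `duplicated_task_ids` helper are hypothetical and only illustrate the idea.

```python
from collections import Counter
from typing import Iterable, List


def duplicated_task_ids(task_ids: Iterable[str]) -> List[str]:
    """Return the task IDs that appear more than once, sorted."""
    counts = Counter(task_ids)
    return sorted(tid for tid, n in counts.items() if n > 1)


# Hypothetical step names read from a config file.
step_names = ["spark_job", "report_job"]

# Reusing the same task_id in every loop iteration collides
# once the group prefix is disabled.
colliding = [
    tid for _ in step_names for tid in ("execute_step", "wait_for_step")
]

# Embedding the step name in each task_id keeps the IDs unique.
unique = [
    f"{prefix}_{name}_step"
    for name in step_names
    for prefix in ("execute", "wait_for")
]

print(duplicated_task_ids(colliding))  # ['execute_step', 'wait_for_step']
print(duplicated_task_ids(unique))     # []
```

If you keep the default prefixing instead, giving each loop iteration its own group_id achieves the same uniqueness without changing the task_ids.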
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source | 
|---|---|
| Solution 1 | Elad Kalif | 

