Airflow external_task_sensor never stops poking

I need to wait for a task in another DAG to finish before I can trigger my own task, but my ExternalTaskSensor never stops poking. I have already read some of the related questions here and made sure I adjusted the execution_delta, but I still have the same issue.

Here are my two DAGs:

Parent DAG:

import datetime
import pendulum
from airflow import models
from airflow.operators.python_operator import PythonOperator

local_tz = pendulum.timezone("Europe/Berlin")
args = {
    "start_date": datetime.datetime(2022, 1, 25, tzinfo=local_tz),
    "provide_context": True,
}

def start_job(process_name, **kwargs):
    print('do something: ' + process_name)
    return True


with models.DAG(
        dag_id="test_parent",
        default_args=args,
        # catchup=False,
) as dag:

    task_parent_1 = PythonOperator(
        task_id="task_parent_1",
        python_callable=start_job,
        op_kwargs={"process_name": "my parent task 1"},
        provide_context=True,
    )

    task_parent_2 = PythonOperator(
        task_id="task_parent_2",
        python_callable=start_job,
        op_kwargs={"process_name": "my parent task 2"},
        provide_context=True,
    )

    task_parent_1 >> task_parent_2

And my child DAG:

import datetime
import pendulum
from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor

local_tz = pendulum.timezone("Europe/Berlin")
args = {
    "start_date": datetime.datetime(2022, 1, 25, tzinfo=local_tz),
    "provide_context": True,
}


def start_job(process_name, **kwargs):
    print('do something: ' + process_name)
    return True


with models.DAG(
        dag_id="test_child",
        default_args=args,
        # catchup=False,
) as dag:
    wait_for_parent_task = ExternalTaskSensor(
        task_id='wait_for_parent_task',
        external_dag_id='test_parent',
        external_task_id='task_parent_2',
        execution_delta=datetime.timedelta(hours=24),
        # execution_date_fn=lambda dt: dt - datetime.timedelta(hours=24),
    )

    task_child_1 = PythonOperator(
        task_id="task_child_1",
        python_callable=start_job,
        op_kwargs={"process_name": "my child task 1"},
        provide_context=True,
    )

    task_child_2 = PythonOperator(
        task_id="task_child_2",
        python_callable=start_job,
        op_kwargs={"process_name": "my child task 2"},
        provide_context=True,
    )

    task_child_1 >> wait_for_parent_task >> task_child_2
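
The date arithmetic behind the sensor can be sketched in plain Python. This is a sketch under assumptions: both DAGs use Airflow's default daily schedule (neither sets schedule_interval), and with execution_delta set, the sensor looks for the parent run whose execution_date equals the child's execution_date minus execution_delta. A fixed UTC+1 offset stands in for pendulum's "Europe/Berlin" (correct for January only), and the helpers execution_dates and sensor_target are hypothetical; nothing here calls Airflow.

```python
from datetime import datetime, timedelta, timezone

# Assumption: fixed UTC+1 offset standing in for pendulum's "Europe/Berlin"
# (valid in January; a DST-aware zone would differ in summer).
CET = timezone(timedelta(hours=1), "CET")

# Both DAGs share this start_date and (by assumption) a daily schedule.
START = datetime(2022, 1, 25, tzinfo=CET)
INTERVAL = timedelta(days=1)


def execution_dates(start, count, interval=INTERVAL):
    """Hypothetical helper: execution dates of the first `count` runs of a daily DAG."""
    return [start + i * interval for i in range(count)]


def sensor_target(child_execution_date, execution_delta):
    """Hypothetical helper: the parent execution date the sensor waits for
    when execution_delta is set (child date minus the delta)."""
    return child_execution_date - execution_delta


parent_runs = execution_dates(START, 3)
child_runs = execution_dates(START, 3)
delta = timedelta(hours=24)

for child in child_runs:
    target = sensor_target(child, delta)
    status = "exists" if target in parent_runs else "no such run, sensor keeps poking"
    print(f"child {child:%Y-%m-%d %H:%M} -> parent {target:%Y-%m-%d %H:%M}: {status}")
```

Under these assumptions, the very first child run (2022-01-25) looks for a parent run on 2022-01-24, which is before the parent's start_date, so a matching run can never appear.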



Solution 1:[1]

Alternatively, I also found a way that worked for me: set the execution date to the scheduled date of the parent DAG. The advantage is that you can also trigger the child DAG manually.

This assumes that the parent DAG is scheduled daily at 6:00 AM in the "Europe/Berlin" timezone.

    local_tz_london = pendulum.timezone("Europe/London")

    wait_for_parent_task = ExternalTaskSensor(
        task_id='wait_for_parent_task',
        external_dag_id='test_parent',
        external_task_id='task_parent_2',
        check_existence=True,
        # execution_date must match the parent's scheduled time exactly
        # (converted here to the London timezone).
        # Remember: a scheduled run's execution_date is one schedule interval
        # before it actually runs. For a daily schedule: - datetime.timedelta(days=1)
        execution_date_fn=lambda dt: (datetime.datetime(year=dt.year, month=dt.month, day=dt.day, tzinfo=local_tz)
                                      + datetime.timedelta(hours=6, minutes=0)
                                      - datetime.timedelta(days=1)
                                      ).astimezone(local_tz_london),
    )
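
To see why this also works for manual triggers, the lambda's arithmetic can be replayed with the standard library alone. This is a sketch under assumptions: fixed January offsets replace pendulum's "Europe/Berlin" and "Europe/London" zones, the parent is the daily 6:00 AM Berlin DAG described above, and parent_execution_date is a hypothetical helper, not part of Airflow.

```python
from datetime import datetime, timedelta, timezone

# Assumption: fixed January offsets stand in for the pendulum zones above
# (Europe/Berlin = UTC+1, Europe/London = UTC+0); real zones shift with DST.
BERLIN_WINTER = timezone(timedelta(hours=1), "CET")
LONDON_WINTER = timezone(timedelta(0), "GMT")


def parent_execution_date(dt, hour=6, minute=0):
    """Hypothetical helper with the same arithmetic as the execution_date_fn
    lambda: take dt's calendar date, pin it to the parent's 06:00 Berlin slot,
    then step back one daily interval."""
    midnight = datetime(year=dt.year, month=dt.month, day=dt.day, tzinfo=BERLIN_WINTER)
    target = midnight + timedelta(hours=hour, minutes=minute) - timedelta(days=1)
    # astimezone() changes only the representation, not the instant.
    return target.astimezone(LONDON_WINTER)


# Two child execution dates on the same day (e.g. a scheduled run and a
# manual trigger in the afternoon) resolve to the same parent run.
scheduled_child = datetime(2022, 1, 27, 0, 0, tzinfo=timezone.utc)
manual_child = datetime(2022, 1, 27, 14, 37, tzinfo=timezone.utc)
print(parent_execution_date(scheduled_child))  # 2022-01-26 05:00:00+00:00
print(parent_execution_date(manual_child))     # 2022-01-26 05:00:00+00:00
```

Because the result depends only on the calendar date of dt, every child run on a given day points at the parent run whose execution_date is 6:00 AM Berlin on the previous day, i.e. the parent run that actually executes at 6:00 AM on that same day.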

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Lazloo Xp