Airflow external_task_sensor never stops poking
I need to wait for a task in another DAG to finish before I can trigger my own task, but my ExternalTaskSensor never stops poking. I have already read some of the related questions here and made sure I adjusted the execution_delta, but I still have the same issue.
Here are my two DAGs.
Parent DAG:
import datetime

import pendulum
from airflow import models
from airflow.operators.python_operator import PythonOperator

local_tz = pendulum.timezone("Europe/Berlin")

args = {
    "start_date": datetime.datetime(2022, 1, 25, tzinfo=local_tz),
    "provide_context": True,
}

def start_job(process_name, **kwargs):
    print('do something: ' + process_name)
    return True

with models.DAG(
    dag_id="test_parent",
    default_args=args,
    # catchup=False,
) as dag:
    task_parent_1 = PythonOperator(
        task_id="task_parent_1",
        python_callable=start_job,
        op_kwargs={"process_name": "my parent task 1"},
        provide_context=True,
    )
    task_parent_2 = PythonOperator(
        task_id="task_parent_2",
        python_callable=start_job,
        op_kwargs={"process_name": "my parent task 2"},
        provide_context=True,
    )
    task_parent_1 >> task_parent_2
And my child DAG:
import datetime

import pendulum
from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor

local_tz = pendulum.timezone("Europe/Berlin")

args = {
    "start_date": datetime.datetime(2022, 1, 25, tzinfo=local_tz),
    "provide_context": True,
}

def start_job(process_name, **kwargs):
    print('do something: ' + process_name)
    return True

with models.DAG(
    dag_id="test_child",
    default_args=args,
    # catchup=False,
) as dag:
    wait_for_parent_task = ExternalTaskSensor(
        task_id='wait_for_parent_task',
        external_dag_id='test_parent',
        external_task_id='task_parent_2',
        execution_delta=datetime.timedelta(hours=24),
        # execution_date_fn=lambda dt: dt - datetime.timedelta(hours=24),
    )
    task_child_1 = PythonOperator(
        task_id="task_child_1",
        python_callable=start_job,
        op_kwargs={"process_name": "my child task 1"},
        provide_context=True,
    )
    task_child_2 = PythonOperator(
        task_id="task_child_2",
        python_callable=start_job,
        op_kwargs={"process_name": "my child task 2"},
        provide_context=True,
    )
    task_child_1 >> wait_for_parent_task >> task_child_2
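For context, here is a rough sketch of the matching rule the sensor in the child DAG applies, as far as I understand it: it looks for a parent task instance whose execution date equals the child's execution date minus execution_delta, and it keeps poking until that exact date shows up. The helper names sensor_target and on_parent_schedule are hypothetical, the daily interval is an assumption (neither DAG sets a schedule explicitly), and a fixed UTC+1 offset stands in for the pendulum "Europe/Berlin" timezone so the snippet runs without Airflow. It models only the date arithmetic, not the sensor itself.

```python
import datetime

# Assumption: fixed UTC+1 offset standing in for Europe/Berlin in winter,
# used only to keep this sketch free of third-party dependencies.
BERLIN_WINTER = datetime.timezone(datetime.timedelta(hours=1))


def sensor_target(child_execution_date, execution_delta):
    """Parent execution date the sensor looks for (hypothetical helper)."""
    return child_execution_date - execution_delta


def on_parent_schedule(target, parent_start, parent_interval):
    """True if `target` lies exactly on the parent's schedule grid.

    Simplification: being on the grid is necessary for a scheduled parent
    run to exist with that date, but it does not prove the run happened.
    """
    if target < parent_start:
        return False
    return (target - parent_start) % parent_interval == datetime.timedelta(0)


start = datetime.datetime(2022, 1, 25, tzinfo=BERLIN_WINTER)
daily = datetime.timedelta(days=1)  # assumed interval for both DAGs
delta = datetime.timedelta(hours=24)

# First child run: the target falls before the parent's start_date.
first_target = sensor_target(start, delta)
print(first_target, on_parent_schedule(first_target, start, daily))

# Second child run: the target is the parent's first execution date.
second_target = sensor_target(start + daily, delta)
print(second_target, on_parent_schedule(second_target, start, daily))

# Manually triggered child run at an arbitrary time: the target is off the grid.
manual = datetime.datetime(2022, 1, 27, 14, 30, tzinfo=BERLIN_WINTER)
manual_target = sensor_target(manual, delta)
print(manual_target, on_parent_schedule(manual_target, start, daily))
```

If this reading is right, a child run whose shifted date does not coincide with any parent execution date would poke until the sensor times out, which could be one explanation for the behaviour described.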
Solution 1:[1]
Alternatively, I found a way that worked for me: setting the sensor's execution date to the scheduled date of the parent DAG. The advantage is that you can also trigger the DAG manually.
The example assumes that the parent DAG is scheduled at 6:00 AM in the "Europe/Berlin" timezone.
# Assumption: local_tz_london was not defined in the original snippet;
# the comment below suggests it is the London timezone.
local_tz_london = pendulum.timezone("Europe/London")

wait_for_parent_task = ExternalTaskSensor(
    task_id='wait_for_parent_task',
    external_dag_id='test_parent',
    external_task_id='task_parent_2',
    check_existence=True,
    # execution_date must match the parent's scheduled time exactly, in the London timezone.
    # Remember: the execution date of a run is always one schedule interval in the past.
    # For a daily schedule: - datetime.timedelta(days=1)
    execution_date_fn=lambda dt: (
        datetime.datetime(year=dt.year, month=dt.month, day=dt.day, tzinfo=local_tz)
        + datetime.timedelta(hours=6, minutes=0)
        - datetime.timedelta(days=1)
    ).astimezone(local_tz_london),
)
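To see why this also works for manual triggers, the date arithmetic from the lambda above can be pulled out into a plain function and tried without Airflow. This is only a sketch: parent_scheduled_date is a hypothetical name, and fixed offsets (Berlin as UTC+1, London as UTC+0, i.e. winter time) replace the pendulum timezones so that it runs with the standard library alone.

```python
import datetime

# Assumption: fixed winter-time offsets instead of pendulum timezones.
BERLIN = datetime.timezone(datetime.timedelta(hours=1))
LONDON = datetime.timezone.utc


def parent_scheduled_date(dt, hour=6, minute=0):
    """Map a child execution date to the parent's previous daily slot.

    Only the calendar day of `dt` is used, so the time of day at which
    the child was started does not affect the result.
    """
    midnight = datetime.datetime(dt.year, dt.month, dt.day, tzinfo=BERLIN)
    slot = midnight + datetime.timedelta(hours=hour, minutes=minute)
    return (slot - datetime.timedelta(days=1)).astimezone(LONDON)


# A scheduled child run and a manual trigger later on the same day.
scheduled = datetime.datetime(2022, 1, 26, 6, 0, tzinfo=BERLIN)
manual = datetime.datetime(2022, 1, 26, 15, 42, 7, tzinfo=BERLIN)

print(parent_scheduled_date(scheduled))
print(parent_scheduled_date(manual))
```

Both calls resolve to the same parent slot, because the function discards the time of day. One caveat: the day is read from dt as it is passed in, and Airflow typically hands over a UTC datetime, so a run started close to midnight could resolve to a different calendar day than expected.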
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Lazloo Xp |
