Airflow tasks iterating over list should run sequentially
I am running tasks based on a list. Each task id is built from the corresponding list item. After these tasks complete, I want another task to execute. The code is as follows:
with DAG('test',) as dag:
    t1 = [PythonOperator(
        task_id=f"task_hour_{hours}",
        python_callable=hourly_job,
        op_kwargs={
            "hour": hours
        }
    ) for hours in ['01', '02', '03']
    ]
    t2 = PythonOperator(
        task_id="daily",
        python_callable=daily_job
    )
    t1 >> t2
What happens is that the hourly tasks all run in parallel, and each one feeds directly into the daily task, like this:
task_hour_01 >> daily
task_hour_02 >> daily
task_hour_03 >> daily
What I want is for the hourly tasks to execute sequentially, with the daily task executing last:
task_hour_01 >> task_hour_02 >> task_hour_03 >> daily
So there are two problems:
- The tasks should run in sequence.
- The daily task should be the last task to execute and should run only once.
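The fan-in shown above follows from how `>>` is resolved when the left-hand side is a list: a Python list has no `>>` operator of its own, so the expression falls back to the object on the right, which registers every list element as its direct upstream and never links the elements to each other. The sketch below reproduces that with a hypothetical `FakeTask` class (a stand-in, not an Airflow class) so it runs without Airflow installed. It is a simplified model of the behavior, not Airflow's actual implementation.

```python
class FakeTask:
    """Hypothetical stand-in for an Airflow operator; models only `>>`."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []  # ids of tasks that must finish before this one

    def __rshift__(self, other):
        # self >> other: self becomes an upstream of other
        other.upstream.append(self.task_id)
        return other

    def __rrshift__(self, others):
        # [a, b, c] >> self: every list element becomes an upstream of self
        for task in others:
            self.upstream.append(task.task_id)
        return self


hourly = [FakeTask(f"task_hour_{h}") for h in ["01", "02", "03"]]
daily = FakeTask("daily")
hourly >> daily  # list on the left, so daily.__rrshift__ handles it

fan_in = daily.upstream                      # all three hourly tasks
hourly_links = [t.upstream for t in hourly]  # no links between hourly tasks
```

In this model `daily` waits for all three hourly tasks, but nothing orders the hourly tasks relative to one another, which matches the parallel execution described in the question.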
Solution 1:[1]
Based on this answer, you could use an auxiliary variable such as t0 to keep track of the previous operator and link each new one to it.
with DAG('test',) as dag:
    hours = ['01', '02', '03']
    t0 = None  # previous hourly task, if any
    for hour in hours:
        t1 = PythonOperator(
            task_id=f"task_hour_{hour}",
            python_callable=hourly_job,
            op_kwargs={"hour": hour}
        )
        if t0 is not None:
            t0 >> t1  # run this hour after the previous one
        t0 = t1
    t2 = PythonOperator(
        task_id="daily",
        python_callable=daily_job
    )
    t1 >> t2  # daily runs once, after the last hourly task
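The same pairwise linking can also be written without the auxiliary variable, by zipping the task list with itself shifted by one. This is a minimal sketch that assumes only that the objects support `>>`. The names `link_sequentially` and `Recorder` are hypothetical and used for illustration; `Recorder` stands in for an operator so the snippet runs without Airflow.

```python
def link_sequentially(tasks):
    """Apply `a >> b` to each consecutive pair in `tasks`."""
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream


class Recorder:
    """Hypothetical stand-in for an operator; records its downstream ids."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # self >> other: remember that `other` runs after `self`
        self.downstream.append(other.task_id)
        return other


# Hourly tasks first, then the daily task as the final element.
tasks = [Recorder(f"task_hour_{h}") for h in ["01", "02", "03"]]
tasks.append(Recorder("daily"))
link_sequentially(tasks)

# Flatten the recorded links into (upstream, downstream) pairs.
edges = [(t.task_id, d) for t in tasks for d in t.downstream]
```

With real operators, the list would hold the `PythonOperator` instances followed by the daily task. Airflow 2 also provides a `chain` helper in `airflow.models.baseoperator`, which should express the same dependency as `chain(*hourly_tasks, daily)`; check the documentation for the version you are running.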
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Douglas Ferreira |
