Airflow - Conditional task execution
I'm running Airflow 2.1.0 on Ubuntu 20.04 and I've spotted an optimization opportunity in one of my DAGs, but I need help figuring out how I can implement it. Thank you in advance.
I'm running a DAG that executes two kinds of tasks:
- Raw layer = queries the database and writes files to an S3 bucket
- Trusted layer = reads from the raw S3 bucket and writes to the trusted layer S3 bucket
I need the Trusted task of each task group to start right after its Raw task ends, but at that same moment I also need to trigger the Raw task of the next task group. Since the Trusted layer doesn't touch the database, I can shorten the DAG's total run time by starting the next Raw task immediately, keeping the database busy at all times. I also can't run simultaneous queries, so only a single Raw task may run at a time.
Current flow (not optimized; Raw tasks sit idle when they could already have started):
- Task 1 = Raw starts
- Task 1 = Raw ends
- Task 1 = Trusted starts
- Task 1 = Trusted ends
- ...
Expected flow (optimized, keeping a Raw task running at all times):
- Task 1 = Raw starts
- Task 1 = Raw ends
- (Task 1 = Trusted starts) + (Task 2 = Raw starts)
- Task 1 = Trusted ends
- Task 2 = Raw ends
- (Task 2 = Trusted starts) + (Task 3 = Raw starts)
- ...
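To make the dependency shape concrete, here is a minimal sketch using dummy placeholder tasks (the task names and DAG id are placeholders for illustration, not my real jobs):

import pendulum
from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(dag_id='dependency_sketch',
         start_date=pendulum.datetime(2022, 4, 13),
         schedule_interval=None) as dag:
    raw1 = DummyOperator(task_id='raw1')
    trusted1 = DummyOperator(task_id='trusted1')
    raw2 = DummyOperator(task_id='raw2')
    trusted2 = DummyOperator(task_id='trusted2')

    # when raw1 ends, trusted1 and raw2 should both be free to start
    raw1 >> [trusted1, raw2]
    # and the same pattern repeats for the next table
    raw2 >> trusted2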
Here is my current DAG code:
import pendulum  # used for the timezone and start_date below

from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from airflow_pentaho.operators.KitchenOperator import KitchenOperator
from airflow_pentaho.operators.PanOperator import PanOperator
from airflow_pentaho.operators.CarteJobOperator import CarteJobOperator
from airflow_pentaho.operators.CarteTransOperator import CarteTransOperator
from airflow.utils.task_group import TaskGroup

local_tz = pendulum.timezone('America/Sao_Paulo')

DAG_NAME = "Example_Dag"

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': pendulum.datetime(2022, 4, 13, tz=local_tz),
    'email': '[email protected]',
    'retries': 3,
    'retry_delay': timedelta(minutes=1)
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(hours=1),
         catchup=False,
         concurrency=2,
         max_active_runs=1,
         schedule_interval='*/10 * * * *') as dag:

    with TaskGroup(group_id='table1') as table1:
        job1 = CarteJobOperator(
            dag=dag,
            task_id="tsk_raw1",
            job="Example/raw_layer")
        job2 = CarteJobOperator(
            dag=dag,
            task_id="tsk_trusted1",
            job="Example/trusted_layer")
        job1 >> job2

    with TaskGroup(group_id='table2') as table2:
        job3 = CarteJobOperator(
            dag=dag,
            task_id="tsk_raw2",
            job="Example/raw_layer")
        job4 = CarteJobOperator(
            dag=dag,
            task_id="tsk_trusted2",
            job="Example/trusted_layer")
        job3 >> job4

    with TaskGroup(group_id='table3') as table3:
        job5 = CarteJobOperator(
            dag=dag,
            task_id="tsk_raw3",
            job="Example/raw_layer")
        job6 = CarteJobOperator(
            dag=dag,
            task_id="tsk_trusted3",
            job="Example/trusted_layer")
        job5 >> job6

    with TaskGroup(group_id='table4') as table4:
        job7 = CarteJobOperator(
            dag=dag,
            task_id="tsk_raw4",
            job="Example/raw_layer")
        job8 = CarteJobOperator(
            dag=dag,
            task_id="tsk_trusted4",
            job="Example/trusted_layer")
        job7 >> job8

    with TaskGroup(group_id='table5') as table5:
        job9 = CarteJobOperator(
            dag=dag,
            task_id="tsk_raw5",
            job="Example/raw_layer")
        job10 = CarteJobOperator(
            dag=dag,
            task_id="tsk_trusted5",
            job="Example/trusted_layer")
        job9 >> job10

    table1 >> table2 >> table3 >> table4 >> table5
Solution 1:[1]
Your task groups are getting in the way of your optimal flow. I recommend laying out all your tasks individually and then just using the >> operator to show the actual dependencies. If, on the resulting graph, there is a group of tasks that makes sense as a task group, only then should you add it formally as a task group.
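As a rough sketch of how that recommendation could look for the DAG above (the loop, the task wiring, and the single-slot pool named "database" are illustrative assumptions on top of the answer, not code from the original post; a one-slot pool, created beforehand in the Airflow UI or CLI, is one way to enforce the one-query-at-a-time constraint):

import pendulum
from datetime import timedelta
from airflow import DAG
from airflow_pentaho.operators.CarteJobOperator import CarteJobOperator

local_tz = pendulum.timezone('America/Sao_Paulo')

with DAG(dag_id='Example_Dag',
         start_date=pendulum.datetime(2022, 4, 13, tz=local_tz),
         dagrun_timeout=timedelta(hours=1),
         catchup=False,
         concurrency=2,
         max_active_runs=1,
         schedule_interval='*/10 * * * *') as dag:

    previous_raw = None
    for i in range(1, 6):
        raw = CarteJobOperator(
            task_id=f"tsk_raw{i}",
            job="Example/raw_layer",
            pool="database")  # assumed single-slot pool: only one Raw task queries the database at a time
        trusted = CarteJobOperator(
            task_id=f"tsk_trusted{i}",
            job="Example/trusted_layer")

        raw >> trusted            # Trusted i starts as soon as Raw i finishes
        if previous_raw is not None:
            previous_raw >> raw   # Raw i also starts as soon as Raw i-1 finishes
        previous_raw = raw

With this wiring, tsk_trusted1 and tsk_raw2 both depend only on tsk_raw1, so they can start together; the chain of Raw tasks plus the pool keeps the database to a single query at a time, while the DAG-level concurrency of 2 still lets a Trusted task and the next Raw task overlap.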
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Collin McNulty |
