Airflow - Conditional task execution

I'm running Airflow 2.1.0 on Ubuntu 20.04 and I've spotted an optimization opportunity in a DAG, and I need help figuring out how I can implement it. Thank you in advance.

I'm running a DAG that executes two kinds of tasks:

  1. Raw layer = Queries databases and writes files to an S3 raw bucket
  2. Trusted layer = Reads from the S3 raw bucket and writes to the S3 trusted bucket

I need the Trusted task of each task group to start right after its Raw task ends, and at that same moment I also need to trigger the Raw task of the next task group. Since the Trusted layer doesn't touch the database, I can shorten the DAG's total runtime by starting the next Raw task immediately, keeping the database busy at all times. I also can't run simultaneous queries, so only one Raw task may run at a time.

Actual flow (not optimized; Raw tasks sit idle when they could already have started):

  1. Task 1 = Raw starts
  2. Task 1 = Raw ends
  3. Task 2 = Trusted starts
  4. Task 2 = Trusted ends
  5. ...

Expected flow (optimized; a Raw task is running at all times):

  1. Task 1 = Raw starts
  2. Task 1 = Raw ends
  3. (Task 1 = Trusted Starts) + (Task 2 = Raw Starts)
  4. Task 1 = Trusted ends
  5. Task 2 = Raw ends
  6. (Task 2 = Trusted Starts) + (Task 3 = Raw Starts)
  7. ...
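The expected flow falls out of the dependency graph itself: make each Trusted task AND the next Raw task downstream of the current Raw task. Here is a quick plain-Python sanity check (no Airflow needed, task names are shortened stand-ins for the real task IDs) that computes which tasks become runnable together under those cross dependencies:

```python
from collections import defaultdict

deps = {}                      # task -> set of upstream tasks
downstream = defaultdict(set)  # task -> set of downstream tasks

def add_dep(up, down):
    """Register that `down` can only start after `up` finishes."""
    deps.setdefault(up, set())
    deps.setdefault(down, set()).add(up)
    downstream[up].add(down)

for i in range(1, 6):
    add_dep(f"raw{i}", f"trusted{i}")      # trusted waits for its own raw
    if i < 5:
        add_dep(f"raw{i}", f"raw{i + 1}")  # next raw starts when this raw ends

# Compute "waves" of tasks whose upstreams are all done (Kahn-style).
waves, done = [], set()
while len(done) < len(deps):
    ready = sorted(t for t, ups in deps.items() if t not in done and ups <= done)
    waves.append(ready)
    done.update(ready)

for wave in waves:
    print(wave)
```

Each printed wave pairs one Trusted task with the next Raw task, which is exactly the expected flow above, and at no point are two Raw tasks runnable at once.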

Here is the DAG code:

from datetime import timedelta

import pendulum

from airflow import DAG
from airflow.utils.task_group import TaskGroup

from airflow_pentaho.operators.CarteJobOperator import CarteJobOperator

local_tz = pendulum.timezone('America/Sao_Paulo')

DAG_NAME = "Example_Dag"
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': pendulum.datetime(2022, 4, 13, tz=local_tz),
    'email': '[email protected]',
    'retries': 3,
    'retry_delay': timedelta(minutes=1)
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(hours=1),
         catchup=False,
         concurrency=2,
         max_active_runs=1,
         schedule_interval='*/10 * * * *') as dag:

  with TaskGroup(group_id='table1') as table1:

    job1 = CarteJobOperator(
        dag=dag,
        task_id="tsk_raw1",
        job="Example/raw_layer")

    job2 = CarteJobOperator(
        dag=dag,
        task_id="tsk_trusted1",
        job="Example/trusted_layer")

    job1 >> job2

  with TaskGroup(group_id='table2') as table2:

    job3 = CarteJobOperator(
        dag=dag,
        task_id="tsk_raw2",
        job="Example/raw_layer")

    job4 = CarteJobOperator(
        dag=dag,
        task_id="tsk_trusted2",
        job="Example/trusted_layer")

    job3 >> job4

  with TaskGroup(group_id='table3') as table3:

    job5 = CarteJobOperator(
        dag=dag,
        task_id="tsk_raw3",
        job="Example/raw_layer")

    job6 = CarteJobOperator(
        dag=dag,
        task_id="tsk_trusted3",
        job="Example/trusted_layer")

    job5 >> job6

  with TaskGroup(group_id='table4') as table4:

    job7 = CarteJobOperator(
        dag=dag,
        task_id="tsk_raw4",
        job="Example/raw_layer")

    job8 = CarteJobOperator(
        dag=dag,
        task_id="tsk_trusted4",
        job="Example/trusted_layer")

    job7 >> job8

  with TaskGroup(group_id='table5') as table5:
    job9 = CarteJobOperator(
        dag=dag,
        task_id="tsk_raw5",
        job="Example/raw_layer")

    job10 = CarteJobOperator(
        dag=dag,
        task_id="tsk_trusted5",
        job="Example/trusted_layer")

    job9 >> job10



table1 >> table2 >> table3 >> table4 >> table5



Solution 1:

Your task groups are getting in the way of your optimal flow. I recommend laying out all your tasks individually and then just using the >> operator to show the actual dependencies. If, on the resulting graph, there is a group of tasks that makes sense as a task group, only then should you add it formally as a task group.
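Concretely, that means dropping the `table1 >> table2 >> table3 >> table4 >> table5` chaining and fanning each Raw task out to both its own Trusted task and the next Raw task. The sketch below uses a minimal stand-in `Task` class only so the `>>` wiring can be shown and run without an Airflow installation; in the real DAG these would be the existing `CarteJobOperator` instances (`job1 >> [job2, job3]`, `job3 >> [job4, job5]`, and so on):

```python
class Task:
    """Minimal stand-in for an Airflow operator that supports the
    `task >> [a, b]` fan-out syntax, for illustration only."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(targets)
        return other

raw = {i: Task(f"tsk_raw{i}") for i in range(1, 6)}
trusted = {i: Task(f"tsk_trusted{i}") for i in range(1, 6)}

# Each Raw task fans out to its own Trusted task and the next Raw task,
# so the database stays busy and Raw tasks never overlap.
for i in range(1, 6):
    if i < 5:
        raw[i] >> [trusted[i], raw[i + 1]]
    else:
        raw[i] >> trusted[i]
```

Note that the DAG's existing `concurrency=2` already permits the resulting parallelism of one Raw plus one Trusted task at a time.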

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Collin McNulty