Airflow - Dynamic Tasks and Downstream Dependencies

I inherited the following DAG, which is running on AWS MWAA v2.2.2:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
}

snowflake_conn_id = 'snowflake'

sources = [
    {'name': 'test_1', 'path': 'test1/path/'},
    {'name': 'test_2', 'path': 'test2/path'}
]

# define the DAG
with DAG(
    'test_dag',
    default_args=default_args,
    description='test_dag test description.',
    schedule_interval=None,
    max_active_runs=1
) as dag:

    t0 = DummyOperator(task_id='start')

    for source in sources:
        create_table_sql = (
            f"CREATE OR REPLACE EXTERNAL TABLE {source['name']} "
            f"with location = @stage_dev/{source['path']} "
            "auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);"
        )

        external_tables_from_s3 = SnowflakeOperator(
            task_id=f"create_external_table_for_{source['name']}",
            dag=dag,
            sql=create_table_sql,
            snowflake_conn_id=snowflake_conn_id
        )

    t1 = DummyOperator(task_id='end')

    # NOTE: external_tables_from_s3 only holds the task from the last loop iteration
    t0 >> external_tables_from_s3 >> t1

What is the best way to set up this DAG so that the external_tables_from_s3 tasks can run in parallel?

Basically, I want something like this:

t0 >> [create_external_table_for_test_1, create_external_table_for_test_2] >> t1

What is the best way to achieve this without having to specify each task individually? The sources list is a lot bigger than this; it has been trimmed down for this question.
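For reference, here is a minimal sketch of the kind of structure I have in mind, reusing the imports, sources, t0, and t1 from the DAG above; the list name external_table_tasks is just my own placeholder. As far as I understand, Airflow's >> operator accepts a list of tasks, so collecting the generated operators in a list should fan them out between start and end:

    external_table_tasks = []

    for source in sources:
        create_table_sql = (
            f"CREATE OR REPLACE EXTERNAL TABLE {source['name']} "
            f"with location = @stage_dev/{source['path']} "
            "auto_refresh = true FILE_FORMAT = (TYPE = PARQUET);"
        )

        # append each task instead of overwriting a single variable
        external_table_tasks.append(
            SnowflakeOperator(
                task_id=f"create_external_table_for_{source['name']}",
                sql=create_table_sql,
                snowflake_conn_id=snowflake_conn_id,
            )
        )

    # a list on either side of >> fans out from t0 and fans back in to t1
    t0 >> external_table_tasks >> t1

If that is not the idiomatic way, I have also seen chain() from airflow.models.baseoperator mentioned, e.g. chain(t0, external_table_tasks, t1), which I believe handles task-to-list dependencies as well.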


