Proper way to create dynamic workflows in Airflow

Problem

Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A? I have looked at subdags, but it looks like they can only work with a static set of tasks that has to be determined at DAG creation.

Would dag triggers work? And if so could you please provide an example.

I have an issue where it is impossible to know the number of task B's that will be needed to calculate Task C until Task A has been completed. Each Task B.* will take several hours to compute and cannot be combined.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

Idea #1

I don't like this solution because I have to create a blocking ExternalTaskSensor, and all the Task B.* will take between 2 and 24 hours to complete. So I do not consider this a viable solution. Surely there is an easier way? Or was Airflow not designed for this?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG through python_callable in TriggerDagRunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Edit 1:

As of now this question still does not have a great answer. I have been contacted by several people looking for a solution.



Solution 1:[1]

Here is how I did it with a similar request without any subdags:

First create a method that returns whatever values you want

def values_function():
     return values

Next, create a method that will generate the jobs dynamically:

def group(number, **kwargs):
        #load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
        return BashOperator(
                task_id='JOB_NAME_{}'.format(number),
                bash_command='script.sh {} {}'.format(dyn_value, number),
                dag=dag)

And then combine them:

push_func = PythonOperator(
        task_id='push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

for i in values_function():
        push_func >> group(i) >> complete

Solution 2:[2]

Yes, this is possible. I've created an example DAG that demonstrates this.

import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator

main_dag_id = 'DynamicWorkflow2'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG(
    main_dag_id,
    schedule_interval="@once",
    default_args=args)


def start(*args, **kwargs):

    value = Variable.get("DynamicWorkflow_Group1")
    logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


def resetTasksStatus(task_id, execution_date):
    logging.info("Resetting: " + task_id + " " + execution_date)

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[main_dag_id]
    session = settings.Session()

    my_task = check_dag.get_task(task_id)
    ti = TaskInstance(my_task, execution_date)
    state = ti.current_state()
    logging.info("Current state of " + task_id + " is " + str(state))
    ti.set_state(None, session)
    state = ti.current_state()
    logging.info("Updated state of " + task_id + " is " + str(state))


def bridge1(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))


def bridge2(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 3

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))


def end(*args, **kwargs):
    logging.info("Ending")


def doSomeWork(name, index, *args, **kwargs):
    # Do whatever work you need to do
    # Here I will just create a new file
    os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')


starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])

# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
    task_id='bridge1',
    dag=dag,
    provide_context=True,
    python_callable=bridge1,
    op_args=[])

DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)

# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
    task_id='bridge2',
    dag=dag,
    provide_context=True,
    python_callable=bridge2,
    op_args=[])

DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))

for index in range(int(DynamicWorkflow_Group2)):
    dynamicTask = PythonOperator(
        task_id='secondGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['secondGroup', index])

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge2_task)

ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])

DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))

for index in range(int(DynamicWorkflow_Group3)):

    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task will use the BashOperator
    if index < (int(DynamicWorkflow_Group3) - 1):
        dynamicTask = PythonOperator(
            task_id='thirdGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['thirdGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='thirdGroup_' + str(index),
            bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
            dag=dag)

    bridge2_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)

# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)

Before you run the DAG create these three Airflow Variables

airflow variables --set DynamicWorkflow_Group1 1

airflow variables --set DynamicWorkflow_Group2 0

airflow variables --set DynamicWorkflow_Group3 0

You'll see that the DAG goes from this:

[image: DAG before the run]

To this after it has run:

[image: DAG after the run]

You can see more information on this DAG in my article on creating Dynamic Workflows On Airflow.

Solution 3:[3]

I have figured out a way to create workflows based on the result of previous tasks.
Basically what you want to do is have two subdags with the following:

  1. Xcom push a list (or whatever you need to create the dynamic workflow later) in the subdag that gets executed first (see test1.py def return_list())
  2. Pass the main dag object as a parameter to your second subdag
  3. Now if you have the main dag object, you can use it to get a list of its task instances. From that list of task instances, you can filter out a task of the current run by using parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]; one could probably add more filters here.
  4. With that task instance, you can use xcom pull to get the value you need by specifying the dag_id to the one of the first subdag: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. Use the list/value to create your tasks dynamically

I have tested this in my local Airflow installation and it works fine. I don't know whether the xcom pull will have problems if more than one instance of the DAG is running at the same time, but in that case you could use a unique key or something similar to identify the xcom value you want. Step 3 could probably be optimized to be 100% sure of getting a specific task of the current main DAG run, but for my use this performs well enough; I think one only needs one task_instance object to use xcom_pull.

Also I clean the xcoms for the first subdag before every execution, just to make sure that I don't accidentally get any wrong value.

I'm pretty bad at explaining, so I hope the following code will make everything clear:

test1.py

from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag

test2.py

from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator

log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

    if len(parent_dag.get_active_runs()) > 0:
        test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag

and the main workflow:

test.py

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2

DAG_NAME = 'test-dag'

dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))

test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag
)

test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag
)

test1 >> test2

Solution 4:[4]

OA: "Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A?"

Short answer is no. Airflow builds the DAG flow before it starts running it.

That said, we came to a simple conclusion: we don't have such a need. When you want to parallelize some work, you should size it by the resources you have available, not by the number of items to process.

We did it like this: we dynamically generate a fixed number of tasks, say 10, that will split the job. For example if we need to process 100 files each task will process 10 of them. I will post the code later today.

Update

Here is the code, sorry for the delay.

from datetime import datetime, timedelta

import airflow
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

dag = airflow.DAG(
    'parallel_tasks_v1',
    schedule_interval="@daily",
    catchup=False,
    default_args=args)

# You can read this from variables
parallel_tasks_total_number = 10

start_task = DummyOperator(
    task_id='start_task',
    dag=dag
)


# Placeholder callable (not part of the original answer): replace the body
# with the real work for one chunk of the data.
def parallelTask(current_task_number, total_task_number, **kwargs):
    pass


# Creates the tasks dynamically.
# Each one will process one chunk of data.
def create_dynamic_task(current_task_number):
    # A PythonOperator (not a DummyOperator) is needed to run a python_callable
    return PythonOperator(
        provide_context=True,
        task_id='parallel_task_' + str(current_task_number),
        python_callable=parallelTask,
        # your task will take as input the total number and the current number to process a chunk of total elements
        op_args=[current_task_number, int(parallel_tasks_total_number)],
        dag=dag)


end = DummyOperator(
    task_id='end',
    dag=dag)

for page in range(int(parallel_tasks_total_number)):
    created_task = create_dynamic_task(page)
    start_task >> created_task
    created_task >> end

Code explanation:

Here we have a single start task and a single end task (both dummy).

Then from the start task with the for loop we create 10 tasks with the same python callable. The tasks are created in the function create_dynamic_task.

To each python callable we pass as arguments the total number of parallel tasks and the current task index.

Suppose you have 1000 items to process: the first task receives as input that it should process the first chunk out of 10 chunks. It divides the 1000 items into 10 chunks and processes the first one.
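The chunking described above can be sketched as a small helper. This is only an illustration and not the answer's own code: `chunk_for_task` is a hypothetical name, and it assumes every task can see the full item list and picks its own slice from its task index.

```python
def chunk_for_task(items, task_index, total_tasks):
    """Return the slice of `items` that task number `task_index` should process.

    Hypothetical helper: the items are split into `total_tasks` contiguous
    chunks whose sizes differ by at most one.
    """
    if not 0 <= task_index < total_tasks:
        raise ValueError("task_index must be in range(total_tasks)")
    base, extra = divmod(len(items), total_tasks)
    # The first `extra` chunks each receive one additional item.
    start = task_index * base + min(task_index, extra)
    end = start + base + (1 if task_index < extra else 0)
    return items[start:end]


items = list(range(1000))
first_chunk = chunk_for_task(items, 0, 10)
print(len(first_chunk), first_chunk[0], first_chunk[-1])  # prints: 100 0 99
```

A callable like parallelTask could call such a helper with the two values it receives through op_args, so that the ten tasks together cover every item exactly once.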

Solution 5:[5]

What I think you are looking for is creating a DAG dynamically. I encountered this type of situation a few days ago, and after some searching I found this blog.

Dynamic Task Generation

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id = task_id,
        provide_context=True,
        #Eval is used since the callableFunction var is of type string
        #while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable = eval(callableFunction),
        op_kwargs = args,
        xcom_push = True,
        dag = dag,
    )
    return task

Setting the DAG workflow

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # Use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end

This is how our DAG looks after putting the code together:

[image: resulting DAG]

import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

start = DummyOperator(
    task_id='start',
    dag=dag
)


def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # Eval is used since the callableFunction var is of type string
        # while the python_callable argument for PythonOperators only receives objects of type callable not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table in tables:
        for table, fieldName in table.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated they should linked with the
            # dummy operators generated in the start and end tasks. 
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end

It was very helpful; I hope it will also help someone else.

Solution 6:[6]

For Airflow 2.3 and above only:

This is achieved using Dynamic Task Mapping, which was introduced in Airflow 2.3.

More documentation and example here:

Example:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task


@task
def make_list():
    # This can also be from an API call, checking a database, -- almost anything you like, as long as the
    # resulting list/dictionary can be stored in the current XCom backend.
    return [1, 2, {"a": "b"}, "str"]


@task
def consumer(arg):
    # Each mapped task instance receives one element of the list
    print(arg)


with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    consumer.expand(arg=make_list())

Example 2:

from airflow import XComArg

task = MyOperator(task_id="source")

downstream = MyOperator2.partial(task_id="consumer").expand(input=XComArg(task))

The graph view and tree view are also updated.

Solution 7:[7]

The jobs graph is not generated at run time. Rather the graph is built when it is picked up by Airflow from your dags folder. Therefore it isn't really going to be possible to have a different graph for the job every time it runs. You can configure a job to build a graph based on a query at load time. That graph will remain the same for every run after that, which is probably not very useful.

You can design a graph which executes different tasks on every run based on query results by using a Branch Operator.

What I've done is to pre-configure a set of tasks and then take the query results and distribute them across the tasks. This is probably better anyway, because if your query returns a lot of results, you probably don't want to flood the scheduler with a lot of concurrent tasks. To be even safer, I also used a pool to ensure my concurrency doesn't get out of hand with an unexpectedly large query.
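Stripped of Airflow, the distribution step is just a round-robin over a fixed number of buckets. The sketch below only illustrates that idea; `distribute` and `non_empty_buckets` are hypothetical names, and the records are plain strings rather than database rows.

```python
def distribute(records, total_buckets):
    """Spread records round-robin over a fixed number of buckets."""
    buckets = {number: [] for number in range(total_buckets)}
    for position, record in enumerate(records):
        buckets[position % total_buckets].append(record)
    return buckets


def non_empty_buckets(buckets):
    """Return the bucket numbers that actually received work."""
    return [number for number, content in buckets.items() if content]


orders = ["order-1", "order-2", "order-3"]
buckets = distribute(orders, 5)
print(non_empty_buckets(buckets))  # prints: [0, 1, 2]
```

With fewer results than buckets, some buckets stay empty; the tasks for those buckets still exist in the graph but have nothing to process.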

"""
 - This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from include.run_celery_task import runCeleryTask

########################################################################

default_args = {
    'owner': 'airflow',
    'catchup': False,
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 2, 19, 50, 00),
    'email': ['rotten@stackoverflow'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'max_active_runs': 1
}

dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)

totalBuckets = 5

get_orders_query = """
select 
    o.id,
    o.customer
from 
    orders o
where
    o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
    and
    o.is_test = false
    and
    o.is_processed = false
"""

###########################################################################################################

# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
    return PythonOperator( 
                           task_id=f'order_processing_task_{bucket_number}',
                           python_callable=runOrderProcessing,
                           pool='order_processing_pool',
                           op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
                           provide_context=True,
                           dag=dag
                          )


# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
    # The task that pushes the buckets is registered below with task_id='get_orders'
    orderList = context['ti'].xcom_pull(task_ids='get_orders', key=task_bucket)

    if orderList is not None:
        for order in orderList:
            logging.info(f"Processing Order with Order ID {order['order_id']}, customer ID {order['customer_id']}")
            # doStuff() is a placeholder for the real per-order work
            doStuff(**order)


# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
    myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')

    # initialize the task list buckets
    tasks = {}
    for task_number in range(0, totalBuckets):
        tasks[f'order_processing_task_{task_number}'] = []

    # populate the task list buckets
    # distribute them evenly across the set of buckets
    resultCounter = 0
    for record in myDatabaseHook.get_records(get_orders_query):

        resultCounter += 1
        bucket = (resultCounter % totalBuckets)

        tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})

    # push the order lists into xcom
    # iterate over a copy of the keys because empty buckets are deleted inside the loop
    for task in list(tasks):
        if len(tasks[task]) > 0:
            logging.info(f'Task {task} has {len(tasks[task])} orders.')
            context['ti'].xcom_push(key=task, value=tasks[task])
        else:
            # if we didn't have enough tasks for every bucket
            # don't bother running that task - remove it from the list
            logging.info(f"Task {task} doesn't have any orders.")
            del tasks[task]

    return list(tasks.keys())

###################################################################################################


# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
    task_id='clean_xcoms',
    mysql_conn_id='airflow_db',
    sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
    dag=dag)


# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
                                 task_id='get_orders',
                                 python_callable=getOpenOrders,
                                 provide_context=True,
                                 dag=dag
                                )
get_orders_task.set_upstream(clean_xcoms)

# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
    taskBucket = createOrderProcessingTask(bucketNumber)
    taskBucket.set_upstream(get_orders_task)


###################################################################################################

Solution 8:[8]

A great answer

Too much? Anyway.

A lot of the other answers are a bit square-peg-round-hole: adding complicated new operators, abusing built-in variables, or somewhat failing to answer the question. I wasn't particularly happy with any of them, as they either hide their behaviour when viewed through the web UI, are prone to breaking, or require a lot of custom code (that's also prone to breaking).

This solution uses built-in functionality, requires no new operators and limited additional code, keeps the DAGs visible through the UI without any tricks, and follows Airflow best practice (see idempotency).

The solution to this problem is fairly complicated, so I've split it into several parts. These are:

  • How to safely trigger a dynamic number of tasks
  • How to wait for all of these tasks to finish then call a final task
  • How to integrate this into your task pipeline
  • Limitations (nothing is perfect)

Can a task trigger a dynamic number of other tasks?

Yes. Sort of. Without needing to write any new operators, it's possible to have a DAG trigger a dynamic number of other DAGs, using only built-in operators. This can then be expanded to have a DAG depend on a dynamic number of other DAGs (see waiting for tasks to finish). This is similar to flinz's solution, but more robust and with much less custom code.

This is done using a BranchPythonOperator that selectively triggers 2 other TriggerDagRunOperators. One of these recursively re-calls the current DAG, the other calls an external dag, the target function.

An example config that can be used to trigger the dag is given at the top of recursive_dag.py.

print_conf.py (an example DAG to trigger)

from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def print_output(dag_run):
    dag_conf = dag_run.conf
    if 'output' in dag_conf:
        output = dag_conf['output']
    else:
        output = 'no output found'
    print(output)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'print_output',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    print_output = PythonOperator(
        task_id='print_output_task',
        python_callable=print_output
    )

recursive_dag.py (Where the magic happens)

"""
DAG that can be used to trigger multiple other dags.
For example, trigger with the following config:
{
    "task_list": ["print_output","print_output"],
    "conf_list": [
        {
            "output": "Hello"
        },
        {
            "output": "world!"
        }
    ]
}
"""

from datetime import timedelta
import json

from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag_id = 'branch_recursive'
branch_id = 'branch_operator'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = repeat_task_id + '_conf'
next_task_id = 'next_dag_operator'
next_task_conf = next_task_id + '_conf'

def choose_branch(task_instance, dag_run):
    dag_conf = dag_run.conf
    task_list = dag_conf['task_list']
    next_task = task_list[0]
    later_tasks = task_list[1:]
    conf_list = dag_conf['conf_list']
    # dump to string because value is stringified into
    # template string, is then parsed.
    next_conf = json.dumps(conf_list[0])
    later_confs = conf_list[1:]

    task_instance.xcom_push(key=next_task_id, value=next_task)
    task_instance.xcom_push(key=next_task_conf, value=next_conf)

    if later_tasks:
        repeat_conf = json.dumps({
            'task_list': later_tasks,
            'conf_list': later_confs
        })

        task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
        return [next_task_id, repeat_task_id]

    return next_task_id

def add_braces(in_string):
    return '{{' + in_string + '}}'

def make_templated_pull(key):
    pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{branch_id}\')'
    return add_braces(pull)

with DAG(
    dag_id,
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    branch = BranchPythonOperator(
        task_id=branch_id,
        python_callable=choose_branch
    )

    trigger_next = TriggerDagRunOperator(
        task_id=next_task_id,
        trigger_dag_id=make_templated_pull(next_task_id),
        conf=make_templated_pull(next_task_conf)
    )

    trigger_repeat = TriggerDagRunOperator(
        task_id=repeat_task_id,
        trigger_dag_id=dag_id,
        conf=make_templated_pull(repeat_task_conf)
    )

    branch >> [trigger_next, trigger_repeat]

This solution has the advantage of using very little custom code. flinz's solution can fail partway through, leaving some DAG runs scheduled and others not. On retry, DAGs may then either be scheduled to run twice, or the retry may fail on the first DAG, leaving the work partially complete. This approach shows you which DAGs failed to trigger and retries only those. It is therefore idempotent, and the other is not.
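The head/tail split that choose_branch performs can be checked without Airflow. The sketch below restates it in plain Python; split_conf and trace are hypothetical helper names used only for illustration and are not part of the DAG file above.

```python
import json


def split_conf(dag_conf):
    """Return (next_dag_id, next_conf, repeat_conf) for one recursion step.

    repeat_conf is None when the task list is exhausted, which is the
    case where choose_branch returns only next_task_id.
    """
    task_list = dag_conf['task_list']
    conf_list = dag_conf['conf_list']
    next_dag_id = task_list[0]
    next_conf = json.dumps(conf_list[0])
    repeat_conf = None
    if task_list[1:]:
        repeat_conf = json.dumps({
            'task_list': task_list[1:],
            'conf_list': conf_list[1:],
        })
    return next_dag_id, next_conf, repeat_conf


def trace(dag_conf):
    """Follow the chain of runs and list every (dag_id, conf) triggered."""
    triggered = []
    while dag_conf is not None:
        dag_id, conf, repeat_conf = split_conf(dag_conf)
        triggered.append((dag_id, json.loads(conf)))
        dag_conf = json.loads(repeat_conf) if repeat_conf else None
    return triggered


example = {
    'task_list': ['print_output', 'print_output'],
    'conf_list': [{'output': 'Hello'}, {'output': 'world!'}],
}
print(trace(example))
```

Each run handles exactly one entry and hands the rest to the next run, so a failed trigger affects only the entry that run was responsible for.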

Can a DAG depend on a dynamic number of other DAGS?

Yes, but... This can be easily done if tasks don't run in parallel. Running in parallel is more complicated.

To run in sequence, the important changes are: set wait_for_completion=True in trigger_next, use a Python operator to set up the XCom values before trigger_next, and add a branch operator that either enables or disables the repeat task. The dependencies then become linear:

setup_xcom >> trigger_next >> branch >> trigger_repeat
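A minimal sketch of the two callables the sequential variant needs, written so they can be exercised without Airflow. The names setup_xcom and choose_repeat, and the FakeTaskInstance and FakeDagRun stand-ins, are assumptions for illustration; in a real DAG Airflow supplies task_instance and dag_run. Returning an empty list from the branch callable is assumed to skip every downstream task, which is worth verifying on your Airflow version.

```python
import json

next_task_id = 'next_dag_operator'
next_task_conf = next_task_id + '_conf'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = repeat_task_id + '_conf'


def setup_xcom(task_instance, dag_run):
    """PythonOperator callable: publish the head of the list for trigger_next
    and the remainder for trigger_repeat."""
    conf = dag_run.conf
    task_instance.xcom_push(key=next_task_id, value=conf['task_list'][0])
    task_instance.xcom_push(key=next_task_conf,
                            value=json.dumps(conf['conf_list'][0]))
    task_instance.xcom_push(key=repeat_task_conf, value=json.dumps({
        'task_list': conf['task_list'][1:],
        'conf_list': conf['conf_list'][1:],
    }))


def choose_repeat(dag_run):
    """BranchPythonOperator callable: recurse only while work remains."""
    if len(dag_run.conf['task_list']) > 1:
        return repeat_task_id
    return []  # assumption: an empty list skips trigger_repeat


class FakeTaskInstance:
    """Stand-in that records xcom_push calls (not an Airflow class)."""
    def __init__(self):
        self.pushed = {}

    def xcom_push(self, key, value):
        self.pushed[key] = value


class FakeDagRun:
    """Stand-in that only carries a conf dict (not an Airflow class)."""
    def __init__(self, conf):
        self.conf = conf


ti = FakeTaskInstance()
run = FakeDagRun({
    'task_list': ['B_DAG', 'B_DAG'],
    'conf_list': [{'b_number': 1}, {'b_number': 2}],
})
setup_xcom(ti, run)
print(ti.pushed[next_task_id], choose_repeat(run))
```

Because trigger_next waits for completion before the branch runs, each B DAG run finishes before the next recursion step is triggered.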

To run in parallel, you can similarly chain several ExternalTaskSensors recursively, using templated external_dag_id values and the timestamps associated with the triggered DAG runs. To know the timestamp of a triggered DAG run, trigger it with the timestamp of the triggering DAG run. The sensors then wait, one by one, for all of the created DAG runs to complete, and a final DAG is triggered afterwards. The code is below; this time I've added a random sleep to the print_output DAG, so that the wait DAGs actually do some waiting.

Note: recurse_wait_dag.py now defines two DAGs, and both need to be enabled for this to work.

An example config that can be used to trigger the dag is given at the top of recurse_wait_dag.py

print_conf.py (modified to add a random sleep)

"""
Simple dag that prints the output in DAG config
Used to demo TriggerDagRunOperator (see recursive_dag.py)
"""

from datetime import timedelta
from time import sleep
from random import randint

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

def print_output(dag_run):
    sleep_time = randint(15,30)
    print(f'sleeping for time: {sleep_time}')
    sleep(sleep_time)
    dag_conf = dag_run.conf
    if 'output' in dag_conf:
        output = dag_conf['output']
    else:
        output = 'no output found'
    print(output)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'print_output',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    print_output = PythonOperator(
        task_id='print_output_task',
        python_callable=print_output
    )

recurse_wait_dag.py (where even more magic happens)

"""
DAG that can be used to trigger multiple other dags,
waits for all dags to execute, then triggers a final dag.
For example, trigger the DAG 'recurse_then_wait' with the following config:
{
    "final_task": "print_output",
    "task_list": ["print_output","print_output"],
    "conf_list": [
        {
            "output": "Hello"
        },
        {
            "output": "world!"
        }
    ]
}
"""


from datetime import timedelta
import json

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils import timezone

from common import make_templated_pull

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

def to_conf(id):
    return f'{id}_conf'

def to_execution_date(id):
    return f'{id}_execution_date'

def to_ts(id):
    return f'{id}_ts'

recurse_dag_id = 'recurse_then_wait'
branch_id = 'recursive_branch'
repeat_task_id = 'repeat_dag_operator'
repeat_task_conf = to_conf(repeat_task_id)
next_task_id = 'next_dag_operator'
next_task_conf = to_conf(next_task_id)
next_task_execution_date = to_execution_date(next_task_id)
end_task_id = 'end_task'
end_task_conf = to_conf(end_task_id)

wait_dag_id = 'wait_after_recurse'
choose_wait_id = 'choose_wait'
next_wait_id = 'next_wait'
next_wait_ts = to_ts(next_wait_id)

def choose_branch(task_instance, dag_run, ts):
    dag_conf = dag_run.conf
    task_list = dag_conf['task_list']
    next_task = task_list[0]
    # can't have multiple dag runs of same DAG with same timestamp 
    assert next_task != recurse_dag_id
    later_tasks = task_list[1:]
    conf_list = dag_conf['conf_list']
    next_conf = json.dumps(conf_list[0])
    later_confs = conf_list[1:]
    triggered_tasks = dag_conf.get('triggered_tasks', []) + [(next_task, ts)]

    task_instance.xcom_push(key=next_task_id, value=next_task)
    task_instance.xcom_push(key=next_task_conf, value=next_conf)
    task_instance.xcom_push(key=next_task_execution_date, value=ts)

    if later_tasks:
        repeat_conf = json.dumps({
            'task_list': later_tasks,
            'conf_list': later_confs,
            'triggered_tasks': triggered_tasks,
            'final_task': dag_conf['final_task']
        })

        task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
        return [next_task_id, repeat_task_id]
    
    end_conf = json.dumps({
        'tasks_to_wait': triggered_tasks,
        'final_task': dag_conf['final_task']
    })
    task_instance.xcom_push(key=end_task_conf, value=end_conf)

    return [next_task_id, end_task_id]

def choose_wait_target(task_instance, dag_run):
    dag_conf = dag_run.conf
    tasks_to_wait = dag_conf['tasks_to_wait']
    next_task, next_ts = tasks_to_wait[0]
    later_tasks = tasks_to_wait[1:]
    task_instance.xcom_push(key=next_wait_id, value=next_task)
    task_instance.xcom_push(key=next_wait_ts, value=next_ts)
    
    if later_tasks:
        repeat_conf = json.dumps({
            'tasks_to_wait': later_tasks,
            'final_task': dag_conf['final_task']
        })
        task_instance.xcom_push(key=repeat_task_conf, value=repeat_conf)
        
def execution_date_fn(_, task_instance):
    date_str = task_instance.xcom_pull(key=next_wait_ts, task_ids=choose_wait_id)
    return timezone.parse(date_str)

def choose_wait_branch(task_instance, dag_run):
    dag_conf = dag_run.conf
    tasks_to_wait = dag_conf['tasks_to_wait']

    if len(tasks_to_wait) == 1:
        return end_task_id

    return repeat_task_id

with DAG(
    recurse_dag_id,
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as recursive_dag:
    branch = BranchPythonOperator(
        task_id=branch_id,
        python_callable=choose_branch
    )

    trigger_next = TriggerDagRunOperator(
        task_id=next_task_id,
        trigger_dag_id=make_templated_pull(next_task_id, branch_id),
        execution_date=make_templated_pull(next_task_execution_date, branch_id),
        conf=make_templated_pull(next_task_conf, branch_id)
    )

    trigger_repeat = TriggerDagRunOperator(
        task_id=repeat_task_id,
        trigger_dag_id=recurse_dag_id,
        conf=make_templated_pull(repeat_task_conf, branch_id)
    )

    trigger_end = TriggerDagRunOperator(
        task_id=end_task_id,
        trigger_dag_id=wait_dag_id,
        conf=make_templated_pull(end_task_conf, branch_id)
    )

    branch >> [trigger_next, trigger_repeat, trigger_end]

with DAG(
    wait_dag_id,
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as wait_dag:
    py_operator = PythonOperator(
        task_id=choose_wait_id,
        python_callable=choose_wait_target
    )

    sensor = ExternalTaskSensor(
        task_id='do_wait',
        external_dag_id=make_templated_pull(next_wait_id, choose_wait_id),
        execution_date_fn=execution_date_fn
    )

    branch = BranchPythonOperator(
        task_id=branch_id,
        python_callable=choose_wait_branch
    )

    trigger_repeat = TriggerDagRunOperator(
        task_id=repeat_task_id,
        trigger_dag_id=wait_dag_id,
        conf=make_templated_pull(repeat_task_conf, choose_wait_id)
    )

    trigger_end = TriggerDagRunOperator(
        task_id=end_task_id,
        trigger_dag_id='{{ dag_run.conf[\'final_task\'] }}'
    )

    py_operator >> sensor >> branch >> [trigger_repeat, trigger_end]
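The hand-off between the two DAGs above can be traced without Airflow. The sketch below is a plain-Python restatement: the recursion accumulates (dag_id, timestamp) pairs in triggered_tasks, and the wait chain then consumes them one at a time. The names recurse_step, wait_step and simulate, and the timestamps 't1' and 't2', are hypothetical and exist only for this illustration.

```python
def recurse_step(dag_conf, ts):
    """One run of 'recurse_then_wait': return (conf for the next run, is_last)."""
    task_list = dag_conf['task_list']
    triggered = dag_conf.get('triggered_tasks', []) + [(task_list[0], ts)]
    if task_list[1:]:
        return {
            'task_list': task_list[1:],
            'conf_list': dag_conf['conf_list'][1:],
            'triggered_tasks': triggered,
            'final_task': dag_conf['final_task'],
        }, False
    # Last step: hand the accumulated pairs to 'wait_after_recurse'.
    return {'tasks_to_wait': triggered,
            'final_task': dag_conf['final_task']}, True


def wait_step(dag_conf):
    """One run of 'wait_after_recurse': return (pair to wait on, next conf or None)."""
    tasks_to_wait = dag_conf['tasks_to_wait']
    head, rest = tasks_to_wait[0], tasks_to_wait[1:]
    if rest:
        return head, {'tasks_to_wait': rest,
                      'final_task': dag_conf['final_task']}
    return head, None


def simulate(dag_conf, timestamps):
    """Return the ordered events of a whole chain.

    Needs one timestamp per entry in task_list.
    """
    events = []
    ts_iter = iter(timestamps)
    conf, done = dag_conf, False
    while not done:
        ts = next(ts_iter)
        events.append(f"trigger {conf['task_list'][0]} at {ts}")
        conf, done = recurse_step(conf, ts)
    final_task = conf['final_task']
    while conf is not None:
        (dag_id, ts), conf = wait_step(conf)
        events.append(f'wait for {dag_id} at {ts}')
    events.append(f'trigger {final_task}')
    return events


example_conf = {
    'final_task': 'C_DAG',
    'task_list': ['B_DAG', 'B_DAG'],
    'conf_list': [{'b_number': 1}, {'b_number': 2}],
}
events = simulate(example_conf, ['t1', 't2'])
print('\n'.join(events))
```

In the real DAGs the conf passes through JSON between runs, so the pairs arrive as two-element lists rather than tuples; the unpacking in choose_wait_target works either way.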

Integrating with your code

That's great, but you want to actually use this. So, what do you need to do? The question includes an example trying to do the following:

             |---> Task B.1 --|
             |---> Task B.2 --|
Task A ------|---> Task B.3 --|-----> Task C
             |       ....     |
             |---> Task B.N --|

To achieve the question's goal (example implementation below), you need to separate Tasks A, B and C into their own DAGs. Then, at the end of DAG A, add a new operator that triggers the DAG 'recurse_then_wait' above. Pass this DAG a config that includes the config needed for each B DAG run, as well as the B DAG id (this can easily be changed to use different DAGs, go nuts). Then include the name of DAG C, the final DAG, to be run at the end. The config should look like this:

{
    "final_task": "C_DAG",
    "task_list": ["B_DAG","B_DAG"],
    "conf_list": [
        {
            "b_number": 1,
            "more_stuff": "goes_here"
        },
        {
            "b_number": 2,
            "foo": "bar"
        }
    ]
}

When implemented it should look something like this:

trigger_recurse.py

from datetime import timedelta
import json

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.dates import days_ago

from recurse_wait_dag import recurse_dag_id

def add_braces(in_string):
    return '{{' + in_string + '}}'

def make_templated_pull(key, task_id):
    pull = f'ti.xcom_pull(key=\'{key}\', task_ids=\'{task_id}\')'
    return add_braces(pull)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

setup_trigger_conf_id = 'setup_trigger_conf'
trigger_conf_key = 'trigger_conf'

def setup_trigger_conf(task_instance):
    trigger_conf = {
        'final_task': 'print_output',
        'task_list': ['print_output','print_output'],
        'conf_list': [
            {
                'output': 'Hello'
            },
            {
                'output': 'world!'
            }
        ]
    }

    print('Triggering the following tasks')
    for task, conf in zip(trigger_conf['task_list'], trigger_conf['conf_list']):
        print(f'    task: {task} with config {json.dumps(conf)}')
    print(f'then waiting for completion before triggering {trigger_conf["final_task"]}')

    task_instance.xcom_push(key=trigger_conf_key, value=json.dumps(trigger_conf))

with DAG(
    'trigger_recurse_example',
    start_date=days_ago(2),
    tags=['my_test'],
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval=None
) as dag:
    py_operator = PythonOperator(
        task_id=setup_trigger_conf_id,
        python_callable=setup_trigger_conf
    )

    trigger_operator = TriggerDagRunOperator(
        task_id='trigger_call_and_wait',
        trigger_dag_id=recurse_dag_id,
        conf=make_templated_pull(trigger_conf_key, setup_trigger_conf_id)
    )

    py_operator >> trigger_operator
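setup_trigger_conf above hard-codes its config. When the number of B runs is only known once Task A has finished, the same structure can be generated from a list. The sketch below assumes Task A produces a list of per-run config dicts; build_recurse_conf and its default DAG ids are hypothetical names.

```python
import json


def build_recurse_conf(b_confs, b_dag_id='B_DAG', final_dag_id='C_DAG'):
    """Build the conf for 'recurse_then_wait' from a runtime list of B configs."""
    if not b_confs:
        # choose_branch reads task_list[0], so an empty list would fail there.
        raise ValueError('at least one B config is required')
    return {
        'final_task': final_dag_id,
        'task_list': [b_dag_id] * len(b_confs),
        'conf_list': list(b_confs),
    }


# Example: Task A decided that three B runs are needed.
conf = build_recurse_conf([{'b_number': n} for n in range(1, 4)])
print(json.dumps(conf, indent=4))
```

Inside setup_trigger_conf, the result would be pushed with xcom_push in place of the literal trigger_conf dictionary.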

All of this ends up looking something like the below, with vertical and horizontal lines to show where a DAG triggers another DAG:

A
|
Recurse - B.1
|
Recurse - B.2
|
...
|
Recurse - B.N
|
Wait for B.1
|
Wait for B.2
|
...
|
Wait for B.N
|
C

Limitations

Tasks are no longer visible on a single graph. This is probably the biggest problem with this approach. Adding tags to all associated DAGs at least lets them be viewed together, but relating multiple parallel runs of DAG B to runs of DAG A is messy. However, a single DAG run shows its input conf, and each DAG B run depends only on its input config, not on DAG A. Therefore this relation can be at least partially ignored.

Tasks can no longer communicate using xcom. The B tasks can receive input from task A via DAG config, however task C can't get output from the B tasks. The results of all the B tasks should be put into a known location then read by task C.

The config argument to 'recurse_then_wait' could maybe be improved to combine task_list and conf_list, but this solves the problem as stated.

There's no config for the final DAG. That should be trivial to solve.
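For the XCom limitation noted above, one way to pass results from the B runs to C is a directory that every run can reach. This is only a sketch: the file naming scheme and the helpers write_b_result and read_b_results are assumptions, and a temporary directory stands in for shared storage.

```python
import json
import tempfile
from pathlib import Path


def write_b_result(results_dir, b_number, result):
    """Called at the end of a B run: one file per run, named by b_number."""
    path = Path(results_dir) / f'b_{b_number}.json'
    path.write_text(json.dumps(result))
    return path


def read_b_results(results_dir):
    """Called by C: collect every B result, keyed by b_number."""
    results = {}
    for path in sorted(Path(results_dir).glob('b_*.json')):
        b_number = int(path.stem.split('_', 1)[1])
        results[b_number] = json.loads(path.read_text())
    return results


# Demo: a temporary directory plays the role of the shared location.
with tempfile.TemporaryDirectory() as results_dir:
    for n in (1, 2, 3):
        write_b_result(results_dir, n, {'rows': n * 10})
    collected = read_b_results(results_dir)
print(collected)
```

In practice the directory would also need a component that identifies the originating run of DAG A, so that parallel batches do not read each other's files.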

Solution 9:[9]

I think I have found a nicer solution to this at https://github.com/mastak/airflow_multi_dagrun, which uses simple enqueuing of DagRuns by triggering multiple dagruns, similar to TriggerDagRuns. Most of the credit goes to https://github.com/mastak, although I had to patch some details to make it work with the most recent Airflow.

The solution uses a custom operator that triggers several DagRuns:

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone


class TriggerMultiDagRunOperator(TriggerDagRunOperator):
    CREATED_DAGRUN_KEY = 'created_dagrun_key'

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None,
                 *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):

        context.update(self.op_kwargs)
        session = settings.Session()
        created_dr_ids = []
        for dro in self.python_callable(*self.op_args, **context):
            if not dro:
                break
            if not isinstance(dro, DagRunOrder):
                dro = DagRunOrder(payload=dro)

            now = timezone.utcnow()
            if dro.run_id is None:
                dro.run_id = 'trig__' + now.isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                execution_date=now,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True,
            )
            created_dr_ids.append(dr.id)
            self.log.info("Created DagRun %s, %s", dr, now)

        if created_dr_ids:
            session.commit()
            context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
        else:
            self.log.info("No DagRun created")
        session.close()

You can then submit several dagruns from the python_callable passed to the operator, for example:

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago


def generate_dag_run(**kwargs):
    for i in range(10):
        order = DagRunOrder(payload={'my_variable': i})
        yield order

args = {
    'start_date': days_ago(1),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='simple_trigger',
    max_active_runs=1,
    schedule_interval='@hourly',
    default_args=args,
)

gen_target_dag_run = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='common_target',
    python_callable=generate_dag_run
)

I created a fork with the code at https://github.com/flinz/airflow_multi_dagrun
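The example callable yields a fixed ten orders, but the same generator shape works when the count is only known at run time. The sketch below assumes the work items arrive in the triggering run's conf under an 'items' key (a hypothetical name) and yields plain dicts, relying on the operator above wrapping any value that is not a DagRunOrder in DagRunOrder(payload=...). FakeDagRun is a stand-in so the generator can be run without Airflow.

```python
def generate_dag_run(**context):
    """Yield one payload per work item found in the triggering run's conf."""
    items = context['dag_run'].conf.get('items', [])
    for index, item in enumerate(items):
        # Non-empty dicts are truthy, so they pass the operator's
        # `if not dro: break` check; an empty dict would stop the loop.
        yield {'index': index, 'item': item}


class FakeDagRun:
    """Stand-in that only carries a conf dict (not an Airflow class)."""
    def __init__(self, conf):
        self.conf = conf


payloads = list(generate_dag_run(dag_run=FakeDagRun({'items': ['a', 'b', 'c']})))
print(payloads)
```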

Solution 10:[10]

Do not understand what the problem is?

Here is a standard example. Now, if in the subdag function you replace for i in range(5): with for i in range(random.randint(0, 10)):, everything will still work. Now imagine that the operator 'start' puts the data in a file and that, instead of a random value, the function reads this data. Then the operator 'start' will affect the number of tasks.

The only problem will be the display in the UI: when you open the subdag, the number of tasks shown will be whatever was last read from the file/database/XCom at that moment. This automatically restricts running several instances of one DAG at the same time.

Solution 11:[11]

Depending on the context, this could be implemented in an asynchronous batch-worker style. "Dynamic tasks" can be treated as a list of work items to be done, split into asynchronous messages that are published to an external message broker queue for worker nodes to pick up.

One task generates "work" dynamically and publishes all items (we don't know in advance how many, or even exactly which) to a topic/queue.

Workers consume "work tasks" from the queue, either directly, if implemented with technology external to Airflow, or as an Airflow Sensor task (maybe in a separate DAG). When they finish processing their task, the Airflow Sensor gets triggered and the execution flow continues.

To restore the flow for individual work items, consider using the EIP Claim Check pattern.
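The shape of this pattern can be sketched in-process. Here queue.Queue stands in for the external message broker and threads stand in for worker nodes; both are assumptions made so the example is self-contained, and squaring a number stands in for the real work.

```python
import queue
import threading


def produce(work_queue, items):
    """The generating task: publish however many items turned up."""
    for item in items:
        work_queue.put(item)
    return len(items)


def worker(work_queue, results, lock):
    """A worker node: consume items until the sentinel None arrives."""
    while True:
        item = work_queue.get()
        if item is None:
            work_queue.task_done()
            return
        with lock:
            results.append(item * item)  # placeholder for the real work
        work_queue.task_done()


def run_batch(items, n_workers=3):
    work_queue = queue.Queue()
    results, lock = [], threading.Lock()
    threads = [
        threading.Thread(target=worker, args=(work_queue, results, lock))
        for _ in range(n_workers)
    ]
    for thread in threads:
        thread.start()
    produce(work_queue, items)
    for _ in threads:
        work_queue.put(None)  # one sentinel per worker
    work_queue.join()  # plays the role of the sensor: block until all work is done
    for thread in threads:
        thread.join()
    return sorted(results)


print(run_batch([1, 2, 3, 4, 5]))
```

The producer never needs to know how many workers exist, and the workers never need to know how many items were published, which is what makes the item count free to vary per run.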

Solution 12:[12]

Paradigm Shift

Based on all the answers here, it seems to me that the best approach is not to think of the dynamic "work list" generating code as an initial task, but rather as a pre-DAG definition computation.

This, of course, assumes that there is a single initial computation to be made only once and at the beginning of each DAG run (as OP describes). This approach would not work if some halfway task must re-define the DAG, a pattern that Airflow doesn't seem to be built for. However, consider chaining controller/target DAGs (see below).

Code sample:

from airflow.decorators import dag, task
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago

DEFAULT_ARGS = {"owner": "airflow"}


def get_list_of_things(connection_id):
    list_all_the_things_sql = """
    SELECT * FROM things 
    """
    pg_hook = PostgresHook(postgres_conn_id=connection_id)
    connection = pg_hook.get_conn()
    cursor = connection.cursor()
    # NOTE: this runs whenever the DAG file is parsed, so if you copy this code,
    # expect the DAG not to load unless you have a valid Postgres DB with a
    # table named "things" that has rows in it.
    cursor.execute(list_all_the_things_sql)
    res = cursor.fetchall()
    return res


@dag(default_args=DEFAULT_ARGS, schedule_interval="@once", start_date=days_ago(2), dag_id='test_joey_dag')
def dynamicly_generated_dag():
    connection_id = "ProdDB"

    @task
    def do_a_thing(row):
        print(row)
        return row

    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')
    

    data_list = get_list_of_things(connection_id)
    for row in data_list:
        start >> do_a_thing(row) >> end


dag = dynamicly_generated_dag()

If the get_list_of_things() computation is long, then perhaps it would be prudent to pre-compute it and trigger this DAG externally with a controller/target pattern:
trigger_controller_dag
trigger_target_dag
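One way to read the pre-compute suggestion: the controller DAG writes the work list to a file, and the target DAG file reads it at parse time in place of the database query. The sketch below assumes a JSON file at a path both can reach; load_things and the file name are hypothetical. It returns an empty list when the file is missing or unreadable, so that the DAG still loads.

```python
import json
import tempfile
from pathlib import Path


def load_things(cache_path):
    """Read the precomputed work list; return [] if it is missing or invalid."""
    try:
        return json.loads(Path(cache_path).read_text())
    except (OSError, ValueError):
        # OSError covers a missing file, ValueError covers invalid JSON.
        return []


# Demo: before and after the controller has written the list.
with tempfile.TemporaryDirectory() as tmp:
    cache = Path(tmp) / 'things.json'
    before = load_things(cache)
    cache.write_text(json.dumps([[1, 'a'], [2, 'b']]))
    after = load_things(cache)
print(before, after)
```

In the DAG above, data_list = load_things(...) would replace the call to get_list_of_things(connection_id), keeping parse time short.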

Solution 13:[13]

I found this Medium post which is very similar to this question. However, it is full of typos, and it did not work when I tried implementing it.

My answer to the above is as follows:

If you are creating tasks dynamically you must do so by iterating over something which is not created by an upstream task, or can be defined independently of that task. I learned that you can't pass execution dates or other airflow variables to something outside of a template (e.g., a task) as many others have pointed out before. See also this post.