Dynamic task creation using decorators in Airflow 2
I would like to create tasks dynamically in a DAG based on the result of another task. Is it somehow possible to do it in the following way?
I am able to do this by storing the data from one task in a temporary file and reading it back in the next task (see the sketch after the error message below), but I would like to solve the problem using only Airflow resources.
```python
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago


@task
def extract() -> dict:
    return {"1001": 301.27, "1002": 433.21, "1003": 502.22}


def load_inner(dict_value_fun):
    print("Value is: {}".format(str(dict_value_fun)))


default_args = {
    'start_date': days_ago(2),
    'email_on_failure': False,
    'depends_on_past': False,
    'owner': 'airflow',
    'retries': 1,
}


@dag(default_args=default_args, schedule_interval=None)
def tutorial_taskflow_api_etl():
    order_data = extract()
    for key in order_data:
        @task(task_id='task_key_is_{}'.format(key))
        def load(dict_value: dict, key: str):
            load_inner(dict_value[key])

        order_data >> load(order_data, key)


tutorial_etl_dag = tutorial_taskflow_api_etl()
```
Error message:

```
The key (dict_key_is_{{ task_instance.xcom_pull(task_ids='extract', dag_id='tutorial_taskflow_api_etl', key='0') }}) has to be made of alphanumeric characters, dashes, dots and underscores exclusively
```

At parse time `order_data` is an XComArg, not a real dictionary; iterating over it yields lazy XCom references whose string form is the Jinja template shown above, so the generated task_id fails validation.
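For reference, the temporary-file workaround mentioned in the question looks roughly like this. This is only a sketch: the `/tmp` path and the task names are illustrative (not from the original post), and it assumes the scheduler and workers share a filesystem, which is exactly why the answer below reaches for external resources instead.

```python
import json
import os

from airflow.decorators import dag, task
from airflow.utils.dates import days_ago

TMP_PATH = "/tmp/order_data.json"  # illustrative path, not from the original post


@task
def extract_to_file():
    # One task dumps the dictionary to a temporary file...
    with open(TMP_PATH, "w") as f:
        json.dump({"1001": 301.27, "1002": 433.21, "1003": 502.22}, f)


@dag(schedule_interval=None, start_date=days_ago(2))
def file_based_workflow():
    write = extract_to_file()
    # ...and the DAG file reads it back at parse time (i.e. on the next
    # parse after the file exists) to build one task per key.
    if os.path.exists(TMP_PATH):
        with open(TMP_PATH) as f:
            data = json.load(f)
        for key in data:
            @task(task_id='file_task_{}'.format(key))
            def load(key_name: str, val: float):
                print("key_name: {}, value: {}".format(key_name, val))

            write >> load(key, data[key])


file_based_workflow_dag = file_based_workflow()
```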
Solution 1:[1]
I found out that creating tasks dynamically using only Airflow resources is not possible, because a DAG file is not a regular Python script that runs in a single system process. An Airflow environment usually runs several workers, so tasks may be executed independently on different servers. To create tasks dynamically, we have to use an external resource such as GCS, a database, or Airflow Variables.
I solved my problem using Airflow Variables, as the code below demonstrates:
```python
from airflow.models import Variable
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
import logging
import json


@task
def extract() -> dict:
    return {"1001": 301.27, "1002": 433.21, "1003": 502.22}


@task
def set_variables(val: dict):
    # Before running the DAG we have to set this Variable once (see the note below).
    variable_val = Variable.get("dynamic_dictionary_variable_value")
    logging.info("current variable value (before change) is " + str(variable_val))
    logging.info("updated value " + str(val))
    # Serialize the dict to a JSON string; storing the raw dict would save its
    # Python repr with single quotes, which is not valid JSON.
    Variable.set("dynamic_dictionary_variable_value", json.dumps(val))
    variable_val = Variable.get("dynamic_dictionary_variable_value")
    logging.info("current variable value (after change) is " + str(variable_val))


default_args = {
    'start_date': days_ago(2),
    'email_on_failure': False,
    'depends_on_past': False,
    'owner': 'airflow',
    'retries': 1,
}


@dag(default_args=default_args, schedule_interval=None, start_date=days_ago(2))
def dynamic_workflow():
    current_dictionary = Variable.get("dynamic_dictionary_variable_value")
    logging.info("(before) current dictionary outside task is " + str(current_dictionary))
    values_dictionary = extract()
    return_val = set_variables(values_dictionary)
    current_dictionary = Variable.get("dynamic_dictionary_variable_value")
    logging.info("(after) current dictionary outside task is " + str(current_dictionary))
    for key in json.loads(current_dictionary):
        @task(task_id='dynamic_task_{}'.format(key))
        def load(key_name: str, val: float):
            logging.info("key_name: {}, value: {}".format(key_name, val))

        return_val >> load(key, json.loads(current_dictionary)[key])


dynamic_workflow_dag = dynamic_workflow()
```
Before running this DAG we have to remember to set the Airflow Variable dynamic_dictionary_variable_value (any valid JSON object works as the initial value; in my example I used {"50": 50.505}).
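A minimal sketch of that one-off initialization, assuming access to a running Airflow environment; the value shown is the example value from above:

```python
# One-off setup: create the Variable that the DAG file reads at parse time.
# Equivalent CLI: airflow variables set dynamic_dictionary_variable_value '{"50": 50.505}'
from airflow.models import Variable

Variable.set("dynamic_dictionary_variable_value", '{"50": 50.505}')
```

As a side note, Variable.get("dynamic_dictionary_variable_value", deserialize_json=True) returns the parsed dict directly, which would let the DAG drop its json.loads calls.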
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Stack Overflow |


