Airflow - how to run the same DAG using different connection IDs
I am asking for your help. I have an existing DAG that cleans up the Airflow metadata database in our production environment. Now I need to adjust the code to add another connection for our test environment, and I was advised to create a for loop like

```
for conn_id in {"prod_db", "test_db"}:
    with DAG( ... conn_id=conn_id, ...)
```

but I am not sure how to do that; after I made the change, the syntax is incorrect. Do you have any example or hint of how to do that? This is my attempt:
for conn_id in {"prod_db","test_db"}:
with DAG(
conn_id = conn_id,
dag_id="airflow_db_cleanup",
default_args={
"owner": "our team,
"email": mail_mail_mail,
"email_on_failure": True,
"email_on_retry": False,
"start_date": datetime(2022, 1, 1, 0, 0, 0),
"retries": 1,
"retry_delay": timedelta(minutes=1),
},
schedule_interval=timedelta(days=1),
max_active_runs=1,
) as dag,
dag.doc_md = doc
etc. - then the various functions that create sessions follow, and the operators are defined below:
```python
def get_max_db_entry_age_in_days(dag_run_conf):
    max_db_entry_age_in_days = None
    default_max_db_entry_age_in_days = int(Variable.get("max_db_entry_age_in_days", 90))
    if dag_run_conf:
        max_db_entry_age_in_days = dag_run_conf.get("maxDBEntryAgeInDays", None)
        logging.info("maxDBEntryAgeInDays from dag_run.conf: " + str(dag_run_conf))
    if max_db_entry_age_in_days is None:
        logging.info(
            "maxDBEntryAgeInDays conf variable isn't included. Using Default '"
            + str(default_max_db_entry_age_in_days)
            + "'"
        )
        max_db_entry_age_in_days = default_max_db_entry_age_in_days
    return max_db_entry_age_in_days


def cleanup_function(**context):
    dag_run = context.get("dag_run")
    dag_run_conf = getattr(dag_run, "conf", None)
    max_db_entry_age_in_days = get_max_db_entry_age_in_days(dag_run_conf)
    execution_date = context.get("execution_date")
    max_date = execution_date + timedelta(-max_db_entry_age_in_days)
    dry_run = Variable.get("dry_run", "True") == "True"

    airflow_db_model = context["params"].get("airflow_db_model")
    age_check_column = context["params"].get("age_check_column")
    keep_last_run = context["params"].get("keep_last_run")
    dag_id = context["params"].get("dag_id")
    cascade_delete = context["params"].get("cascade_delete")

    with create_session(conn_id) as session:
        logging.info("Configurations:")
        logging.info("max_date: " + str(max_date))
        logging.info("session: " + str(session))
        logging.info("airflow_db_model: " + str(airflow_db_model))
        logging.info("age_check_column: " + str(age_check_column))
        logging.info("keep_last_run: " + str(keep_last_run))
        logging.info("dag_id: " + str(dag_id))
        logging.info("")
        logging.info("Running Cleanup Process...")

        query = get_main_query(
            session, airflow_db_model, age_check_column, keep_last_run, dag_id, max_date
        )
        entries_to_delete_count = query.count()

        logging.info(f"Query : {query}")
        logging.info(
            f"Will be deleting {entries_to_delete_count} {airflow_db_model.__name__}"
        )
        logging.info(list(query.values(dag_id)))

        if cascade_delete:
            for child in cascade_delete:
                child_model = child.get("child_model")
                child_id = child.get("child_id")
                parent_id = child.get("parent_id")
                delete_query = get_cascade_delete_query(
                    session, query, child_model, child_id, parent_id
                )
                child_rows = delete_query.count()
                logging.info(
                    f"Going to delete {child_rows} rows of {child_model.__name__}."
                )
                logging.info(list(delete_query.values(child_id)))
                if not dry_run:
                    delete_query.delete(synchronize_session=False)

        if dry_run:
            logging.info("Dry run finished")
        else:
            logging.info("Performing main delete...")
            query.delete(synchronize_session=False)
            session.commit()
            logging.info("Cleanup process finished")


for db_object in DATABASE_OBJECTS:
    cleanup_op = PythonOperator(
        task_id="cleanup_" + str(db_object["airflow_db_model"].__name__),
        python_callable=cleanup_function,
        params=db_object,
        provide_context=True,
        dag=dag,
    )


def remove_inactive_dags_from_ui(**context):
    max_db_age_inactive_dags = int(Variable.get("max_db_age_inactive_dags", 7))
    dry_run = Variable.get("dry_run", "True") == "True"
    threshold_date = context["execution_date"].subtract(days=max_db_age_inactive_dags)

    with create_session(conn_id) as session:
        query = (
            session.query(DagModel)
            .filter(
                and_(
                    DagModel.is_active.is_(False),
                    DagModel.is_subdag.is_(False),
                    DagModel.last_parsed_time <= threshold_date,
                )
            )
            .options(load_only(DagModel.dag_id))
        )
        inactive_dags = query.all()
        logging.info(f"Will be deleted {query.count()} dags")
        for inactive_dag in inactive_dags:
            logging.info(f"Deleting dag {inactive_dag.dag_id} from UI")
            if not dry_run:
                delete_dag(inactive_dag.dag_id, session=session)
        if dry_run:
            logging.info("Dry run finished")
        else:
            session.commit()
            logging.info("Cleanup process finished")


remove_inactive_dags_task = PythonOperator(
    task_id="remove_inactive_dags_from_ui",
    python_callable=remove_inactive_dags_from_ui,
    provide_context=True,
    dag=dag,
)
```
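For context, the "one DAG per connection" loop pattern mentioned in the question is normally written so that each iteration produces a DAG with a unique dag_id and registers it at module level; `DAG()` does not accept a `conn_id` argument, so the connection has to be passed to the tasks instead. A rough sketch under those assumptions (the dag_id prefix, task id, and `cleanup` placeholder are illustrative, not the original code):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def cleanup(conn_id, **context):
    # Placeholder for the existing cleanup logic, parameterised by connection id.
    ...


for conn_id in ("prod_db", "test_db"):
    with DAG(
        dag_id=f"airflow_db_cleanup_{conn_id}",  # dag_id must be unique per DAG
        start_date=datetime(2022, 1, 1),
        schedule_interval=timedelta(days=1),
        max_active_runs=1,
        catchup=False,
    ) as dag:
        PythonOperator(
            task_id="cleanup",
            python_callable=cleanup,
            op_kwargs={"conn_id": conn_id},  # the connection goes to the task, not to DAG()
        )
    # Register each generated DAG at module level so the scheduler discovers it.
    globals()[dag.dag_id] = dag
```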
Solution 1:[1]
Creating a loop of DAGs or tasks is considered bad practice inside Airflow, and I'm not sure it is even possible here.
The correct approach would be to have two tasks that run the same cleanup code, each passed a different connection. You could even run these tasks in parallel so that both databases are cleared down at the same time rather than one after the other.
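A minimal sketch of that two-task approach, assuming the cleanup logic is refactored into a single function that takes the connection id as an argument (the dag_id, task ids, and `cleanup_metadata_db` placeholder below are illustrative, not the asker's real code):

```python
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def cleanup_metadata_db(conn_id, **context):
    # Placeholder for the existing cleanup logic: the real function would open
    # a session against the database behind `conn_id` (for example via the
    # relevant provider hook) and delete old rows exactly as before.
    logging.info("Running cleanup against connection %s", conn_id)


with DAG(
    dag_id="airflow_db_cleanup",
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(days=1),
    max_active_runs=1,
    catchup=False,
) as dag:
    cleanup_prod = PythonOperator(
        task_id="cleanup_prod_db",
        python_callable=cleanup_metadata_db,
        op_kwargs={"conn_id": "prod_db"},
    )
    cleanup_test = PythonOperator(
        task_id="cleanup_test_db",
        python_callable=cleanup_metadata_db,
        op_kwargs={"conn_id": "test_db"},
    )
    # No dependency is set between the two tasks, so they can run in parallel.
```

Because the connection id arrives through `op_kwargs`, both tasks share exactly the same callable, and adding a third environment is just one more operator.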
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Thom Bedford |
