Airflow - how to run the same DAG using different connection IDs

I am asking for your help. I have an existing DAG that cleans up the Airflow metadata database (airflow_db) in our production environment. Now I need to adjust the code and add another connection for our test environment. I was advised to create a for loop like this:

for conn_id in {"prod_db", "test_db"}:
    with DAG( ... conn_id = conn_id, ...) 

But I am not sure how to do that; after I made the change, the syntax is now incorrect. Do you have any example or hint on how to do this? Here is my attempt:

for conn_id in {"prod_db","test_db"}:
with   DAG(
        conn_id = conn_id,
        dag_id="airflow_db_cleanup",
        default_args={
            "owner": "our team,
            "email": mail_mail_mail,
            "email_on_failure": True,
            "email_on_retry": False,
            "start_date": datetime(2022, 1, 1, 0, 0, 0),
            "retries": 1,
            "retry_delay": timedelta(minutes=1),
        },
        schedule_interval=timedelta(days=1),
        max_active_runs=1,
) as dag,

dag.doc_md = doc

etc. Then various functions that create sessions follow.

The operators are defined here:

def get_max_db_entry_age_in_days(dag_run_conf):
    max_db_entry_age_in_days = None
    default_max_db_entry_age_in_days = int(Variable.get("max_db_entry_age_in_days", 90))

    if dag_run_conf:
        max_db_entry_age_in_days = dag_run_conf.get("maxDBEntryAgeInDays", None)
    logging.info("maxDBEntryAgeInDays from dag_run.conf: " + str(dag_run_conf))

    if max_db_entry_age_in_days is None:
        logging.info(
            "maxDBEntryAgeInDays conf variable isn't included. Using Default '"
            + str(default_max_db_entry_age_in_days)
            + "'"
        )
        max_db_entry_age_in_days = default_max_db_entry_age_in_days

    return max_db_entry_age_in_days


def cleanup_function(**context):
    dag_run = context.get("dag_run")
    dag_run_conf = getattr(dag_run, "conf", None)
    max_db_entry_age_in_days = get_max_db_entry_age_in_days(dag_run_conf)
    execution_date = context.get("execution_date")
    max_date = execution_date + timedelta(-max_db_entry_age_in_days)
    dry_run = Variable.get("dry_run", "True") == "True"

    airflow_db_model = context["params"].get("airflow_db_model")
    age_check_column = context["params"].get("age_check_column")
    keep_last_run = context["params"].get("keep_last_run")
    dag_id = context["params"].get("dag_id")
    cascade_delete = context["params"].get("cascade_delete")

    with create_session(conn_id) as session:
        logging.info("Configurations:")
        logging.info("max_date:                 " + str(max_date))
        logging.info("session:                  " + str(session))
        logging.info("airflow_db_model:         " + str(airflow_db_model))
        logging.info("age_check_column:         " + str(age_check_column))
        logging.info("keep_last_run:            " + str(keep_last_run))
        logging.info("dag_id:                   " + str(dag_id))
        logging.info("")

        logging.info("Running Cleanup Process...")
        query = get_main_query(
            session, airflow_db_model, age_check_column, keep_last_run, dag_id, max_date
        )
        entries_to_delete_count = query.count()

        logging.info(f"Query : {query}")
        logging.info(
            f"Will be deleting {entries_to_delete_count} {airflow_db_model.__name__}"
        )
        logging.info(list(query.values(dag_id)))

        if cascade_delete:
            for child in cascade_delete:
                child_model = child.get("child_model")
                child_id = child.get("child_id")
                parent_id = child.get("parent_id")

                delete_query = get_cascade_delete_query(
                    session, query, child_model, child_id, parent_id
                )
                child_rows = delete_query.count()
                logging.info(
                    f"Going to delete {child_rows} rows of {child_model.__name__}."
                )
                logging.info(list(delete_query.values(child_id)))

                if not dry_run:
                    delete_query.delete(synchronize_session=False)

        if dry_run:
            logging.info("Dry run finished")
        else:
            logging.info("Performing main delete...")
            query.delete(synchronize_session=False)
            session.commit()
            logging.info("Cleanup process finished")


for db_object in DATABASE_OBJECTS:

    cleanup_op = PythonOperator(
        task_id="cleanup_" + str(db_object["airflow_db_model"].__name__),
        python_callable=cleanup_function,
        params=db_object,
        provide_context=True,
        dag=dag,
    )


def remove_inactive_dags_from_ui(**context):
    max_db_age_inactive_dags = int(Variable.get("max_db_age_inactive_dags", 7))
    dry_run = Variable.get("dry_run", "True") == "True"

    threshold_date = context["execution_date"].subtract(days=max_db_age_inactive_dags)
    with create_session(conn_id) as session:
        query = (
            session.query(DagModel)
            .filter(
                and_(
                    DagModel.is_active.is_(False),
                    DagModel.is_subdag.is_(False),
                    DagModel.last_parsed_time <= threshold_date,
                )
            )
            .options(load_only(DagModel.dag_id))
        )
        inactive_dags = query.all()
        logging.info(f"{query.count()} DAGs will be deleted")

        for inactive_dag in inactive_dags:
            logging.info(f"Deleting dag {inactive_dag.dag_id} from UI")
            if not dry_run:
                delete_dag(inactive_dag.dag_id, session=session)

        if dry_run:
            logging.info("Dry run finished")
        else:
            session.commit()
            logging.info("Cleanup process finished")


remove_inactive_dags_task = PythonOperator(
    task_id="remove_inactive_dags_from_ui",
    python_callable=remove_inactive_dags_from_ui,
    provide_context=True,
    dag=dag,
)


Solution 1:[1]

Creating a loop of DAGs or tasks is bad practice inside Airflow; I'm not sure it's even possible.

The correct approach would be to have two tasks that run the same cleanup code, each passed a different connection ID. You could even run these tasks in parallel so both databases are cleared down at the same time rather than one after the other.
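As a rough sketch of that idea (not a drop-in implementation), the single DAG below defines one cleanup task per database and hands the connection ID to the shared callable via op_kwargs. The cleanup_function(conn_id, **context) signature and the idea that your session helper can be pointed at a specific connection are assumptions taken from the question's code, so treat them as placeholders.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def cleanup_function(conn_id, **context):
    # Placeholder: reuse the existing cleanup logic from the question, but
    # open the session against whichever database this task was given,
    # e.g. `with create_session(conn_id) as session: ...` if your helper
    # supports that.
    print(f"Cleaning up the metadata DB behind connection '{conn_id}'")


with DAG(
    dag_id="airflow_db_cleanup",
    default_args={
        "owner": "our team",
        "retries": 1,
        "retry_delay": timedelta(minutes=1),
    },
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(days=1),
    max_active_runs=1,
) as dag:
    # Same callable, two tasks, each with its own connection ID.
    cleanup_prod = PythonOperator(
        task_id="cleanup_prod_db",
        python_callable=cleanup_function,
        op_kwargs={"conn_id": "prod_db"},
    )

    cleanup_test = PythonOperator(
        task_id="cleanup_test_db",
        python_callable=cleanup_function,
        op_kwargs={"conn_id": "test_db"},
    )

Because no dependency is set between cleanup_prod_db and cleanup_test_db, the scheduler is free to run them at the same time, so both databases are cleared down in the same DAG run.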

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Thom Bedford