Using another connection instead of the default connection throughout the DAG

I'm using Airflow in Google Composer. By default, all the tasks in a DAG use the default connection to communicate with Storage, BigQuery, etc. Obviously we can specify another connection configured in Airflow on a per-task basis, e.g.:

task_custom = bigquery.BigQueryInsertJobOperator(
    task_id='task_custom_connection',
    gcp_conn_id='my_gcp_connection',
    configuration={
        "query": {
            "query": 'SELECT 1',
            "useLegacySql": False
        }
    }
)

Is it possible to use a specific connection as the default for all tasks in the entire DAG?

Thanks in advance.

UPDATE:

Specifying gcp_conn_id via default_args in the DAG (as Javier Lopez Tomas recommended) doesn't work completely. Operators that accept gcp_conn_id as a parameter work fine, but unfortunately in my case most interactions with GCP components happen via clients or hooks inside PythonOperators.

For example: if I call DataflowHook (inside a function called by a PythonOperator) without specifying the connection, it internally uses "google_cloud_default" and not the gcp_conn_id from default_args :(

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.dataflow import DataflowHook


def _dummy_func(**context):
    # The hook ignores the DAG's default_args and falls back to "google_cloud_default"
    df_hook = DataflowHook()


default_args = {
    'gcp_conn_id': 'my_gcp_connection'
}


with DAG(dag_id='my_dag', default_args=default_args) as dag:
    dummy = PythonOperator(task_id='dummy', python_callable=_dummy_func)
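
One workaround is to pass the connection id to the hook explicitly, since hooks only read their own gcp_conn_id constructor argument and never look at the DAG's default_args. A minimal sketch, where the GCP_CONN_ID constant and the function name are just illustrative:

GCP_CONN_ID = 'my_gcp_connection'


def _dummy_func_explicit(**context):
    # Hooks created inside the callable don't see default_args, so the
    # connection id has to be passed to each hook/client explicitly.
    df_hook = DataflowHook(gcp_conn_id=GCP_CONN_ID)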


Solution 1:[1]

You can use default args: https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#default-arguments

In your case it would be:

default_args = {
    "gcp_conn_id": "my_gcp_connection"
}

with DAG(blabla)...
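
For completeness, a minimal sketch of how this could look with the operator from the question, assuming default_args is applied to any operator that declares a matching gcp_conn_id parameter (the dag_id, start_date, and schedule below are placeholder values):

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

default_args = {
    "gcp_conn_id": "my_gcp_connection"
}

with DAG(
    dag_id="my_dag",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    default_args=default_args,
) as dag:
    # No gcp_conn_id here: the operator inherits it from default_args
    task_custom = BigQueryInsertJobOperator(
        task_id="task_custom_connection",
        configuration={
            "query": {
                "query": "SELECT 1",
                "useLegacySql": False,
            }
        },
    )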

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1: Javier Lopez Tomas