GoogleCloudStorageToBigQueryOperator source_objects to receive list via XCom

I would like to push a list of strings containing the names of files in Google Cloud Storage to XCom, to be picked up later by a GoogleCloudStorageToBigQueryOperator task. The source_objects field is templated, so Jinja templating can be used. Unfortunately, Jinja can only return a string, so I cannot pass the list via XCom.
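
For illustration, this is the kind of attempt that fails (the upstream task id list_files is hypothetical):

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    # Jinja renders the pulled list to the string "['a.csv', 'b.csv']",
    # not to a Python list, so the operator iterates over characters:
    source_objects="{{ ti.xcom_pull(task_ids='list_files') }}",
    destination_project_dataset_table='dest_table',
    dag=dag,
)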

How can I use an XCom list in GoogleCloudStorageToBigQueryOperator?

For reference, a similar question solved by using provide_context: Pass a list of strings as parameter of a dependant task in Airflow

The closest working solution I've found is to create a wrapper class and pass in the id of the task that pushed the XCom, like so:

@apply_defaults
def __init__(self, source_objects_task_id,
             ....):
    ....

def execute(self, context):
    source_objects = context['ti'].xcom_pull(
        task_ids=self.source_objects_task_id)
    operator = GoogleCloudStorageToBigQueryOperator(
        source_objects=source_objects,
        dag=self.dag,
        ....
    )
    operator.execute(context)


Solution 1:[1]

I'm not sure how you get the list of Google Cloud Storage objects, but if you are using GoogleCloudStorageListOperator, you can instead pass wildcards in the source_objects param of GoogleCloudStorageToBigQueryOperator, the same way you do in the BigQuery Web UI:

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects=['folder1/*.csv', 'folder2/*.csv'],
    destination_project_dataset_table='dest_table',
    schema_object='schema.json',  # a path within the bucket, not a full gs:// URI
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    dag=dag
)
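
For reference, the upstream task producing such a list would typically be a GoogleCloudStorageListOperator, whose returned object list is pushed to XCom automatically as its return value (the prefix and delimiter values here are illustrative):

from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator

# Lists matching objects in the bucket and pushes the resulting
# list of object names to XCom.
list_files = GoogleCloudStorageListOperator(
    task_id='list_files',
    bucket='test_bucket',
    prefix='folder1/',
    delimiter='.csv',
    google_cloud_storage_conn_id='gcp-conn',
    dag=dag
)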

If you want to get the list from another task via XCom, you can create a new operator or an Airflow plugin for GoogleCloudStorageToBigQueryOperator, adding a new source_objects_task_id param, removing the source_objects param, and replacing the following code (lines 203-204: https://github.com/apache/incubator-airflow/blob/ac9033db0981ae1f770a8bdb5597055751ab15bd/airflow/contrib/operators/gcs_to_bq.py#L203-L204):

source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
               for source_object in self.source_objects]

with

source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
               for source_object in context['ti'].xcom_pull(task_ids=self.source_objects_task_id)]

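Equivalently, you can avoid patching Airflow's source with a small subclass that sets self.source_objects from XCom at execution time before delegating to the parent; a minimal sketch (the class name GCSToBigQueryXComOperator is made up), with the same effect as the line replacement above:

from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.utils.decorators import apply_defaults


class GCSToBigQueryXComOperator(GoogleCloudStorageToBigQueryOperator):

    @apply_defaults
    def __init__(self, source_objects_task_id, *args, **kwargs):
        # The empty placeholder keeps the parent constructor happy;
        # the real list is pulled from XCom when the task runs.
        super(GCSToBigQueryXComOperator, self).__init__(
            *args, source_objects=[], **kwargs)
        self.source_objects_task_id = source_objects_task_id

    def execute(self, context):
        # Replace the placeholder with the list pushed by the upstream task.
        self.source_objects = context['ti'].xcom_pull(
            task_ids=self.source_objects_task_id)
        super(GCSToBigQueryXComOperator, self).execute(context)
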
and use it as follows (if you used the subclass instead, swap in its name):

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects_task_id='task-id-of-previous-task',
    destination_project_dataset_table='dest_table',
    schema_object='schema.json',
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    dag=dag
)

Solution 2:[2]

Starting with 2.1.0, Airflow added the ability to render XCom output as native Python objects.

Set render_template_as_native_obj=True in your DAG constructor:

dag = DAG(
    ...
    render_template_as_native_obj=True,
)

Because render_template_as_native_obj seems to work for the PythonOperator only (let me know if I am wrong; I tested other operators and nothing worked), we need to wrap our operator in a PythonOperator:

PythonOperator(dag=dag, task_id='any', python_callable=_import)

(In Airflow 2 the context is passed to the callable automatically, so provide_context=True is no longer needed.) The Python callable extracts the source objects from XCom and executes the GCS operator:

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator


def _import(**kwargs):
    ti = kwargs["ti"]

    op = GCSToBigQueryOperator(
        # ...
        source_objects=ti.xcom_pull(task_ids="task-id-of-previous-task"),
        # ...
    )

    op.execute(kwargs)

Because GoogleCloudStorageToBigQueryOperator is deprecated, I used GCSToBigQueryOperator instead.
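
Putting Solution 2 together, here is a minimal end-to-end sketch under a few assumptions: the bucket, dataset, and task names are placeholders, and GCSListObjectsOperator (the Airflow 2 successor of the contrib list operator) supplies the XCom list:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator


def _import(**kwargs):
    ti = kwargs["ti"]
    op = GCSToBigQueryOperator(
        task_id="gcs_to_bq_inner",
        bucket="test_bucket",
        # Pulled as a native Python list, bypassing Jinja entirely.
        source_objects=ti.xcom_pull(task_ids="list_files"),
        destination_project_dataset_table="dataset.dest_table",
        source_format="CSV",
        write_disposition="WRITE_TRUNCATE",
    )
    op.execute(kwargs)


with DAG(
    dag_id="gcs_to_bq_xcom",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    render_template_as_native_obj=True,
) as dag:
    # Pushes the list of matching object names to XCom as its return value.
    list_files = GCSListObjectsOperator(
        task_id="list_files",
        bucket="test_bucket",
        prefix="folder1/",
    )
    import_files = PythonOperator(task_id="import_files", python_callable=_import)
    list_files >> import_files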

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: kaxil
Solution 2: Dmytro Maslenko