Airflow LocalFilesystemToGCSOperator marks the task as successful but the file is not uploaded

I'm trying to upload a file from my local machine to GCS using the LocalFilesystemToGCSOperator, following this how-to: https://airflow.readthedocs.io/en/latest/howto/operator/google/transfer/local_to_gcs.html#prerequisite-tasks. I've set up a connection to GCP with the path to a JSON key file. This is the DAG code:

import os

from airflow import models
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils import dates

BUCKET_NAME = 'bucket-name'
PATH_TO_UPLOAD_FILE = '...path-to/airflow/dags/example-text.txt'
DESTINATION_FILE_LOCATION = '/test-dir-input/example-text.txt'

with models.DAG(
    'example_local_to_gcs',
    default_args=dict(start_date=dates.days_ago(1)),
    schedule_interval=None,
) as dag:
    upload_file = LocalFilesystemToGCSOperator(
        gcp_conn_id='custom_gcp_connection',
        task_id="upload_file",
        src=PATH_TO_UPLOAD_FILE,
        dst=DESTINATION_FILE_LOCATION,
        bucket=BUCKET_NAME,
        mime_type='text/plain'
    )

When I trigger the DAG, the task is marked as a success, but the file is not in the bucket.



Solution 1:[1]

It looks like there's a problem with your PATH_TO_UPLOAD_FILE and DESTINATION_FILE_LOCATION.

To give you an idea, here's a separate post that could also help you; its equivalent parameters were declared like this:

src='/Users/john/Documents/tmp',
dst='gs://constantine-bucket',
bucket='constantine-bucket',

You should remove the ... from the source path, and note that dst is the object path inside the bucket: the operator combines it with the bucket parameter, so it needs no gs:// prefix and no leading slash:

BUCKET_NAME = 'bucket-name'
PATH_TO_UPLOAD_FILE = '/path-to/airflow/dags/example-text.txt'
DESTINATION_FILE_LOCATION = 'example-text.txt'

# Or in a folder inside your bucket
# DESTINATION_FILE_LOCATION = 'test-dir-input/example-text.txt'
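One detail worth checking, as an assumption about why the task "succeeds" with nothing visible in the bucket: GCS object names are flat strings, so whatever is passed as dst becomes the object name verbatim, and a leading slash is kept as part of the name. The object then sits under an empty-looking top-level folder in the console. A minimal standalone sketch (normalize_dst is a hypothetical helper, not part of the operator):

```python
def normalize_dst(dst: str) -> str:
    """Strip leading slashes so `dst` is a plain object path in the bucket."""
    return dst.lstrip('/')

# A dst with a leading slash, like the one in the question:
print(normalize_dst('/test-dir-input/example-text.txt'))
# test-dir-input/example-text.txt
```

You could apply such a guard to any dst value before handing it to the operator.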

Solution 2:[2]

The following code did the trick for me.

Please note that the service account used must have storage.objects permissions (e.g. storage.objects.create) on the destination bucket to write the file.


import os
import datetime
from pathlib import Path

from airflow import DAG
from airflow.configuration import conf
from airflow.operators.dummy import DummyOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator

comp_home_path = Path(conf.get("core", "dags_folder")).parent.absolute()
comp_bucket_path = "data/uploaded"  # <- relative folder, if your file sits inside one
comp_local_path = os.path.join(comp_home_path, comp_bucket_path)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.today(),
    'end_date': None,
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1),
}

sch_interval = None

dag = DAG(
    'mv_local_to_GCS',
    default_args=default_args,
    tags=["example"],
    catchup=False,
    schedule_interval=sch_interval,
)

mv_local_gcs = LocalFilesystemToGCSOperator(
    task_id="local_to_gcs",
    src=comp_local_path + "/yourfilename.csv",  # PATH_TO_UPLOAD_FILE
    dst="somefolder/yournewfilename.csv",  # object path inside the bucket
    bucket="yourbucket",  # bucket name only: no 'gs://' prefix and no trailing '/'
    dag=dag,
)

start = DummyOperator(task_id='Starting', dag=dag)

start >> mv_local_gcs
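As a side note, the src path above is built with os.path.join; a standalone sketch (using a hypothetical dags_folder value in place of conf.get("core", "dags_folder")) shows what that composition produces:

```python
import os
from pathlib import Path

# Hypothetical dags_folder; in the DAG above it comes from
# conf.get("core", "dags_folder").
dags_folder = "/opt/airflow/dags"

home_path = Path(dags_folder).parent.absolute()  # parent of the DAGs folder
bucket_path = "data/uploaded"
local_path = os.path.join(home_path, bucket_path)

print(local_path)
# /opt/airflow/data/uploaded
print(local_path + "/yourfilename.csv")
# /opt/airflow/data/uploaded/yourfilename.csv
```

So the file is expected to live next to, not inside, the DAGs folder.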

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2