Airflow LocalFilesystemToGCSOperator marks the task as successful but the file is not uploaded
I'm trying to upload a file from my local machine to GCS using the LocalFilesystemToGCSOperator. I'm following this how-to: https://airflow.readthedocs.io/en/latest/howto/operator/google/transfer/local_to_gcs.html#prerequisite-tasks. I've set up a connection to GCP with a path to a JSON key file. This is the DAG code:
import os
from airflow import models
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.utils import dates

BUCKET_NAME = 'bucket-name'
PATH_TO_UPLOAD_FILE = '...path-to/airflow/dags/example-text.txt'
DESTINATION_FILE_LOCATION = '/test-dir-input/example-text.txt'

with models.DAG(
    'example_local_to_gcs',
    default_args=dict(start_date=dates.days_ago(1)),
    schedule_interval=None,
) as dag:
    upload_file = LocalFilesystemToGCSOperator(
        gcp_conn_id='custom_gcp_connection',
        task_id="upload_file",
        src=PATH_TO_UPLOAD_FILE,
        dst=DESTINATION_FILE_LOCATION,
        bucket=BUCKET_NAME,
        mime_type='text/plain'
    )
When I trigger the DAG, it is marked as a success, but the file is not in the bucket.
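One quick way to see what actually happened is to list the bucket's objects outside of Airflow. The sketch below is a hypothetical check (it assumes the google-cloud-storage client is installed and GOOGLE_APPLICATION_CREDENTIALS points at the same JSON key the connection uses); printing every object name shows whether the file was uploaded under an unexpected key, such as one starting with '/'.
# Hypothetical check: list every object in the bucket to see whether the
# upload landed under an unexpected name (e.g. a key beginning with '/').
# Assumes GOOGLE_APPLICATION_CREDENTIALS points at the same JSON key file.
from google.cloud import storage

client = storage.Client()
for blob in client.list_blobs('bucket-name'):  # same BUCKET_NAME as in the DAG
    print(repr(blob.name))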
Solution 1:[1]
It looks like there's a problem with your PATH_TO_UPLOAD_FILE and DESTINATION_FILE_LOCATION.
To give you an idea, here's a separate post that could also help you. Its relevant parameters, similar to yours, were declared like this:
src='/Users/john/Documents/tmp',
dst='gs://constantine-bucket',
bucket='constantine-bucket',
You should remove the ... and make sure that DESTINATION_FILE_LOCATION refers to your bucket name or a folder inside it, like this:
BUCKET_NAME = 'bucket-name'
PATH_TO_UPLOAD_FILE = '/path-to/airflow/dags/example-text.txt'
DESTINATION_FILE_LOCATION = 'gs://bucket-name/example-text.txt'
# Or in a folder on your bucket
# DESTINATION_FILE_LOCATION = 'gs://bucket-name/folder/example-text.txt'
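For reference, the operator's documented bucket parameter takes the bare bucket name and dst the object path inside that bucket, so a minimal sketch of the call using those conventions (reusing the question's placeholder names) could look like this:
# Minimal sketch based on the operator's documented parameters:
# bucket is the bare bucket name, dst is the object path inside it.
upload_file = LocalFilesystemToGCSOperator(
    gcp_conn_id='custom_gcp_connection',
    task_id='upload_file',
    src='/path-to/airflow/dags/example-text.txt',
    dst='test-dir-input/example-text.txt',  # no leading '/' and no 'gs://' prefix
    bucket='bucket-name',
    mime_type='text/plain'
)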
Solution 2:[2]
The following code did the trick for me.
Please note that the service account used must have storage.objects permissions on the destination bucket to write the file.
import os
import datetime
from pathlib import Path

from airflow import DAG
from airflow.configuration import conf
from airflow.operators.dummy import DummyOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator

comp_home_path = Path(conf.get("core", "dags_folder")).parent.absolute()
comp_bucket_path = "data/uploaded"  # <- if your file sits inside a folder
comp_local_path = os.path.join(comp_home_path, comp_bucket_path)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.today(),
    'end_date': None,
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1)
}

sch_interval = None

dag = DAG(
    'mv_local_to_GCS',
    default_args=default_args,
    tags=["example"],
    catchup=False,
    schedule_interval=sch_interval
)

mv_local_gcs = LocalFilesystemToGCSOperator(
    task_id="local_to_gcs",
    src=comp_local_path + "/yourfilename.csv",  # PATH_TO_UPLOAD_FILE
    dst="somefolder/yournewfilename.csv",       # destination path inside the bucket
    bucket="yourproject",                       # bucket name only: no 'gs://' prefix, no trailing '/'; folders, if any, go in dst
    dag=dag
)

start = DummyOperator(task_id='Starting', dag=dag)

start >> mv_local_gcs
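To sanity-check the bucket permissions mentioned above before triggering the DAG, a small standalone sketch like the one below can help (the key path, bucket, and object name are placeholders); a 403 error here means the service account cannot write to the bucket.
# Standalone permission check (placeholder key path, bucket, and object name):
# uploads a tiny test object with the same service-account key the Airflow
# connection uses; a 403 Forbidden means the account lacks write access.
from google.cloud import storage

client = storage.Client.from_service_account_json('/path/to/key.json')
bucket = client.bucket('yourproject')
bucket.blob('somefolder/permission-check.txt').upload_from_string('ok')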
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | |

