S3PrefixSensor in MWAA
I am using the S3PrefixSensor in MWAA v2.2.2 to wait for a file to appear in an S3 bucket. Here is the relevant code:
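```python
from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor

# MY_BUCKET, MY_PREFIX and dag are defined elsewhere in the DAG file
s3_success_flag_sensor = S3PrefixSensor(
    task_id='s3_success_flag',
    bucket_name=MY_BUCKET,
    prefix=MY_PREFIX,
    retries=3,
    dag=dag,
)
```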
I would expect the sensor to detect that the file already exists in S3 and let the DAG continue to the next operator. However, this is what I see in the logs (the real path and bucket name are replaced):
```text
[2022-03-17, 09:30:03 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:31:04 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:32:06 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:33:07 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:34:08 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:35:09 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:36:10 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:37:12 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:38:13 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:39:14 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:40:15 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:41:16 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:42:17 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:43:18 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:44:19 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:45:21 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:46:22 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:47:23 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:48:24 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
[2022-03-17, 09:49:25 UTC] {{s3_prefix.py:77}} INFO - Poking for prefix : ['SOME_EXISTING_PATH_ON_S3'] in bucket s3://SOME_EXISTING_S3_BUCKET
```
The file is already there, yet the sensor keeps poking.
Solution 1:[1]
S3PrefixSensor does not wait for an arbitrary key prefix to exist in S3; it waits for a given "folder" to exist (assuming the delimiter is `/`). So if the prefix you pass is a full object key or a partial name, the check never succeeds.
Because this is very confusing, Airflow deprecated S3PrefixSensor in favor of S3KeySensor (see PR).
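To make the folder-versus-prefix distinction concrete, here is a simplified, in-memory model. It is not Airflow's implementation: `common_prefixes`, `folder_exists` and `prefix_exists` are hypothetical helpers, and the model assumes the delimiter is `/` and that a "folder" counts as existing when it shows up as a common prefix one level up, the way S3 groups keys when listing with a delimiter.

```python
def common_prefixes(keys, prefix, delimiter="/"):
    """Imitate S3's CommonPrefixes: distinct "sub-folders" directly under prefix."""
    found = set()
    for key in keys:
        if key.startswith(prefix):
            rest = key[len(prefix):]
            cut = rest.find(delimiter)
            if cut != -1:
                found.add(prefix + rest[: cut + 1])
    return sorted(found)


def folder_exists(keys, prefix, delimiter="/"):
    """Folder-style check: prefix + delimiter must be a common prefix of its parent."""
    if not prefix.endswith(delimiter):
        prefix += delimiter
    stripped = prefix.rstrip(delimiter)
    parent = stripped[: stripped.rfind(delimiter) + 1]
    return prefix in common_prefixes(keys, parent, delimiter)


def prefix_exists(keys, prefix):
    """Plain prefix check: does any key start with prefix?"""
    return any(key.startswith(prefix) for key in keys)


keys = ["data/2022/03/_SUCCESS", "data/2022/03/part-0000.csv"]
print(folder_exists(keys, "data/2022/03"))           # True: it is a "folder"
print(folder_exists(keys, "data/2022/03/_SUCCESS"))  # False: a full key, not a folder
print(prefix_exists(keys, "data/2022/03/_SUCCESS"))  # True: a plain prefix match
```

Under this model, pointing a folder-style check at the path of a success flag file keeps returning `False`, which is consistent with the endless "Poking for prefix" log lines in the question.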
For apache-airflow-providers-amazon>3.3.0:
Note: at the time of writing this answer, no version newer than 3.3.0 has been released, but one is expected in the next few weeks.
The usage is:
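```python
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

s3_sensor_key = S3KeySensor(
    task_id="s3_sensor_key",
    bucket_name=BUCKET_NAME,  # defined elsewhere in the DAG file
    bucket_key="my_k*",  # Unix-style wildcard: any key starting with "my_k"
    wildcard_match=True,
)
```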
This matches any key that starts with `my_k`, using Unix shell-style wildcard patterns.
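A rough, in-memory model of wildcard matching, assuming it follows Python's `fnmatch` rules: only the literal part before the first wildcard character can serve as a listing prefix, and the full pattern is then applied to each candidate key. The helper name `wildcard_matches` is hypothetical and this is not the provider's code. Note that in `fnmatch`, `*` also matches `/`, so `my_k*` matches keys in nested "folders" too.

```python
import fnmatch
import re


def wildcard_matches(keys, wildcard_key):
    """Return keys matching a Unix shell-style pattern (case-sensitive, like S3 keys)."""
    # The literal part before the first wildcard ([, * or ?) narrows the candidates,
    # mirroring how a listing by prefix would work; fnmatch then applies the full pattern.
    literal_prefix = re.split(r"[\[*?]", wildcard_key, maxsplit=1)[0]
    return [
        key for key in keys
        if key.startswith(literal_prefix) and fnmatch.fnmatchcase(key, wildcard_key)
    ]


keys = ["my_key.csv", "my_kite/part-0.csv", "other/my_key.csv", "my_data.csv"]
print(wildcard_matches(keys, "my_k*"))
# ['my_key.csv', 'my_kite/part-0.csv']
```

Splitting off the literal prefix first keeps the candidate set small; against a real bucket that prefix is what you would list by, instead of scanning every key.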
For apache-airflow-providers-amazon<=3.3.0:
Create a custom sensor by copying the S3KeySensor source code from the Airflow main branch, then import that class and use it as in the example above.
This works as long as you are on Apache Airflow >= 2.1.0; on older Airflow versions you will need to adjust the sensor further to make it compatible.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Elad Kalif |
