Retry Airflow task instance only on certain Exception
What's the best way to retry an Airflow operator only for certain failures/exceptions?
For example, let's assume that I have an Airflow task which relies on the availability of an external service. If this service becomes unavailable during the task execution, I would like to retry later (max 3 retries). For other failures I do not want to retry.
My current approach is to use the on_failure_callback and manipulate context["ti"].task.retries on the desired exception by parsing context["exception"], but I consider this messy and hard to understand. Are there better options?
Solution 1:[1]
Most of Airflow's operators use a Hook class to do the actual work.
If you create your own PythonOperator, catch the exceptions you want to ignore, and let the exceptions that should trigger a retry propagate, it will comply with Airflow's architecture seamlessly:
# python operator function
def my_operation():
    try:
        hook = SomeHook()
        hook.use_it()
    except IgnorableException:
        # swallow failures that should NOT trigger a retry
        pass

# then:
my_operator = PythonOperator(
    task_id='my-operator',
    python_callable=my_operation,
)
This approach gives you more control over your operator and its place in the DAG life-cycle.
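The selective handling described above can be sketched in plain Python, independent of Airflow. Here, ServiceUnavailableError, BadInputError, and the callables passed in are hypothetical stand-ins for the hook call: transient errors propagate (so the scheduler would retry the task), while permanent errors are swallowed or handled so no retry is triggered.

```python
class ServiceUnavailableError(Exception):
    """Transient failure: the external service is down (hypothetical)."""

class BadInputError(Exception):
    """Permanent failure: retrying will not help (hypothetical)."""

def my_operation(call):
    """Run a hook call so that only transient errors escape and cause a retry."""
    try:
        call()
    except BadInputError:
        # Swallow (or log) errors that should NOT trigger an Airflow retry.
        pass
    # ServiceUnavailableError is deliberately not caught: it propagates,
    # the task instance is marked failed, and Airflow reschedules it
    # until the operator's `retries` count is exhausted.

# Quick check of the behaviour:
def bad():
    raise BadInputError()

def down():
    raise ServiceUnavailableError()

my_operation(bad)  # returns normally: no retry would be triggered
try:
    my_operation(down)
    retried = False
except ServiceUnavailableError:
    retried = True  # this exception would trigger a retry in Airflow
```

Note also that recent Airflow versions (1.10.5+) provide airflow.exceptions.AirflowFailException, which fails the task immediately without consuming retries; raising it for permanent errors (instead of swallowing them) is another way to get the same selective-retry behaviour.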
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Alan Borsato |
