Retry an Airflow task instance only on certain exceptions

What's the best way to retry an Airflow operator only for certain failures/exceptions?

For example, let's assume I have an Airflow task that relies on the availability of an external service. If this service becomes unavailable during task execution, I would like to retry later (max. 3 retries). For other failures I do not want to retry at all.

My current approach is to use on_failure_callback and manipulate context["ti"].task.retries depending on the exception found in context["exception"], but I consider this messy and hard to understand. Are there better options?
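For reference, a rough sketch of the callback-based approach described above. This is only an illustration of what the question describes: ServiceUnavailableError and call_service are placeholders, and whether mutating task.retries from a failure callback reliably suppresses the retry depends on the Airflow version; that fragility is exactly what the question is asking to avoid.

from airflow.operators.python import PythonOperator

# Failure callback that zeroes out retries unless the failure was the
# "service unavailable" case we actually want to retry.
def suppress_retries_on_unexpected_error(context):
    exception = context.get("exception")
    if not isinstance(exception, ServiceUnavailableError):  # placeholder exception type
        context["ti"].task.retries = 0

my_task = PythonOperator(
    task_id="call_external_service",
    python_callable=call_service,  # placeholder callable
    retries=3,
    on_failure_callback=suppress_retries_on_unexpected_error,
)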



Solution 1:[1]

Most of Airflow's operators use a Hook class to do the actual work.

If you create your own PythonOperator, catch the exceptions you want to ignore, and raise (or let propagate) the exceptions that should trigger a retry, it will fit into the Airflow architecture seamlessly:

from airflow.operators.python import PythonOperator

# python operator function
def my_operation():
    try:
        hook = SomeHook()  # SomeHook / IgnorableException come from your own hook module
        hook.use_it()
    except IgnorableException:
        # swallow the exception: the task succeeds and no retry is scheduled
        pass


# then:
my_operator = PythonOperator(
    task_id='my-operator',
    python_callable=my_operation
)

It gives you more control over your Operator and DAG life-cycle.
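A fuller usage sketch of that idea, assuming a recent Airflow 2.x install; SomeHook, ServiceUnavailableError, and IgnorableException are placeholders. Any exception that propagates out of the callable marks the try as failed and consumes one of the configured retries, while AirflowFailException (from airflow.exceptions) fails the task immediately without retrying:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator


def my_operation():
    try:
        hook = SomeHook()  # placeholder hook
        hook.use_it()
    except ServiceUnavailableError:
        # let it propagate: the task instance fails and Airflow schedules
        # one of the configured retries
        raise
    except IgnorableException:
        # swallow it: the task succeeds and nothing is retried
        pass
    except Exception as exc:
        # fail immediately, skipping the remaining retries
        raise AirflowFailException(f"non-retryable failure: {exc}")


with DAG(
    dag_id="retry_on_certain_exceptions",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    my_operator = PythonOperator(
        task_id="my-operator",
        python_callable=my_operation,
        retries=3,  # max. 3 retries, as in the question
        retry_delay=timedelta(minutes=1),
    )

The AirflowFailException branch is an optional addition, not part of the answer above; dropping it leaves the original behavior, where any unexpected exception also triggers a retry.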

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Alan Borsato