How to have a mix of both Celery Executor and Kubernetes Executor in Apache Airflow?

I have multiple DAGs using the Celery Executor, but I want one particular DAG to run using the Kubernetes Executor. I haven't been able to find a good, reliable way to achieve this.

My airflow.cfg declares CeleryExecutor, and I don't want to change that globally, since it is needed by every DAG except one.

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor

My dag code:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import \
    KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample_1', default_args=default_args)


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                                image="python:3.6",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="passing-test",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag
                                )

failing = KubernetesPodOperator(namespace='default',
                                image="ubuntu:1604",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="fail",
                                task_id="failing-task",
                                get_logs=True,
                                dag=dag
                                )

passing.set_upstream(start)
failing.set_upstream(start)

I could add an if-else condition and switch the executor at the point where Airflow picks up its configuration. If this sounds right, please tell me which paths and files to modify, though I was hoping a more mature method exists.



Solution 1:[1]

There is now the CeleryKubernetesExecutor (introduced in Airflow 2.0), which requires setting up both Celery and Kubernetes, but offers the functionality of both executors.

The official documentation offers a rule of thumb for deciding when it's worth using:

We recommend considering the CeleryKubernetesExecutor when your use case meets:

- The number of tasks needed to be scheduled at the peak exceeds the scale that your Kubernetes cluster can comfortably handle.

- A relatively small portion of your tasks requires runtime isolation.

- You have plenty of small tasks that can be executed on Celery workers, but you also have resource-hungry tasks that will be better run in predefined environments.

Solution 2:[2]

Starting with Airflow 2.x, configure airflow.cfg as follows: in the [core] section set executor = CeleryKubernetesExecutor, and in the [celery_kubernetes_executor] section set kubernetes_queue = kubernetes. Then, whenever you want a task instance to run on the Kubernetes executor, add the parameter queue='kubernetes' to the task definition. For example:

task1 = BashOperator(
    task_id='Test_kubernetes_executor',
    bash_command='echo Kubernetes',
    queue='kubernetes',
)
task2 = BashOperator(
    task_id='Test_Celery_Executor',
    bash_command='echo Celery',
)

When you run the DAG, you will see task1 running on Kubernetes and task2 on Celery. Unless a task's queue is set to kubernetes, every task in the DAG runs on the Celery executor.
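For reference, the airflow.cfg fragment this solution describes might look like the following (a sketch using the section and option names given above; adapt it to your deployment):

```ini
[core]
# Route tasks to Celery by default, to Kubernetes on demand
executor = CeleryKubernetesExecutor

[celery_kubernetes_executor]
# Tasks whose queue matches this value run on the Kubernetes executor
kubernetes_queue = kubernetes
```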

Solution 3:[3]

I don't think it is possible to mix the two executors directly. But you can keep CeleryExecutor and declare the resource-intensive tasks with KubernetesPodOperator: jobs are scheduled and watched by the CeleryExecutor, while the actual processing logic of those tasks runs in Kubernetes pods.
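The solutions above all share the same per-task routing rule: Celery is the default, and only explicitly marked work goes to Kubernetes. As a plain-Python illustration of that decision (a hypothetical sketch, not Airflow's actual CeleryKubernetesExecutor code):

```python
from typing import Optional

# Hypothetical sketch: a task runs on Kubernetes only when its queue
# matches the configured kubernetes_queue; everything else defaults to
# Celery. Illustration only, not Airflow's implementation.

KUBERNETES_QUEUE = "kubernetes"  # mirrors [celery_kubernetes_executor] kubernetes_queue

def pick_executor(task_queue: Optional[str]) -> str:
    """Return the executor a task instance would land on."""
    if task_queue == KUBERNETES_QUEUE:
        return "KubernetesExecutor"
    return "CeleryExecutor"  # default for every other queue (or no queue)

print(pick_executor("kubernetes"))  # KubernetesExecutor
print(pick_executor("default"))    # CeleryExecutor
```

The practical takeaway is the same in every solution: the opt-in marker (a queue name or a KubernetesPodOperator) lives on the task, so the rest of the DAGs stay on Celery untouched.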

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Alessandro S.
Solution 2:
Solution 3: Ashwin