Using Airflow dag_run.conf inside a custom operator

We created a custom Airflow operator based on EMRContainerOperator, and we need to make a decision based on a config passed through the Airflow UI when the DAG is triggered.
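
For example, when triggering the DAG from the UI ("Trigger DAG w/ config") we pass a small JSON payload; operation_type is the key the operator reads, and the value shown here is just an example:

{"operation_type": "full"}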

My custom operator:

from typing import Sequence

from airflow.providers.amazon.aws.operators.emr_containers import EMRContainerOperator
from airflow.utils.decorators import apply_defaults


class EmrBatchProcessorOperator(EMRContainerOperator):

    template_fields: Sequence[str] = (
        "name",
        "virtual_cluster_id",
        "execution_role_arn",
        "release_label",
        "job_driver",
        "operation_type",
    )

    @apply_defaults
    def __init__(self, operation_type, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.operation_type = operation_type

        # Decide how many executor pods to request based on the operation type
        if self.operation_type == 'full':
            number_of_pods = 10
        else:
            number_of_pods = 5

        BASE_CONSUMER_DRIVER_ARG = {
            "sparkSubmitJobDriver": {
                "entryPoint": "s3://bucket/batch_processor_engine/batch-processor-engine_2.12-3.0.1_0.28.jar",
                "entryPointArguments": ["group_name=courier_api_group01"],
                "sparkSubmitParameters": f"--conf spark.executor.instances={number_of_pods} "
                                         "--conf spark.executor.memory=32G "
                                         "--conf spark.executor.cores=5 "
                                         "--conf spark.driver.cores=1 "
                                         "--conf spark.driver.memory=12G "
                                         "--conf spark.sql.broadcastTimeout=2000 "
                                         "--class TableProcessorWrapper",
            }
        }

        self.job_driver = BASE_CONSUMER_DRIVER_ARG

This is how I call my operator:

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    dag_id="batch_processor_model_dag",
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:
    start = DummyOperator(task_id='start', dag=dag)
    end = DummyOperator(task_id='end', dag=dag, trigger_rule='none_failed')

    base_consumer = EmrBatchProcessorOperator(
        task_id="base_consumer",
        virtual_cluster_id=VIRTUAL_CLUSTER_ID,
        execution_role_arn=JOB_ROLE_ARN,
        configuration_overrides=CONFIGURATION_OVERRIDES_ARG,
        release_label="emr-6.5.0-latest",
        job_driver={},
        name="pi.py",
        operation_type='{{ dag_run.conf["operation_type"] }}',
    )

    start >> base_consumer >> end
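
For completeness, the same payload can also be passed from the CLI instead of the UI, something like:

airflow dags trigger batch_processor_model_dag --conf '{"operation_type": "full"}'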

But this code doesn't work; I can't access the dag_run.conf value inside the operator.
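
My guess is that Airflow only renders template_fields right before execute() is called, so inside __init__ the value of self.operation_type is still the literal string '{{dag_run.conf["operation_type"]}}' and the comparison against 'full' never matches. Below is a minimal, untested sketch of what I think the fix would look like, deferring the decision to execute(); the job driver payload is abbreviated here:

from typing import Sequence

from airflow.providers.amazon.aws.operators.emr_containers import EMRContainerOperator


class EmrBatchProcessorOperator(EMRContainerOperator):

    # Extend the parent's template fields so operation_type gets rendered too
    template_fields: Sequence[str] = (
        *EMRContainerOperator.template_fields,
        "operation_type",
    )

    def __init__(self, operation_type, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # At DAG-parse time this is still the raw Jinja string
        self.operation_type = operation_type

    def execute(self, context):
        # By the time execute() runs, the template fields have been rendered,
        # so self.operation_type now holds the value from dag_run.conf
        number_of_pods = 10 if self.operation_type == 'full' else 5
        self.job_driver = {
            "sparkSubmitJobDriver": {
                # same entryPoint / entryPointArguments / sparkSubmitParameters
                # as above, built with number_of_pods
            }
        }
        return super().execute(context)

If that's right, the decision has to happen after rendering, which is why the sketch overrides execute() instead of doing the work in __init__.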

Could you help me?


