How can I use an Airflow template reference in the DAG's Python code?

I am new to Airflow and trying to understand one thing. For example, I have a DAG that contains two tasks. The first task submits a Spark job, and the second is a sensor that waits for a file in S3.

RUN_DATE_ARG = datetime.utcnow().strftime(DATE_FORMAT_PY)
DATE = datetime.strptime(RUN_DATE_ARG, DATE_FORMAT_PY) - timedelta(hours=1)
with DAG() as dag:

    submit_spark_job = EmrContainerOperator(
        task_id="start_job",
        virtual_cluster_id=VIRTUAL_CLUSTER_ID,
        execution_role_arn=JOB_ROLE_ARN,
        release_label="emr-6.3.0-latest",
        job_driver=JOB_DRIVER_ARG,
        configuration_overrides=CONFIGURATION_OVERRIDES_ARG,
        name=f"spark-{RUN_DATE_ARG}",
        retries=3
    )

    validate_s3_success_file = S3KeySensor(
        task_id='check_for_success_file',
        bucket_name="bucket-name",
        bucket_key=f"blabla/date={DATE.strftime('%Y-%m-%d')}/hour={DATE.strftime('%H')}/_SUCCESS",
        poke_interval=10,
        timeout=60,
        verify=False,
    )

I have a RUN_DATE_ARG that by default should be taken from datetime.utcnow(), and it is one of the Java arguments I have to pass to my Spark job. I want to be able to submit the job with a custom date argument (via the Airflow UI). When I try to retrieve it as '{{ dag_run.conf["date"] | None}}', the template is replaced with the value when it is used inside a task's configuration (bucket_key=f"blabla/date={DATE.strftime('%Y-%m-%d')}/hour={DATE.strftime('%H')}/_SUCCESS",), but not in the DAG's Python code if I do the following:

date='{{ dag_run.conf["date"] | None}}'
if date is None:
  RUN_DATE_ARG = datetime.utcnow().strftime(DATE_FORMAT_PY)
else: 
  RUN_DATE_ARG = date
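
To illustrate what the snippet above does when the DAG file is parsed, here is a minimal sketch in plain Python with no Airflow involved. The template is an ordinary string literal at that point, so the `is None` check is never true and RUN_DATE_ARG ends up holding the unrendered template text. The value of DATE_FORMAT_PY is an assumption, since it is not shown in the DAG above.

```python
from datetime import datetime

# Assumed format; the real DATE_FORMAT_PY is not shown in the DAG above.
DATE_FORMAT_PY = "%Y-%m-%dT%H:%M:%S"

# At parse time this is only a string literal; nothing has rendered it yet.
date = '{{ dag_run.conf["date"] | None}}'

if date is None:
    # Never reached: a non-empty string is not None.
    RUN_DATE_ARG = datetime.utcnow().strftime(DATE_FORMAT_PY)
else:
    RUN_DATE_ARG = date

print(type(date).__name__)   # str
print(date is None)          # False
print(RUN_DATE_ARG == date)  # True
```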

Is there any way to use this value as a variable in the Python code?
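
One possible direction, sketched under assumptions and not verified against this DAG: keep the fallback logic in a plain function and call it at run time, for example from a task callable that receives `dag_run.conf`, instead of at module level. The helper name `resolve_run_date`, the `"date"` key, and the format string are hypothetical.

```python
from datetime import datetime, timezone
from typing import Mapping, Optional

# Assumed format; the real DATE_FORMAT_PY is not shown in the question.
DATE_FORMAT_PY = "%Y-%m-%dT%H:%M:%S"


def resolve_run_date(
    conf: Optional[Mapping[str, str]],
    now: Optional[datetime] = None,
) -> str:
    """Return conf["date"] if the trigger supplied one, else the current UTC time.

    Hypothetical helper: `conf` stands in for dag_run.conf, which is only
    available while a task is running, not while the DAG file is parsed.
    """
    custom = (conf or {}).get("date")
    if custom:
        # Fail early if the supplied value does not match the expected format.
        datetime.strptime(custom, DATE_FORMAT_PY)
        return custom
    current = now or datetime.now(timezone.utc)
    return current.strftime(DATE_FORMAT_PY)


# A custom date passed when triggering the run wins over the default.
print(resolve_run_date({"date": "2021-09-01T10:00:00"}))       # 2021-09-01T10:00:00
# Without a custom date, the supplied (or current) time is used.
print(resolve_run_date(None, datetime(2021, 9, 1, 12, 30, 0)))  # 2021-09-01T12:30:00
```

The same fallback can probably also be written directly as a template, such as '{{ dag_run.conf.get("date", ts) }}', but that only helps for operator arguments that the operator lists in its template_fields.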



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
