Airflow job returns successfully. Deployed Dataflow job nowhere to be found

When I run the job locally using the jar, it deploys and finishes successfully, i.e. I can see the output files in GCS:

java -cp /Users/zainqasmi/Workspace/vasa/dataflow/build/libs/vasa-dataflow-2022-03-25-12-27-14-784-all.jar com.nianticproject.geodata.extraction.ExtractGeodata \
--project=vasa-dev \
--configurationPath=/Users/zainqasmi/Workspace/vasa/dataflow/src/main/resources/foursquare/extract.pb.txt \
--region=us-central1 \
--runner=DataflowRunner \
--dryRun=false \
--workerMachineType=n2d-highmem-16
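
To double-check that the local run really wrote its output, a sketch like the following (not from the original post; bucket name and prefix are illustrative placeholders, since the post does not say where the output lands) can list the files with the google-cloud-storage client:

from google.cloud import storage

# List the pipeline's output files; "YOUR_OUTPUT_BUCKET" and the prefix are
# hypothetical placeholders, not values from the post.
client = storage.Client(project="vasa-dev")
for blob in client.list_blobs("YOUR_OUTPUT_BUCKET", prefix="geodata/"):
    print(blob.name, blob.size)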

However, when I push the DAG to Airflow, the task apparently runs successfully, i.e. it logs "Marking task as SUCCESS" and exits with return code 0, but I can't find the Dataflow job being executed anywhere in the GCP UI. Am I missing something? I'm using environment composer-2-0-7-airflow-2-2-3. Logs from Airflow:

*** Reading remote log from gs://us-central1-airflow-dev-b0cc30af-bucket/logs/foursquare_1/extract_geodata/2022-03-25T22:52:15.382542+00:00/1.log.
    [2022-03-25, 22:52:21 UTC] {taskinstance.py:1033} INFO - Dependencies all met for <TaskInstance: foursquare_1.extract_geodata manual__2022-03-25T22:52:15.382542+00:00 [queued]>
    [2022-03-25, 22:52:21 UTC] {taskinstance.py:1033} INFO - Dependencies all met for <TaskInstance: foursquare_1.extract_geodata manual__2022-03-25T22:52:15.382542+00:00 [queued]>
    [2022-03-25, 22:52:21 UTC] {taskinstance.py:1239} INFO - 
    --------------------------------------------------------------------------------
    [2022-03-25, 22:52:21 UTC] {taskinstance.py:1240} INFO - Starting attempt 1 of 2
    [2022-03-25, 22:52:21 UTC] {taskinstance.py:1241} INFO - 
    --------------------------------------------------------------------------------
    [2022-03-25, 22:52:21 UTC] {taskinstance.py:1260} INFO - Executing <Task(DataFlowJavaOperator): extract_geodata> on 2022-03-25 22:52:15.382542+00:00
    [2022-03-25, 22:52:21 UTC] {standard_task_runner.py:52} INFO - Started process 57323 to run task
    [2022-03-25, 22:52:21 UTC] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'foursquare_1', 'extract_geodata', 'manual__2022-03-25T22:52:15.382542+00:00', '--job-id', '1531', '--raw', '--subdir', 'DAGS_FOLDER/dataflow_operator_test.py', '--cfg-path', '/tmp/tmp4thgd6do', '--error-file', '/tmp/tmpu6crkval']
    [2022-03-25, 22:52:21 UTC] {standard_task_runner.py:77} INFO - Job 1531: Subtask extract_geodata
    [2022-03-25, 22:52:22 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: foursquare_1.extract_geodata manual__2022-03-25T22:52:15.382542+00:00 [running]> on host airflow-worker-9rz89
    [2022-03-25, 22:52:22 UTC] {taskinstance.py:1426} INFO - Exporting the following env vars:
    AIRFLOW_CTX_DAG_OWNER=airflow
    AIRFLOW_CTX_DAG_ID=foursquare_1
    AIRFLOW_CTX_TASK_ID=extract_geodata
    AIRFLOW_CTX_EXECUTION_DATE=2022-03-25T22:52:15.382542+00:00
    AIRFLOW_CTX_DAG_RUN_ID=manual__2022-03-25T22:52:15.382542+00:00
    [2022-03-25, 22:52:22 UTC] {credentials_provider.py:312} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
    [2022-03-25, 22:52:22 UTC] {taskinstance.py:1268} INFO - Marking task as SUCCESS. dag_id=foursquare_1, task_id=extract_geodata, execution_date=20220325T225215, start_date=20220325T225221, end_date=20220325T225222
    [2022-03-25, 22:52:22 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
    [2022-03-25, 22:52:22 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
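
The task goes from "Started process" to "Marking task as SUCCESS" in about a second, with no job-submission output in between, which suggests the jar was never actually run. As a sanity check that no job ever reached the Dataflow service (rather than just being hidden in the UI), a minimal sketch like this can list jobs in the region via the Dataflow v1b3 API; the project and region come from the post, the script itself is not part of it:

from googleapiclient.discovery import build

# Query the Dataflow service for jobs in the project/region used above
# (requires google-api-python-client and default GCP credentials).
dataflow = build("dataflow", "v1b3")
response = (
    dataflow.projects()
    .locations()
    .jobs()
    .list(projectId="vasa-dev", location="us-central1")
    .execute()
)
for job in response.get("jobs", []):
    print(job["name"], job["currentState"], job["createTime"])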

The DAG:

# Imports were not shown in the original post; these are the likely ones for
# composer-2-0-7-airflow-2-2-3 (DataFlowJavaOperator is the deprecated
# contrib-era name for the Java Dataflow operator).
from datetime import timedelta

from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

GCP_PROJECT = "vasa-dev"
CONNECTION_ID = 'bigquery_default'

VASA_DATAFLOW_JAR = '/home/airflow/gcs/data/bin/vasa-dataflow-2022-03-25-16-36-09-008-all.jar'


default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'wait_for_downstream': True,
    # 'max_active_runs' is a DAG-level argument, not a task-level one, so it
    # is silently ignored here; it has been moved into the DAG() call below.
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(days=1),
}


with DAG(
    dag_id='foursquare_1',
    schedule_interval=timedelta(days=1),
    max_active_runs=1,  # moved out of default_args: it is a DAG argument
    default_args=default_args,
) as dag:

    kick_off_dag = DummyOperator(task_id='run_this_first')

    extract_geodata = DataFlowJavaOperator(
        task_id='extract_geodata',
        jar=VASA_DATAFLOW_JAR,
        job_class='com.nianticproject.geodata.extraction.ExtractGeodata',
        options={
            "project": "vasa-dev",
            "configurationPath": "/home/airflow/gcs/foursquare/extract.pb.txt",
            "region": "us-central1",
            "runner": "DataflowRunner",
            "dryRun": "false",
            "workerMachineType": "n2d-highmem-16",
        },
    )

    end_task = BashOperator(
        task_id='end_task',
        bash_command='echo {{ execution_date.subtract(months=1).replace(day=1).strftime("%Y-%m-%d") }}',
    )

    kick_off_dag >> extract_geodata >> end_task
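
For reference, DataFlowJavaOperator is the deprecated contrib-era operator; a minimal sketch of the same task on the current provider operator, DataflowCreateJavaJobOperator from apache-airflow-providers-google, might look like this (the sketch is not from the original post, and exact option handling may differ by provider version):

from airflow.providers.google.cloud.operators.dataflow import (
    DataflowCreateJavaJobOperator,
)

# Same task expressed with the non-deprecated provider operator; parameter
# values are copied from the DAG above. The runner is supplied by the
# operator itself, so it is omitted from options.
extract_geodata = DataflowCreateJavaJobOperator(
    task_id='extract_geodata',
    jar=VASA_DATAFLOW_JAR,
    job_class='com.nianticproject.geodata.extraction.ExtractGeodata',
    location='us-central1',
    options={
        "project": "vasa-dev",
        "configurationPath": "/home/airflow/gcs/foursquare/extract.pb.txt",
        "dryRun": "false",
        "workerMachineType": "n2d-highmem-16",
    },
)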

