Launch a DAG when a BigQuery table is updated

I have a 10-15 step DAG process that I need to launch after one BigQuery table has been updated for the current day. Whether the table has been updated can be checked with a simple SELECT statement:

select * from sample_table where update_date = current_date()

When that query returns rows, we need to launch a series of tasks in Airflow/Cloud Composer.



Solution 1:[1]

Create another DAG with a sensor task that checks the BigQuery table for an update on the current day.

Once the table has been updated, use the TriggerDagRunOperator to trigger your DAG. Give the triggered DAG (dag_process) schedule_interval=None so that it only runs when it is triggered.

Run this supervisor DAG on a schedule if the time at which your BigQuery table is updated is not predictable.
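The sensor only needs the check query to yield a single boolean: "at least one row is dated today". Below is a minimal sketch of that check. It uses Python's built-in `sqlite3` as a stand-in for BigQuery, which is an assumption made purely for illustration. The table contents and the `table_updated_today` / `make_demo_db` helpers are hypothetical. With exactly one matching row, `COUNT(*) > 1` would still be false, so the condition has to be `COUNT(*) > 0`.

```python
import sqlite3
from datetime import datetime, timedelta, timezone

# SQLite stands in for BigQuery here: its CURRENT_DATE keyword returns today's
# UTC date as 'YYYY-MM-DD', like BigQuery's CURRENT_DATE() (UTC by default).
CHECK_SQL = "SELECT COUNT(*) > 0 FROM sample_table WHERE update_date = CURRENT_DATE"


def table_updated_today(conn: sqlite3.Connection) -> bool:
    """True if at least one row of sample_table carries today's update_date."""
    (updated,) = conn.execute(CHECK_SQL).fetchone()
    return bool(updated)


def make_demo_db(days_ago: int) -> sqlite3.Connection:
    """Hypothetical in-memory sample_table with one row updated `days_ago` days ago."""
    day = datetime.now(timezone.utc).date() - timedelta(days=days_ago)
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE sample_table (id INTEGER, update_date TEXT)")
    conn.execute("INSERT INTO sample_table VALUES (1, ?)", (day.isoformat(),))
    return conn


if __name__ == "__main__":
    print(table_updated_today(make_demo_db(0)))  # True
    print(table_updated_today(make_demo_db(1)))  # False
```

In the real sensor the same boolean comes back as the first column of the first row of the BigQuery result.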

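from typing import Optional, Sequence, Union

import pendulum

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context


class BigQueryTableSensor(BaseSensorOperator):
    """Succeeds once the first value returned by `sql` is truthy."""

    template_fields: Sequence[str] = ("sql",)

    def __init__(
        self,
        *,
        sql: str,
        gcp_conn_id: str = "google_cloud_default",
        use_legacy_sql: bool = False,
        location: Optional[str] = None,
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        **kwargs,
    ):
        # task_id, poke_interval, timeout, mode, ... are handled by BaseSensorOperator
        super().__init__(**kwargs)
        self.sql = sql
        self.gcp_conn_id = gcp_conn_id
        self.use_legacy_sql = use_legacy_sql
        self.location = location
        self.impersonation_chain = impersonation_chain

    def poke(self, context: Context) -> bool:
        hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
            use_legacy_sql=self.use_legacy_sql,
            location=self.location,
            impersonation_chain=self.impersonation_chain,
        )
        # First row of the result, or None if the query returned no rows
        row = hook.get_first(self.sql)
        self.log.info("Check query returned %s", row)
        # Returning False makes the sensor wait poke_interval and check again
        return bool(row and row[0])


with DAG(
    dag_id="dag_process_supervisor",
    start_date=pendulum.datetime(2022, 5, 25, tz="UTC"),
    catchup=False,  # do not backfill a run for every interval since start_date
    # One supervisor run per day: an hourly schedule would trigger
    # dag_process again every hour once the table has been updated.
    schedule_interval="@daily",
) as dag:
    wait_for_table_update = BigQueryTableSensor(
        task_id="wait_for_table_update",
        # True as soon as at least one row is dated today (standard SQL);
        # in BigQuery, sample_table must be qualified with its dataset.
        sql="""
            SELECT COUNT(*) > 0
            FROM sample_table
            WHERE update_date = CURRENT_DATE()
        """,
        poke_interval=60 * 15,  # check every 15 minutes
        timeout=60 * 60 * 23,  # give up before the next daily run starts
        mode="reschedule",  # free the worker slot between checks
    )
    trigger_dag_process = TriggerDagRunOperator(
        task_id="trigger_dag_process",
        trigger_dag_id="dag_process",
    )
    wait_for_table_update >> trigger_dag_process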
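In poke mode, a sensor essentially calls `poke()`, sleeps `poke_interval` seconds and repeats until `poke()` returns True or `timeout` elapses. The framework-free sketch below shows that loop for illustration only. It is not Airflow's actual implementation, and the `wait_until` helper and `FakeClock` are hypothetical.

```python
import time
from typing import Callable


def wait_until(
    check: Callable[[], bool],
    poke_interval: float,
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Call check() every poke_interval seconds until it returns True.

    Returns True on success, or False once timeout seconds have elapsed
    without a successful check.
    """
    started = clock()
    while True:
        if check():
            return True
        if clock() - started >= timeout:
            return False
        sleep(poke_interval)


class FakeClock:
    """Simulated clock so the loop can be exercised without real waiting."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


if __name__ == "__main__":
    # The table "appears" on the third check, i.e. after two 15-minute waits.
    answers = iter([False, False, True])
    clock = FakeClock()
    ok = wait_until(lambda: next(answers), 900, 3600, sleep=clock.sleep, clock=clock)
    print(ok, clock.now)  # True 1800.0
```

Airflow's `mode="reschedule"` gives the same poll-until-true behavior. Instead of sleeping inside the worker, it ends the task attempt and reschedules it, which frees the worker slot between checks. That matters when a sensor may wait for many hours.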

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1