Launch DAG based on when table is updated
I have a 10-15 step DAG process that I need to launch after one BigQuery table is updated for the current day. The table can be checked with a simple SELECT statement:
select * from sample_table where update_date = current_date()
When that query returns rows, we need to launch a series of tasks in Airflow/Cloud Composer.
Solution 1:[1]
Create another DAG with a sensor task that checks the BigQuery table for updates for the current day.
Once the table has been updated, use the TriggerDagRunOperator to trigger your DAG.
You can run this supervisor DAG on a schedule if the update to your BigQuery table is not predictable.
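from typing import Optional, Sequence, Union

import pendulum
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCheckOperator,
)
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context


class BigQueryTableSensor(BaseSensorOperator, BigQueryCheckOperator):
    """Sensor that succeeds once every value in the query's first row is truthy."""

    def __init__(
        self,
        *,
        sql: str,
        gcp_conn_id: str = 'google_cloud_default',
        use_legacy_sql: bool = True,
        location: Optional[str] = None,
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        labels: Optional[dict] = None,
        **kwargs,
    ):
        # Single cooperative call: BaseSensorOperator consumes the sensor
        # arguments (poke_interval, mode, ...) and forwards the rest along the
        # MRO to BigQueryCheckOperator. Everything is passed by keyword.
        super().__init__(
            sql=sql,
            gcp_conn_id=gcp_conn_id,
            use_legacy_sql=use_legacy_sql,
            location=location,
            impersonation_chain=impersonation_chain,
            labels=labels,
            **kwargs,
        )

    def poke(self, context: 'Context') -> bool:
        # BigQueryCheckOperator.execute() raises AirflowException when the check
        # fails, so treat that as "not updated yet" and poke again later.
        try:
            BigQueryCheckOperator.execute(self, context)
        except AirflowException:
            return False
        return True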
# dag initialisation...
with DAG(
    dag_id="dag_process_supervisor",
    start_date=pendulum.datetime(2022, 5, 25, tz="UTC"),
    catchup=True,
    schedule_interval="@hourly",
) as dag:
    wait_for_table_update = BigQueryTableSensor(
        task_id="wait_for_table_update",
        # True once at least one row exists for the current day.
        sql='''
            SELECT COUNT(*) > 0
            FROM sample_table
            WHERE update_date = current_date();
        ''',
        poke_interval=60 * 15,  # 15 mins
        mode='poke',
    )
    trigger_dag_process = TriggerDagRunOperator(
        task_id="trigger_dag_process",
        trigger_dag_id="dag_process",
    )
    wait_for_table_update >> trigger_dag_process
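To make the control flow concrete, here is a minimal plain-Python sketch of the sensor-then-trigger pattern, with no Airflow dependency. It is a simplified illustration, not Airflow's actual implementation. The names `check_passes` and `wait_then_trigger` are hypothetical, and the query results are simulated. It assumes the documented check rule that each value in the first returned row is cast to bool and the check passes only if all of them are truthy.

```python
import time
from typing import Callable, Optional, Sequence


def check_passes(first_row: Optional[Sequence]) -> bool:
    """Mimic the check rule: a row must exist and every value must be truthy."""
    return bool(first_row) and all(bool(value) for value in first_row)


def wait_then_trigger(
    poke: Callable[[], bool],
    trigger: Callable[[], None],
    poke_interval: float,
    timeout: float,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call poke() every poke_interval seconds; run trigger() once it is True.

    Returns False without triggering if the next wait would exceed timeout.
    """
    waited = 0.0
    while True:
        if poke():
            trigger()
            return True
        if waited + poke_interval > timeout:
            return False
        sleep(poke_interval)
        waited += poke_interval


# Simulated results of `SELECT COUNT(*) > 0 ...`: the table has no rows for
# today on the first two pokes, then the update lands.
results = iter([(False,), (False,), (True,)])
triggered = []
ok = wait_then_trigger(
    poke=lambda: check_passes(next(results)),
    trigger=lambda: triggered.append("dag_process"),
    poke_interval=15 * 60,
    timeout=60 * 60,
    sleep=lambda seconds: None,  # skip real waiting in this demo
)
```

Because the rule inspects only the first row, the check query is written as a single boolean (`COUNT(*) > 0`) rather than `select *`, whose first row could contain falsy column values even when the table has been updated.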
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 |
