How to display Airflow DAG status in BigQuery tables
I want to record the final status (success/failure) of an Airflow DAG in a BigQuery table. The table could contain columns such as Date-Time, DAG-Name and Status, and it would be populated according to the final status of each DAG run.
Please help: how can this be achieved?
Solution 1:[1]
If you need the data in real time, I would go with something along the lines of the approach @Bas has suggested, perhaps with Firestore or Cloud SQL. However, note his comments on the number of inserts per day if you go with BigQuery.
If you can wait for the results on a daily basis, you can set up a log sink to BigQuery as described here: https://cloud.google.com/bigquery/docs/reference/auditlogs#stackdriver_logging_exports
In the filter criteria you can either bring in all of the Airflow logs or just the ones from the worker/scheduler.
Example criteria:

```
resource.type="cloud_composer_environment"
logName="projects/{YOUR-PROJECT}/logs/airflow-worker"
```

In the log `textPayload` you will see something like:

```
Marking task as SUCCESS. dag_id=thing, task_id=stuff, executiondate=20220307T111111, start_date=20220307T114858, end_date=20220307T114859
```

You can then parse out what you need in BigQuery.
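As a rough sketch of that parsing step, you could run a query like the one below through the Python client. The dataset and table names are placeholders for wherever your log sink writes, and the regular expressions are based on the sample log line above:

```python
# Sketch: query the exported worker logs and pull the DAG name and status
# out of textPayload. The table name below is a placeholder for your sink's target.
from google.cloud import bigquery

client = bigquery.Client()

query = r"""
SELECT
  timestamp                                              AS event_time,
  REGEXP_EXTRACT(textPayload, r'dag_id=([^,]+)')         AS dag_name,
  REGEXP_EXTRACT(textPayload, r'Marking task as (\w+)')  AS status
FROM `your-project.airflow_logs.airflow_worker`
WHERE textPayload LIKE 'Marking task as %'
"""

for row in client.query(query).result():
    print(row.event_time, row.dag_name, row.status)
```

Note that these log lines are emitted per task, so you would still need to decide how to roll them up to a single status per DAG run.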
Solution 2:[2]
To complement the answer of user Bas Harenslak, there are also these options you can explore:
- You can make use of TriggerDagRunOperator. By using it you can have one DAG (a `recap-dag`) that is referenced by your DAGs to populate the record in your destination dataset:
```python
from datetime import datetime
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_recap_dag = TriggerDagRunOperator(
    task_id="trigger_recap_dag",
    trigger_dag_id="recap-dag",
    wait_for_completion=False,
    allowed_states=["success"],
    # conf must be JSON-serializable, so pass the timestamp as a string
    conf={"Time": datetime.now().isoformat(), "DAG": "recap-dag", "Status": "success"},
)

ingestion >> transformation >> save >> send_notification >> trigger_recap_dag
```
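On the receiving side, the `recap-dag` could read that payload from `dag_run.conf` and insert it into BigQuery. A minimal sketch, assuming a destination table `your-project.monitoring.dag_status` with Time/DAG/Status columns (both names are placeholders, not from the original answer):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from google.cloud import bigquery


def save_status_to_bq(**context):
    # conf is the payload passed by TriggerDagRunOperator in the upstream DAG
    conf = context["dag_run"].conf or {}
    client = bigquery.Client()
    errors = client.insert_rows_json(
        "your-project.monitoring.dag_status",  # placeholder table
        [{"Time": conf.get("Time"), "DAG": conf.get("DAG"), "Status": conf.get("Status")}],
    )
    if errors:
        raise RuntimeError(f"BigQuery insert failed: {errors}")


with DAG(
    "recap-dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,  # only runs when triggered by another DAG
    catchup=False,
) as dag:
    PythonOperator(task_id="save_status", python_callable=save_status_to_bq)
```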
- If you see fit, this `recap-dag` can also be independent, scheduled to run every hour/day/week of your choosing and check the status of your DAGs:
```python
from datetime import datetime

from airflow import DAG
from airflow.models import DagRun

with DAG(
    'recap-dag',
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    ...

# Airflow >= 2.0.0
# Inside a PythonOperator callable; the dag id and dates are placeholders.
def get_running_dags_info():
    dag_runs = DagRun.find(
        dag_id=your_dag_id,
        execution_start_date=your_start_date,
        execution_end_date=your_end_date,
    )
    ...
```
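Continuing that sketch, the `DagRun` objects returned by `DagRun.find` already carry the fields from the question's example table, so turning them into rows is straightforward (the Time/DAG/Status column names are just an illustration):

```python
# dag_runs is the list returned by DagRun.find above.
records = [
    {
        "Time": dr.execution_date.isoformat(),  # the run's logical date
        "DAG": dr.dag_id,
        "Status": dr.state,                     # e.g. "success" or "failed"
    }
    for dr in dag_runs
]
```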
- You can make use of the prior options and come up with a solution like this: after your DAG (or DAGs) complete, they fire the trigger DAG. This `recap-dag` saves your DAG run records into a custom table or file, and then your independent DAG retrieves the records collected so far and pushes the data into your BigQuery table.
- Another option is to look into your Airflow database directly to retrieve run information, known as Data Profiling. It has been deprecated in recent versions due to security concerns.
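If you do query the metadata database directly, a minimal sketch could look like the following. The connection string is a placeholder (on Cloud Composer you would need to reach the database through the Cloud SQL proxy or similar):

```python
# Sketch: read run status straight from Airflow's dag_run table.
import sqlalchemy

# Placeholder connection string; adjust driver, credentials and host for your setup.
engine = sqlalchemy.create_engine("postgresql+psycopg2://airflow:PASSWORD@localhost/airflow")

with engine.connect() as conn:
    rows = conn.execute(sqlalchemy.text(
        "SELECT dag_id, execution_date, state "
        "FROM dag_run ORDER BY execution_date DESC"
    ))
    for dag_id, execution_date, state in rows:
        print(dag_id, execution_date, state)
```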
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Daniel Zagales |
| Solution 2 | |
