Trigger Airflow DAG from Python source code, cannot find Sqlite table "dag"
I want to trigger an Airflow DAG from within a Python script (a Jupyter notebook, actually).
I made sure the environment variables that the airflow command uses (e.g. AIRFLOW_HOME) are set, by using python-dotenv to load a .env file inside the Jupyter notebook. This .env file contains various environment variables, including AIRFLOW_HOME:
%load_ext dotenv
# the AIRFLOW_HOME env var must be an absolute file system path
# when the notebook is not in the same directory as the `airflow.db` SQLite DB
%dotenv ../path-to-env-file/.env
I do see the expected file system path, so AIRFLOW_HOME points to the directory where the SQLite database airflow.db lives; the Jupyter environment prints the correct path:
print(os.getenv("AIRFLOW_HOME"))
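To rule out a mismatch, it can also help to print the metadata DB connection that Airflow itself resolves. Below is a minimal sketch, assuming Airflow 2.x and that AIRFLOW_HOME is present in the environment before the airflow package is first imported (Airflow reads its configuration at import time):

```python
import os

# AIRFLOW_HOME must be in the environment before the first `import airflow`,
# because Airflow reads its configuration when the package is imported.
print(os.getenv("AIRFLOW_HOME"))

from airflow import settings

# The SQLAlchemy connection string Airflow will actually use;
# for SQLite this should point at the expected airflow.db file.
print(settings.SQL_ALCHEMY_CONN)
```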
The SQLite DB was previously created with commands like airflow db init (same AIRFLOW_HOME as above), and I also added some connections I use in the DAG with airflow connections add ....
All this stuff works either via the Airflow Webserver + scheduler or via the command:
airflow tasks test \
--subdir ../dags \
'my_dag_id' 'my_task_id' "${TODAY}"
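As an aside, a rough Python counterpart to the --subdir flag is to parse the DAG folder with a DagBag. This is only a sketch, assuming ../dags is the DAG folder, AIRFLOW_HOME is set before airflow is imported, and my_dag_id is the same placeholder as above; the commented dag.test() call additionally requires Airflow 2.5 or newer:

```python
from airflow.models import DagBag

# Parse the same DAG folder the CLI is pointed at with --subdir.
dagbag = DagBag(dag_folder="../dags", include_examples=False)
print(dagbag.import_errors)          # surfaces DAG parse failures, if any

dag = dagbag.get_dag("my_dag_id")    # None if the DAG could not be loaded

# Airflow >= 2.5 only: run the whole DAG in-process, similar in spirit
# to `airflow tasks test`, without a scheduler or webserver.
# dag.test()
```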
- All these configuration details went from the `airflow` command invocations into the SQLite DB (a quick way to double-check this from Python is sketched after this list).
- I do see all the Airflow DB tables in that SQLite DB.
- I do see the rows in those tables.
- All the commands work as expected.
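A quick way to double-check those points from Python, without going through Airflow at all, is to open the SQLite file directly. A minimal sketch, assuming the DB is the default airflow.db under AIRFLOW_HOME and using the dag_id placeholder from above:

```python
import os
import sqlite3

# Open the metadata DB that AIRFLOW_HOME should be pointing at.
db_path = os.path.join(os.environ["AIRFLOW_HOME"], "airflow.db")

with sqlite3.connect(db_path) as conn:
    tables = [row[0] for row in conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'")]
    print("dag table present:", "dag" in tables)

    # The row that the failing query is looking for.
    print(conn.execute(
        "SELECT dag_id, fileloc FROM dag WHERE dag_id = ?",
        ("my_dag_id",)).fetchall())
```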
Now I want to do the same as above, but driven from the Jupyter notebook. So, going back to that notebook, when I do this in Python:
from airflow.api.client.local_client import Client
c = Client(None, None)
c.trigger_dag(dag_id='my_dag_id', run_id='test_run_id', conf={})
Then I see a long stack trace that begins with this message:
OperationalError Traceback (most recent call last)
and ends with this message:
OperationalError: (sqlite3.OperationalError) no such table: dag
[SQL: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_is_subdag, dag.is_active AS dag_is_active, dag.last_parsed_time AS dag_last_parsed_time, dag.last_pickled AS dag_last_pickled, dag.last_expired AS dag_last_expired, dag.scheduler_lock AS dag_scheduler_lock, dag.pickle_id AS dag_pickle_id, dag.fileloc AS dag_fileloc, dag.owners AS dag_owners, dag.description AS dag_description, dag.default_view AS dag_default_view, dag.schedule_interval AS dag_schedule_interval, dag.max_active_tasks AS dag_max_active_tasks, dag.max_active_runs AS dag_max_active_runs, dag.has_task_concurrency_limits AS dag_has_task_concurrency_limits, dag.has_import_errors AS dag_has_import_errors, dag.next_dagrun AS dag_next_dagrun, dag.next_dagrun_data_interval_start AS dag_next_dagrun_data_interval_start, dag.next_dagrun_data_interval_end AS dag_next_dagrun_data_interval_end, dag.next_dagrun_create_after AS dag_next_dagrun_create_after
FROM dag
WHERE dag.dag_id = ?
LIMIT ? OFFSET ?]
[parameters: ('my_dag_id', 1, 0)]
But the dag table is there, AIRFLOW_HOME points to that SQLite DB, and the table contains the expected row:
-- this shows the expected row
SELECT * FROM dag WHERE dag_id = 'my_dag_id'
How do I run the DAG programmatically from Python or Jupyter?
Solution 1:[1]
First off, can you run the DAG yourself through the Airflow UI? It sounds like the Airflow DB may not be initialized correctly. Try a reset, run through the "normal" patterns, and then move forward.
Note: the command to initialize the DB depends on your Airflow version: airflow initdb in Airflow 1.x and airflow db init in Airflow 2.x.
Second, I'd recommend using the Airflow REST API to trigger your DAGs from Python rather than the local client, or alternatively triggering them via the CLI.
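For example, with the Airflow 2 stable REST API a DAG run can be created with a plain HTTP call. A minimal sketch, assuming the webserver is reachable at localhost:8080, the basic-auth API backend is enabled, and the airflow/airflow credentials below actually exist:

```python
import requests

# Create a DAG run via the stable REST API (Airflow 2.x):
# POST /api/v1/dags/{dag_id}/dagRuns
resp = requests.post(
    "http://localhost:8080/api/v1/dags/my_dag_id/dagRuns",
    auth=("airflow", "airflow"),                    # assumed credentials
    json={"dag_run_id": "test_run_id", "conf": {}},
)
resp.raise_for_status()
print(resp.json())
```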
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Thom Bedford |
