'Airflow - Error raised from sqlalchemy when try to run a TaskInstance

I'm testing the execution of a crawler inside of the airflow structure. If I ran the following script, everything works fine and the payloads is printed.

from airflow import DAG
from airflow.models import BaseOperator, TaskInstance
from hooks.crawler_hook import CrawlerHook

from datetime import datetime
import time

class CrawlerOperator(BaseOperator):
    def __init__(self, conn_id=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.conn_id = conn_id

    def execute(self):
        hook = CrawlerHook(conn_id=self.conn_id)
        print(hook.run())

if __name__ == "__main__":
    CrawlerOperator(task_id='test_run').execute()

But when I try to run a TaskInstance inside a DAG, I've got the an error and cannot understand why:

if __name__ == "__main__":
    with DAG(dag_id="DAG1", start_date=datetime.now(), catchup=False) as dag:
        to = CrawlerOperator(task_id="test_run")
        ti = TaskInstance(task=to)
        ti.run()

The error:

Traceback (most recent call last):
  File "/home/../.env/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/home/../.env/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1123, in get_dagrun
    dr = session.query(DagRun).filter(DagRun.dag_id == self.dag_id, DagRun.run_id == self.run_id).one()
  File "/home/../.env/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3500, in one
    raise orm_exc.NoResultFound("No row was found for one()")
sqlalchemy.orm.exc.NoResultFound: No row was found for one()

Any suggestions?



Solution 1:[1]

I assume you are using it in some kind of unit test. What you are missing (and as the error indicates) is a DagRun:

from airflow.models import DagRun
DagRun(dag_id=self.dag.dag_id, execution_date=timezone.utcnow(), run_id="test")
ti.dag_run = dag_run

This is needed because tasks are associated to a DagRun not to a DAG. DAG can have many DagRuns.

You can see example in one of the unit tests in Airflow codebase.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Elad Kalif