Airflow - call an operator inside a function

I'm trying to call an operator that is created inside a function, using a Python operator to invoke that function. I seem to be missing something. Can someone help me find out what it is?

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago


dd = datetime(2018, 1, 1)
args = {
    'owner': 'airflow',
    'start_date': dd,
    'retries': 0

}

def postgres_to_gcs():
    t1 = BashOperator(
    task_id='count_lines',
    bash_command='echo "task1"',
    xcom_push=True,
    dag=dag)
    return t1



with DAG('python_dag', description='Python DAG', schedule_interval='*/15 * * * *', start_date=dd, catchup=False) as dag:
    python_task = PythonOperator(task_id='python_task', python_callable=postgres_to_gcs)
 
    python_task

Error:

[2020-10-10 09:34:10,700] {baseoperator.py:351} WARNING - start_date for <Task(BashOperator): ttest-task> isn't datetime.datetime
[2020-10-10 09:34:10,700] {taskinstance.py:1150} ERROR - '>' not supported between instances of 'Pendulum' and 'str'
Traceback (most recent call last):
  File "/root/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/root/airflow/dags/estdag.py", line 19, in postgres_to_gcs
    dag=dag)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 101, in __init__
    super(BashOperator, self).__init__(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 423, in __init__
    self.dag = dag
  File "/root/.local/lib/python3.7/site-packages/airflow/models/baseoperator.py", line 549, in dag
    dag.add_task(self)
  File "/root/.local/lib/python3.7/site-packages/airflow/models/dag.py", line 1325, in add_task
    task.start_date = max(task.start_date, self.start_date)
TypeError: '>' not supported between instances of 'Pendulum' and 'str'
[2020-10-10 09:34:10,702] {taskinstance.py:1194} INFO - Marking task as FAILED. dag_id=python_dag, task_id=python_task, execution_date=20201010T093407, start_date=20201010T093410, end_date=20201010T093410

Update: Racooneer suggested a workaround (thanks, Racooneer!). Removing default_args got rid of the error, but the issue is not fully solved: I still cannot see the output of the bash command.



Solution 1:[1]

I'm not exactly sure what you are trying to do, but the code in your Python function never actually executes the operator; it only creates it.

This should work just fine:

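from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def postgres_to_gcs():
    # Note: there is no dag=dag here, so the operator is not added to the DAG.
    t1 = BashOperator(
        task_id='count_lines',
        bash_command='echo task1',
        xcom_push=True
    )
    # Creating the operator does not run it; execute() does.
    t1.execute(dict())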

with DAG(
        'python_dag',
        description='Python DAG',
        schedule_interval='*/15 * * * *',
        start_date=datetime(2018, 1, 1),
        catchup=False
) as dag:
    python_task = PythonOperator(
        task_id='python_task',
        python_callable=postgres_to_gcs
    )

Note that operators are Python classes. When you instantiate an operator inside a Python function, you only run its constructor; the task itself does not run. To run the operator, you need to call its execute method.
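The difference between creating an operator and executing it can be shown without Airflow at all. The sketch below uses EchoOperator, a hypothetical stand-in class invented for this illustration (it is not part of Airflow), to mirror the two versions of postgres_to_gcs: one that only builds the object and one that also calls execute.

```python
class EchoOperator:
    """Hypothetical stand-in for an Airflow operator; not part of Airflow."""

    def __init__(self, task_id, message):
        # The constructor only stores configuration; nothing runs here.
        self.task_id = task_id
        self.message = message
        self.executed = False

    def execute(self, context):
        # The actual work happens only when execute() is called.
        self.executed = True
        return self.message


def build_only():
    # Mirrors the question: the operator is created and returned, never run.
    return EchoOperator(task_id="count_lines", message="task1")


def build_and_run():
    # Mirrors the answer: the operator is created, then executed explicitly.
    op = EchoOperator(task_id="count_lines", message="task1")
    return op.execute({})
```

Calling build_only() gives back an object whose work was never done, while build_and_run() returns the result of the work. Real Airflow operators do more in their constructors, but the split between construction and execute is the same idea.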

Note: Using an operator inside another operator is not good practice. Use hooks or create a custom operator instead. You can read more about why in the following answer.
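For a command as simple as the one in the question, one possible way to avoid nesting operators is to run the command directly from the Python callable. The following is only a sketch under that assumption: count_lines is a hypothetical function name, and it uses the standard-library subprocess module rather than any Airflow hook.

```python
import subprocess


def count_lines():
    # Run the command directly instead of wrapping a BashOperator.
    completed = subprocess.run(
        ["echo", "task1"],
        capture_output=True,  # collect stdout/stderr (Python 3.7+)
        text=True,            # decode bytes to str
        check=True,           # raise CalledProcessError on a non-zero exit
    )
    output = completed.stdout.strip()
    print(output)  # printed output ends up in the task log
    return output  # by default PythonOperator pushes the return value to XCom
```

It could then be passed as python_callable=count_lines to the PythonOperator. Because the function both prints and returns the command output, the output should be visible in the task log and available through XCom.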

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
