Use kwargs and context simultaneously on Airflow
I'm using Airflow 1.10.11.
From a previous question ("Can I use a TriggerDagRunOperator to pass a parameter to the triggered dag? Airflow") I know that I can send a parameter using a TriggerDagRunOperator.
My new question is: can I use the parameter from the dag_run in a function that takes **kwargs, so that I can retrieve both the XCom values and the dag_run values?
I tried def new_op_fun(**kwargs, **context):, but that is invalid syntax.
Please help. Thanks in advance!
dag.py
from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='my_dag',
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1)
)

previous_op = BashOperator(
    task_id="previous_op",
    bash_command='echo "{{ params.week }}"',
    params={'week': '$(date +%V -d \"1 week ago\")'},
    provide_context=True,
    xcom_push=True,
    dag=dag
)

def run_this_func(**context):
    ti = context['ti']
    xcom_value = ti.xcom_pull(task_ids='previous_op')
    print(xcom_value)
    print(context["dag_run"].conf)

def new_op_fun(**kwargs):
    ti = kwargs['ti']
    xcom_value = ti.xcom_pull(task_ids='previous_op')
    print(xcom_value)
    # `context` is not defined in this function; only `kwargs` is
    print(context["dag_run"].conf)
    return "hello"

new_op = PythonOperator(
    task_id='new_op',
    provide_context=True,
    python_callable=new_op_fun,
    xcom_push=True,
    dag=dag)

previous_op >> new_op
trigger.py
from datetime import datetime
from airflow.models import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

dag = DAG(
    dag_id='trigger',
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1)
)

def modify_dro(context, dagrun_order):
    print(context)
    print(dagrun_order)
    dagrun_order.payload = {
        "message": "This is my conf message"
    }
    return dagrun_order

run_this = TriggerDagRunOperator(
    task_id='run_this',
    trigger_dag_id='my_dag',
    python_callable=modify_dro,
    dag=dag
)

run_this
Solution 1:[1]
This works for me. Don't add a second **kwargs; pass your own values in as named arguments through op_kwargs and keep a single **context for everything else.
def func_my_task(my_param, **context):
    ...

t1 = PythonOperator(
    task_id='my_task',
    python_callable=func_my_task,
    op_kwargs={'my_param': 'hello'},
    provide_context=True,  # needed on Airflow 1.10.x so the task context is passed in
    dag=dag,
)
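To see why a single ** parameter is enough, here is a minimal sketch in plain Python that runs without Airflow. It assumes the operator merges op_kwargs and the task context into one set of keyword arguments before calling the function; call_like_operator, fake_ti and fake_dag_run are hypothetical stand-ins made up for this illustration, not Airflow APIs.

```python
from types import SimpleNamespace


def func_my_task(my_param, **context):
    # Named parameters are bound first; every other keyword lands in `context`.
    ti = context["ti"]
    conf = context["dag_run"].conf
    return my_param, ti.xcom_pull(task_ids="previous_op"), conf["message"]


# Hypothetical stand-ins for the objects Airflow would supply at run time.
fake_ti = SimpleNamespace(xcom_pull=lambda task_ids: "03")
fake_dag_run = SimpleNamespace(conf={"message": "This is my conf message"})


def call_like_operator(python_callable, op_kwargs, context):
    # Simplified assumption: merge the context with op_kwargs, then unpack once.
    merged = dict(context)
    merged.update(op_kwargs)
    return python_callable(**merged)


result = call_like_operator(
    func_my_task,
    op_kwargs={"my_param": "hello"},
    context={"ti": fake_ti, "dag_run": fake_dag_run},
)
print(result)
```

Both "ti" and "dag_run" arrive in the same dict, so the name of the ** parameter (kwargs or context) does not matter; only one of them is needed.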
Solution 2:[2]
Your limitation is that a function cannot take two variable keyword dicts (**kwargs and **context) as input parameters. If you turn one of them into a string or a list, the problem goes away.
1. Define two functions. In the first, return the value you need from **kwargs as a list or string, then pass it as the first parameter (for example str1 or list1) to the function that takes **context.
2. Use a global variable: fill it each time with the contents returned by your first and second functions, then use those results in your final function.
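Option 1 could look roughly like the sketch below, again in plain Python without Airflow. The names extract_values and final_task and the stand-in objects are hypothetical, chosen only for illustration; list1 is the name used in the answer above.

```python
from types import SimpleNamespace


def extract_values(**kwargs):
    # First function: reduce the keyword dict to a plain list.
    ti = kwargs["ti"]
    return [ti.xcom_pull(task_ids="previous_op"), kwargs["dag_run"].conf]


def final_task(list1, **context):
    # Second function: the extracted values arrive as an ordinary
    # first parameter, so only one ** parameter is required.
    xcom_value, conf = list1
    return "{} / {}".format(xcom_value, conf.get("message"))


# Hypothetical stand-ins for what Airflow would provide at run time.
fake_ti = SimpleNamespace(xcom_pull=lambda task_ids: "03")
fake_dag_run = SimpleNamespace(conf={"message": "This is my conf message"})

values = extract_values(ti=fake_ti, dag_run=fake_dag_run)
output = final_task(values, ti=fake_ti, dag_run=fake_dag_run)
print(output)
```

Passing the values explicitly like this is usually easier to follow and test than option 2, since a module-level global is not guaranteed to be shared between tasks that run in separate processes.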
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | BuffaloDev |
| Solution 2 | Aramis NSR |
