'How to pass multiple arguments separately to the next task in Airflow?
I am using a templated python file that I import on DAG. I want to pass multiple outputs(lists) generated by the 1st task to the next task. I tried this:
def funct1():
a = ['apple', 'orange']
b = ['mango', 'lemon']
return a, b
def funct2(a,b):
for k in a,
quote1 = (f"i love {k}!")
print(quote)
for x in b,
quote2 = (f"i love {k}!")
print(quote2)
dag = DAG("fruits", default_args=default_args)
t1 = PythonOperator(
task_id='descr',
python_callable=filenameoftemplate.funct1,
dag=dag)
t2 = PythonOperator(
task_id='display',
python_callable=filenameoftemplate.funct2,
op_kwargs=
{'a': t1.output,
'b': t2.output},
provide_context=True,
dag=dag)
result:
i love [apple, orange, mango, lemon]!
i love [apple, orange, mango, lemon]!
i love [apple, orange, mango, lemon]!
i love [apple, orange, mango, lemon]!
Thing is, all the returned values are being read as one on the next tasks. How can I pass and pull them as separate arguments? I want the result to be:
i love apple! i love orange! i love mango! i love lemon!
Solution 1:[1]
try to use a counter variable to determine access which value
def funct1():
a = ['apple', 'orange']
b = ['mango', 'lemon']
return a, b
# Use counter to determine access which value
counter = 0
def funct2(a,b):
quote1 = (f"i love {a[counter]}!")
print(quote1)
quote2 = (f"i love {b[counter]}!")
print(quote2)
counter += 1
dag = DAG("fruits", default_args=default_args)
t1 = PythonOperator(
task_id='descr',
python_callable=filenameoftemplate.funct1,
dag=dag)
t2 = PythonOperator(
task_id='display',
python_callable=filenameoftemplate.funct2,
op_kwargs=
{'a': t1.output,
'b': t2.output},
provide_context=True,
dag=dag)def funct1():
a = ['apple', 'orange']
b = ['mango', 'lemon']
return a, b
# Use counter to determine access which value
counter = 0
def funct2(a,b):
quote1 = (f"i love {a[counter]}!")
print(quote1)
quote2 = (f"i love {b[counter]}!")
print(quote2)
counter += 1
dag = DAG("fruits", default_args=default_args)
t1 = PythonOperator(
task_id='descr',
python_callable=filenameoftemplate.funct1,
dag=dag)
t2 = PythonOperator(
task_id='display',
python_callable=filenameoftemplate.funct2,
op_kwargs=
{'a': t1.output,
'b': t2.output},
provide_context=True,
dag=dag)
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Jasmine |
