Unable to connect to FTP server from GCP Composer using Airflow
I've been having trouble connecting to an FTP server from Google's cloud products (both Airflow running on Composer, and Colaboratory).
When I run the code in a local Jupyter Notebook, the connection works flawlessly:
```python
from ftplib import FTP
ftp = FTP('ftp.mtps.gov.br')
ftp.login()
```
But running the same lines in either cloud environment returns:
```
[2021-06-24 01:09:53,750] {taskinstance.py:1457} ERROR - [Errno 110] Connection timed out
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1113, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1287, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1317, in _execute_task
    result = task_copy.execute(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/RAIS_ETL.py", line 31, in downloadData
    ftp = FTP('ftp.mtps.gov.br')
  File "/opt/python3.8/lib/python3.8/ftplib.py", line 117, in __init__
    self.connect(host)
  File "/opt/python3.8/lib/python3.8/ftplib.py", line 152, in connect
    self.sock = socket.create_connection((self.host, self.port), self.timeout,
  File "/opt/python3.8/lib/python3.8/socket.py", line 808, in create_connection
    raise err
  File "/opt/python3.8/lib/python3.8/socket.py", line 796, in create_connection
    sock.connect(sa)
TimeoutError: [Errno 110] Connection timed out
```
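For reference, `ftplib.FTP` accepts an optional `timeout` in seconds, so a task like this can fail fast instead of hanging for the OS-level default. A minimal sketch (the 30-second value is just an example):

```python
from ftplib import FTP

# Give up after 30 seconds rather than waiting for the OS default,
# which is what produced the Errno 110 above.
ftp = FTP('ftp.mtps.gov.br', timeout=30)
ftp.login()
```

This only surfaces the failure sooner; it does not fix the missing network route.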
Solution 1:[1]
I ran sample code with the same imports as yours, but changed the host to ftp.us.debian.org to connect to the FTP server, and tried running the DAG tasks.
Scenario 1 (not passing any arguments to the function):
funcftp.py
```python
def my_function():
    from ftplib import FTP
    ftp = FTP('ftp.us.debian.org')
    ftp.login()
```
dagftp.py
```python
from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
from ftplib import FTP

from funcftp import my_function

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': YESTERDAY,
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'composer_ftp',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # An instance of an operator is called a task. In this case, the
    # hello_python task calls my_function.
    hello_python = python_operator.PythonOperator(
        task_id='hello',
        python_callable=my_function)

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash
```
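Note that for the `from funcftp import my_function` import to resolve, funcftp.py has to be uploaded alongside dagftp.py to the environment's DAGs folder (the GCS bucket path mounted at /home/airflow/gcs/dags, visible in the traceback above), since Composer puts that folder on the Python path.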
Scenario 2 (passing arguments to the function):
dag.py
```python
from __future__ import print_function

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
from ftplib import FTP

from func import my_function

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    'start_date': YESTERDAY,
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
        'demo_run',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # An instance of an operator is called a task. In this case, the
    # hello_python task calls my_function with a keyword argument.
    hello_python = python_operator.PythonOperator(
        task_id='hello_world',
        python_callable=my_function,
        op_kwargs={"x": "python"})

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id='bye',
        bash_command='echo Goodbye.')

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash
```
func.py
```python
def my_function(x):
    from ftplib import FTP
    ftp = FTP('ftp.us.debian.org')
    ftp.login()
    return x + " is a programming language"
```
One of the error messages mentions the self keyword, which refers to the object instance. If you define all of your methods as static methods, there is no need to pass self: static methods can be called without creating an object, so they do not take a self parameter. See the sketch below.
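A minimal sketch of that distinction, using a hypothetical FtpTasks class (not from the original answer):

```python
class FtpTasks:
    @staticmethod
    def download():
        # No self: can be handed to an operator directly as
        # python_callable=FtpTasks.download, without creating an object.
        from ftplib import FTP
        ftp = FTP('ftp.us.debian.org')
        ftp.login()

    def download_bound(self):
        # Takes self: must be called on an instance, e.g.
        # python_callable=FtpTasks().download_bound
        from ftplib import FTP
        ftp = FTP('ftp.us.debian.org')
        ftp.login()
```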
If you are passing arguments to your callable, make sure those arguments also reach the DAG tasks by supplying the op_args and op_kwargs arguments of the operator, as shown below.
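Scenario 2 above uses op_kwargs; as a hypothetical variant (not from the original answer), the same argument could be passed positionally with op_args:

```python
from airflow.operators import python_operator

from func import my_function

# Same task as in Scenario 2, but passing the argument positionally.
hello_python = python_operator.PythonOperator(
    task_id='hello_world',
    python_callable=my_function,
    op_args=["python"])  # equivalent to op_kwargs={"x": "python"}
```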
If this does not work for you, please provide the code you are using.
Solution 2:[2]
This is probably about six months too late for you, but I had this exact issue today and came across this question, so I'm leaving my solution here for others with the same problem. To get the connection working, you need to set up Cloud NAT together with Cloud Composer to give the workers public internet access, as described here: https://cloud.google.com/composer/docs/composer-2/private-ip-environments#public_internet_access_for_your_workflows
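A quick way to confirm whether the workers have any public egress at all is a bare socket check run as a task; a minimal sketch (the host and port are just examples):

```python
import socket

def check_egress(host='ftp.us.debian.org', port=21, timeout=10):
    """Return True if a TCP connection to host:port succeeds from the worker."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError as err:
        # Without Cloud NAT, a private-IP environment typically lands here
        # with a timeout, mirroring the Errno 110 in the question.
        print(f"Cannot reach {host}:{port}: {err}")
        return False
```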
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Prajna Rai T |
| Solution 2 | Julmust |
