File transfer from FTPS to SFTP using Python
I am running some performance tests on transferring large files (~4 GB) from an FTPS server to an SFTP server. I did some research and tried a Python script to see whether fetching a file from FTPS and transferring it to SFTP this way improves performance.
FTPS connection setup
def create_connection(self):
    """Open an FTPS session, log in and enable keep-alive on its socket.

    Connects to ``self.host``/``self.port``, authenticates with
    ``self.user``/``self.passwd`` and switches the data channel to
    protected mode (``prot_p``).

    Returns:
        The connected and authenticated ``ftplib.FTP_TLS`` session.

    Raises:
        Whatever ``ftplib``/``socket`` raise on failure; the partially
        opened session is closed before the exception propagates.
    """
    print('Creating session..........')
    ftp = ftplib.FTP_TLS()
    # Uncomment to trace the protocol exchange while debugging:
    # ftp.set_debuglevel(2)
    try:
        ftp.connect(self.host, self.port)
        ftp.login(self.user, self.passwd)
        ftp.prot_p()
        # optimize socket params for download task
        print('Optimizing socket..........')
        ftp.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # The fine-grained keep-alive knobs are not exposed by the socket
        # module on every platform, so only set the ones that exist.
        for option_name, seconds in (('TCP_KEEPINTVL', 75), ('TCP_KEEPIDLE', 60)):
            option = getattr(socket, option_name, None)
            if option is not None:
                ftp.sock.setsockopt(socket.IPPROTO_TCP, option, seconds)
    except Exception:
        # Do not leak the socket when connect/login/setup fails.
        ftp.close()
        raise
    print('Session created successfully')
    return ftp
def get_file(self, ftp_session, dst_filename, local_filename):
    """Download ``dst_filename`` from the FTP session into memory.

    The whole file is accumulated in a ``BytesIO`` buffer, so memory use
    grows with the file size.

    Args:
        ftp_session: Object providing ``retrbinary(command, callback)``.
        dst_filename: Name of the remote file to retrieve.
        local_filename: Not used by this function; kept so existing
            callers keep working.

    Returns:
        The ``BytesIO`` holding the downloaded bytes, rewound to offset 0
        so it can be read or uploaded immediately.
    """
    print('Starting download........', datetime.now())
    myfile = BytesIO()
    print(myfile.tell())
    ftp_session.retrbinary('RETR %s' % dst_filename, myfile.write)
    print(myfile.tell())
    print('Download completed ........', datetime.now())
    # Previously the buffer was dropped when the function returned, which
    # threw the downloaded data away; hand it back to the caller instead.
    myfile.seek(0)
    return myfile
For the SFTP connection I am using Paramiko:
# Upload the in-memory buffer ``myfile`` to the SFTP server as
# 'remotepath/<local_filename>'. ``myfile`` and ``local_filename`` must
# already be defined by the download step.
host, port = "abc.com", 22
username, password = "user", "pwd"
transport = paramiko.Transport((host, port))
try:
    # NOTE(review): hostkey=None means the server's host key is not
    # verified - confirm this is acceptable outside of a test setup.
    transport.connect(None, username, password)
    # Raise the default channel window size before the SFTP channel is opened.
    transport.default_window_size = 3 * 1024 * 1024
    sftp = paramiko.SFTPClient.from_transport(transport)
    try:
        # Rewind so the upload starts at the first byte of the buffer.
        myfile.seek(0)
        sftp.putfo(fl=myfile, remotepath='remotepath/' + local_filename)
    finally:
        sftp.close()
finally:
    # Closing the SFTP client does not close the transport; close it
    # explicitly so the connection is released even when the upload fails.
    transport.close()
I am using BytesIO so that I can keep the file in memory and stream it while copying. The code above copies the file successfully, but it takes ~20 minutes: it first downloads the whole file into memory and only then uploads it. Is there a more efficient way to transfer the file?
Solution 1:[1]
After a short Google search (yes, Google really does work =) I stumbled across this thread:
Paramiko Fails to download large files >1GB
import os
import shutil
import threading
import time

import paramiko
# Divisor used to split the chunk list into per-thread groups; one
# download thread is started per group.
# You could make the number of threads relative to the file size.
NUM_THREADS = 4
# Number of attempts for the overall download; the same value is passed
# to every worker thread as its own retry limit.
MAX_RETRIES = 10
def make_filepart_path(file_path, part_number):
    """Build the temporary part-file path for a zero-based part index.

    The part suffix is one-based: part ``0`` of ``"a.bin"`` becomes
    ``"a.bin.filepart.1"``.
    """
    one_based_index = part_number + 1
    return "{}.filepart.{}".format(file_path, one_based_index)
def write_chunks(chunks, tnum, local_file_part, username, password, ftp_server, max_retries):
    """Download the given byte ranges of the remote file into one part file.

    Opens its own SFTP connection, reads ``chunks`` from the module-level
    ``sftp_file`` (connecting on the module-level ``port``) and writes
    them, in order, to ``local_file_part``. On a connection or I/O error
    the whole part is downloaded again, up to ``max_retries`` attempts.

    Args:
        chunks: List of ``(offset, length)`` tuples to read.
        tnum: Thread number, used only in log output.
        local_file_part: Path of the part file to (over)write.
        username: SFTP user name.
        password: SFTP password.
        ftp_server: SFTP host name.
        max_retries: Maximum number of attempts.

    Raises:
        EOFError, paramiko.ssh_exception.SSHException, OSError: the error
            of the last attempt, once every attempt has failed. Earlier
            versions returned silently here, leaving a missing or partial
            part file behind.
    """
    last_error = None
    for attempt in range(1, max_retries + 1):
        ssh_conn = sftp_client = None
        try:
            ssh_conn = paramiko.Transport((ftp_server, port))
            ssh_conn.connect(username=username, password=password)
            sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
            with sftp_client.open(sftp_file, "rb") as infile:
                with open(local_file_part, "wb") as outfile:
                    for chunk in infile.readv(chunks):
                        outfile.write(chunk)
            return
        except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
            last_error = x
            print("%s %s Thread %s - > retrying %s..." % (type(x), x, tnum, attempt))
            # Back off a little longer after each failure, but do not
            # waste time sleeping when no further attempt will follow.
            if attempt < max_retries:
                time.sleep(attempt * 10)
        finally:
            if sftp_client is not None:
                sftp_client.close()
            if ssh_conn is not None:
                ssh_conn.close()
    if last_error is not None:
        raise last_error
# Download ``sftp_file`` to ``local_file`` in parallel: look up the remote
# size, split it into fixed-size byte ranges, let one thread per group of
# ranges write a part file, then concatenate the parts in order.
# ``ftp_server``, ``port``, ``username``, ``password``, ``sftp_file`` and
# ``local_file`` must be defined before this point.
start_time = time.time()
last_error = None
for attempt in range(1, MAX_RETRIES + 1):
    # Bind both handles before the try block so the cleanup below never
    # hits an unbound name when the very first connection attempt fails.
    ssh_conn = sftp_client = None
    try:
        ssh_conn = paramiko.Transport((ftp_server, port))
        ssh_conn.connect(username=username, password=password)
        sftp_client = paramiko.SFTPClient.from_transport(ssh_conn)
        # connect to get the file's size in order to calculate chunks
        filesize = sftp_client.stat(sftp_file).st_size
        # The workers open their own connections; release this one first.
        sftp_client.close()
        ssh_conn.close()

        chunksize = pow(4, 12)  # 16 MiB per read request
        chunks = [(offset, chunksize) for offset in range(0, filesize, chunksize)]
        thread_chunk_size = (len(chunks) // NUM_THREADS) + 1
        # break the chunks into sub lists to hand off to threads
        # (iterate up to len(chunks), not len(chunks) - 1, otherwise the
        # trailing group - or the only chunk of a small file - is dropped)
        thread_chunks = [
            chunks[i:i + thread_chunk_size]
            for i in range(0, len(chunks), thread_chunk_size)
        ]

        threads = []
        fileparts = []
        for thread_num, part_chunks in enumerate(thread_chunks):
            local_file_part = make_filepart_path(local_file, thread_num)
            args = (part_chunks, thread_num, local_file_part, username, password, ftp_server, MAX_RETRIES)
            threads.append(threading.Thread(target=write_chunks, args=args))
            fileparts.append(local_file_part)
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        # join file parts into one file, remove fileparts
        with open(local_file, "wb") as outfile:
            for filepart in fileparts:
                with open(filepart, "rb") as infile:
                    # Stream the part instead of loading it whole into memory.
                    shutil.copyfileobj(infile, outfile)
                os.remove(filepart)
        break
    except (EOFError, paramiko.ssh_exception.SSHException, OSError) as x:
        last_error = x
        print("%s %s - > retrying %s..." % (type(x), x, attempt))
        # No point in sleeping after the final attempt.
        if attempt < MAX_RETRIES:
            time.sleep(attempt * 10)
    finally:
        if sftp_client is not None:
            sftp_client.close()
        if ssh_conn is not None:
            ssh_conn.close()
else:
    # Every attempt failed: surface the last error instead of reporting a
    # timing line as if the download had succeeded.
    if last_error is not None:
        raise last_error
print("Loading File %s Took %d seconds " % (sftp_file, time.time() - start_time))
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Dharman |
