'Receiving EPIPE error when streaming from PSQL copy function
I am trying to write a streaming implementation of dumping a table from psql into a pre-signed URL on S3. Unfortunately, it seems to error out at a seemingly random time in the upload. I have tried many combinations of opening/closing the file descriptors at different times. I for the life of me cannot figure out why this is occurring.
The strangest thing is when I mock the requests library and analyze the sent data, it works as intended. The socket is raising an EPIPE error at a certain amount through the stream
from psycopg2 import connect
import threading
import requests
import requests_mock
import traceback
from base64 import b64decode
from boto3 import session
r_fd, w_fd = os.pipe()
connection = connect(host='host', database='db',
user='user', password='pw')
cursor = connection.cursor()
b3_session = session.Session(profile_name='profile', region_name='us-east-1')
url = b3_session.client('s3').generate_presigned_url(
ClientMethod='put_object',
Params={'Bucket': 'bucket', 'Key': 'test_streaming_upload.txt'},
ExpiresIn=3600)
rd = os.fdopen(r_fd, 'rb')
wd = os.fdopen(w_fd, 'wb')
def stream_data():
print('Starting stream')
with os.fdopen(r_fd, 'rb') as rd:
requests.put(url, data=rd, headers={'Content-type': 'application/octet-stream'})
print('Ending stream')
to_thread = threading.Thread(target=stream_data)
to_thread.start()
print('Starting copy')
with os.fdopen(w_fd, 'wb') as wd:
cursor.copy_expert('COPY table TO STDOUT WITH CSV HEADER', wd)
print('Ending copy')
to_thread.join()
The output is always the same:
Starting stream
Starting copy
Exception in thread Thread-1:
Traceback (most recent call last):
File "/venv/lib/python3.9/site-packages/urllib3/contrib/pyopenssl.py", line 342, in _send_until_done
return self.connection.send(data)
File "/venv/lib/python3.9/site-packages/OpenSSL/SSL.py", line 1718, in send
self._raise_ssl_error(self._ssl, result)
File "/venv/lib/python3.9/site-packages/OpenSSL/SSL.py", line 1624, in _raise_ssl_error
raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (32, 'EPIPE')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/venv/lib/python3.9/site-packages/requests/adapters.py", line 473, in send
low_conn.send(b'\r\n')
File "/Users/me/.pyenv/versions/3.9.7/lib/python3.9/http/client.py", line 995, in send
self.sock.sendall(data)
File "/venv/lib/python3.9/site-packages/urllib3/contrib/pyopenssl.py", line 354, in sendall
sent = self._send_until_done(
File "/venv/lib/python3.9/site-packages/urllib3/contrib/pyopenssl.py", line 349, in _send_until_done
raise SocketError(str(e))
OSError: (32, 'EPIPE')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/me/.pyenv/versions/3.9.7/lib/python3.9/threading.py", line 973, in _bootstrap_inner
self.run()
File "/Users/me/.pyenv/versions/3.9.7/lib/python3.9/threading.py", line 910, in run
self._target(*self._args, **self._kwargs)
File "/Users/me/Library/Application Support/JetBrains/PyCharm2021.2/scratches/scratch_60.py", line 37, in stream_data
requests.put(url, data=rd, headers={'Content-type': 'application/octet-stream'})
File "/venv/lib/python3.9/site-packages/requests/api.py", line 131, in put
return request('put', url, data=data, **kwargs)
File "/venv/lib/python3.9/site-packages/requests/api.py", line 60, in request
return session.request(method=method, url=url, **kwargs)
File "/venv/lib/python3.9/site-packages/requests/sessions.py", line 533, in request
resp = self.send(prep, **send_kwargs)
File "/venv/lib/python3.9/site-packages/requests/sessions.py", line 646, in send
r = adapter.send(request, **kwargs)
File "/venv/lib/python3.9/site-packages/requests/adapters.py", line 498, in send
raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: (32, 'EPIPE')
Am I missing something obvious? Is this a memory error? I appreciate any insight I can get because this is killing me. I can verify that the socket is being written to anywhere from 1.5 to 2.5k times before this error occurs.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
