'Receiving EPIPE error when streaming from PSQL copy function

I am trying to write a streaming implementation of dumping a table from psql into a pre-signed URL on S3. Unfortunately, it seems to error out at a seemingly random time in the upload. I have tried many combinations of opening/closing the file descriptors at different times. I for the life of me cannot figure out why this is occurring.

The strangest thing is when I mock the requests library and analyze the sent data, it works as intended. The socket is raising an EPIPE error at a certain amount through the stream

from psycopg2 import connect
import threading
import requests
import requests_mock
import traceback
from base64 import b64decode
from boto3 import session

r_fd, w_fd = os.pipe()

connection = connect(host='host', database='db',
                     user='user', password='pw')
cursor = connection.cursor()

b3_session = session.Session(profile_name='profile', region_name='us-east-1')
url = b3_session.client('s3').generate_presigned_url(
    ClientMethod='put_object',
    Params={'Bucket': 'bucket', 'Key': 'test_streaming_upload.txt'},
    ExpiresIn=3600)

rd = os.fdopen(r_fd, 'rb')
wd = os.fdopen(w_fd, 'wb')


def stream_data():
    print('Starting stream')
    with os.fdopen(r_fd, 'rb') as rd:
        requests.put(url, data=rd, headers={'Content-type': 'application/octet-stream'})
    print('Ending stream')


to_thread = threading.Thread(target=stream_data)
to_thread.start()

print('Starting copy')
with os.fdopen(w_fd, 'wb') as wd:
    cursor.copy_expert('COPY table TO STDOUT WITH CSV HEADER', wd)
print('Ending copy')

to_thread.join()

The output is always the same:

Starting stream
Starting copy
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/urllib3/contrib/pyopenssl.py", line 342, in _send_until_done
    return self.connection.send(data)
  File "/venv/lib/python3.9/site-packages/OpenSSL/SSL.py", line 1718, in send
    self._raise_ssl_error(self._ssl, result)
  File "/venv/lib/python3.9/site-packages/OpenSSL/SSL.py", line 1624, in _raise_ssl_error
    raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (32, 'EPIPE')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/venv/lib/python3.9/site-packages/requests/adapters.py", line 473, in send
    low_conn.send(b'\r\n')
  File "/Users/me/.pyenv/versions/3.9.7/lib/python3.9/http/client.py", line 995, in send
    self.sock.sendall(data)
  File "/venv/lib/python3.9/site-packages/urllib3/contrib/pyopenssl.py", line 354, in sendall
    sent = self._send_until_done(
  File "/venv/lib/python3.9/site-packages/urllib3/contrib/pyopenssl.py", line 349, in _send_until_done
    raise SocketError(str(e))
OSError: (32, 'EPIPE')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/me/.pyenv/versions/3.9.7/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/Users/me/.pyenv/versions/3.9.7/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/me/Library/Application Support/JetBrains/PyCharm2021.2/scratches/scratch_60.py", line 37, in stream_data
    requests.put(url, data=rd, headers={'Content-type': 'application/octet-stream'})
  File "/venv/lib/python3.9/site-packages/requests/api.py", line 131, in put
    return request('put', url, data=data, **kwargs)
  File "/venv/lib/python3.9/site-packages/requests/api.py", line 60, in request
    return session.request(method=method, url=url, **kwargs)
  File "/venv/lib/python3.9/site-packages/requests/sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "/venv/lib/python3.9/site-packages/requests/sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "/venv/lib/python3.9/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: (32, 'EPIPE')

Am I missing something obvious? Is this a memory error? I appreciate any insight I can get because this is killing me. I can verify that the socket is being written to anywhere from 1.5 to 2.5k times before this error occurs.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source