Read / Write Parquet files without reading into memory (using Python)
I looked at the standard documentation that I would expect to capture my need (Apache Arrow and Pandas), and I could not seem to figure it out.
I know Python best, so I would like to use Python, but it is not a strict requirement.
Problem
I need to move Parquet files from one location (a URL) to another (an Azure storage account, in this case using the Azure machine learning platform, but this is irrelevant to my problem).
These files are too large to simply perform pd.read_parquet("https://my-file-location.parquet"), since this reads the whole thing into an object.
Expectation
I thought that there must be a simple way to create a file object and stream that object line by line -- or maybe column chunk by column chunk. Something like:

import pyarrow.parquet as pq

with pq.open("https://my-file-location.parquet") as read_file_handle:
    with pq.open("https://my-azure-storage-account/my-file.parquet", "write") as write_file_handle:
        for next_line in read_file_handle:
            write_file_handle.append(next_line)
I understand it will be a little different because Parquet is primarily meant to be accessed in a columnar fashion. Maybe there is some sort of config object that I would pass which specifies which columns are of interest, or how many lines can be grabbed in a chunk, or something similar.
But the key expectation is that there is a means to access a parquet file without loading it all into memory. How can I do this?
FWIW, I did try to just use Python's standard open function, but I was not sure how to use open with a URL location and a byte stream. If it is possible to do this via just open and skip anything Parquet-specific, that is also fine.
Update
Some of the comments have suggested using bash-like scripts, such as here. I can use this if there is nothing else, but it is not ideal because:
- I would rather keep this all in a full language SDK, whether Python, Go, or whatever. If the solution moves into a bash script with pipes, it requires an external call, since the final solution will not be written entirely in bash, PowerShell, or any scripting language.
- I really want to leverage some of the benefits of Parquet itself. As I mentioned in the comment below, Parquet is columnar storage. So if I have a "data frame" that is 1.1 billion rows and 100 columns, but I only care about 3 columns, I would love to be able to only download those 3 columns, saving a bunch of time and some money, too.
Solution 1:[1]
Great post. Based on @Micah's answer, here are my two cents, in case you don't want to read the docs. A small snippet is the following:
import pandas as pd
import numpy as np
from pyarrow.parquet import ParquetFile

# create a random df then save to parquet
df = pd.DataFrame({
    'A': np.arange(10000),
    'B': np.arange(10000),
    'C': np.arange(10000),
    'D': np.arange(10000),
})
df.to_parquet('./test.parquet')

# ****** below is @Micah Kornfield's answer ******
# 1. open parquet file
pf = ParquetFile('./test.parquet')
# 2. define generator of batches
record = pf.iter_batches(
    batch_size=10,
    columns=['B', 'C'],
)
# 3. yield pandas/numpy data
print(next(record).to_pandas())  # pandas DataFrame
print(next(record).to_pydict())  # native python dict
Solution 2:[2]
Please note that I did not specify how the batches are consumed on the distant server side.
My solution would be: write batches into a buffer with pyarrow.NativeFile, then read the buffer back with pyarrow.ipc.RecordBatchFileReader.
I created these two classes to help you with the streaming process:
import asyncio

import pyarrow as pa
from pyarrow.parquet import ParquetFile


class ParquetFileStreamer:
    """
    Attributes:
        ip_address: IP address of the distant server
        port: listening port of the distant server
        file_source: pathlib.Path, pyarrow.NativeFile, or file-like object
        batch_size: default = 65536
        columns: list of the columns you wish to select (if None, selects all)

    Example:
        >>> class MyStreamer(ParquetFileStreamer):
        ...     file_source = '/usr/fromage/camembert.parquet'
        ...     columns = ['name', 'price']
        >>> MyStreamer.start_stream()
    """
    ip_address = '192.168.1.1'
    port = 80
    file_source: str
    batch_size = 65536
    columns = None  # None selects all columns

    @classmethod
    def start_stream(cls):
        for batch in cls._open_parquet():
            asyncio.run(cls._stream_parquet(batch))

    @classmethod
    def _open_parquet(cls):
        return ParquetFile(cls.file_source).iter_batches(
            batch_size=cls.batch_size,
            columns=cls.columns,
        )

    @classmethod
    async def _stream_parquet(cls, batch):
        # serialize the record batch to bytes with the Arrow IPC stream
        # format; a raw RecordBatch cannot be written to a socket directly
        sink = pa.BufferOutputStream()
        with pa.ipc.new_stream(sink, batch.schema) as ipc_writer:
            ipc_writer.write_batch(batch)
        reader, writer = await asyncio.open_connection(cls.ip_address, cls.port)
        writer.write(sink.getvalue().to_pybytes())
        await writer.drain()
        writer.close()
        await writer.wait_closed()


class ParquetFileReceiver:
    """
    Attributes:
        port: specify the port
        n_bytes: -1 reads the whole batch

    Example:
        >>> asyncio.run(ParquetFileReceiver.server())
    """
    port = 80
    n_bytes = -1

    @classmethod
    async def handle_stream(cls, reader, writer):
        data = await reader.read(cls.n_bytes)
        # deserialize the Arrow IPC payload back into record batches
        ipc_reader = pa.ipc.open_stream(data)
        for batch in ipc_reader:
            print(batch)

    @classmethod
    async def server(cls):
        server = await asyncio.start_server(cls.handle_stream, port=cls.port)
        async with server:
            await server.serve_forever()
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Zézouille |
| Solution 2 | Alexandre Mahdhaoui |
