pyarrow write_dataset: limiting the number of files per partition
Let's say I've got some data in an existing dataset that has no column partitioning but is still 200+ files. I want to rewrite that data into a Hive-partitioned layout. The dataset is too big to load into memory in its entirety. In this case the data came from a CETAS in Azure Synapse, but I don't think that matters.
I do:
import pyarrow as pa
import pyarrow.dataset as ds

dataset = ds.dataset(datadir, format="parquet", filesystem=abfs)
new_part = ds.partitioning(pa.schema([("nodename", pa.string())]), flavor="hive")
scanner = dataset.scanner()
ds.write_dataset(scanner, newdatarootdir,
                 format="parquet", partitioning=new_part,
                 existing_data_behavior="overwrite_or_ignore",
                 max_partitions=2467)
In this case there are 2467 unique nodenames, so I want 2467 directories, each with 1 file. However, what I get is 2467 directories, each with 100+ files of about 10 KB each. How do I get just 1 file per partition?
I could do a second step:

import os
import pyarrow.dataset as ds
import pyarrow.parquet as pq

for node in nodes:
    fullpath = f"{datadir}\\{node}"
    curnodeds = ds.dataset(fullpath, format="parquet")
    curtable = curnodeds.to_table()
    os.makedirs(f"{newbase}\\{node}")
    pq.write_table(curtable, f"{newbase}\\{node}\\part-0.parquet",
                   version="2.6", flavor="spark", data_page_version="2.0")
Is there a way to incorporate the second step into the first step?
Edit:
This is with pyarrow 7.0.0
Edit(2):
Using max_open_files=3000 did result in one file per partition. Comparing the metadata between the two, the two-step approach has (for one partition)...
<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A40>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 1
format_version: 2.6
serialized_size: 1673
size on disk: 278 KB
and the one-step approach has...
<pyarrow._parquet.FileMetaData object at 0x000001C7FA414A90>
created_by: parquet-cpp-arrow version 7.0.0
num_columns: 8
num_rows: 24840
num_row_groups: 188
format_version: 1.0
serialized_size: 148313
size on disk: 1.04 MB
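
(For reference, metadata like the above can be printed with pyarrow.parquet; the path below is just a placeholder for one partition's output file.)

import os
import pyarrow.parquet as pq

path = f"{newbase}\\somenode\\part-0.parquet"   # placeholder partition path
print(pq.ParquetFile(path).metadata)            # prints the FileMetaData shown above
print(f"size on disk: {os.path.getsize(path)} bytes")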
Obviously, in the two-step version I'm explicitly setting version to 2.6, so that explains that difference. The total size of my data after the two-step approach is 655 MB vs 2.6 GB for the one-step approach. The time difference was pretty significant too: the two-step approach took something like 20 minutes for the first step and 40 minutes for the second, while the one-step approach took about 5 minutes for the whole thing.
The remaining question is, how do I set version="2.6" and data_page_version="2.0" in write_dataset? I still wonder why the number of row groups is so different, and whether it stays that different when those parameters are set, but I'll defer that question.
Solution 1
The dataset writer was changed significantly in 7.0.0. Previously, it would always create 1 file per partition. Now there are a couple of settings that could cause it to write multiple files. Furthermore, it looks like you are ending up with a lot of small row groups, which is not ideal and is probably the reason the one-step process is both slower and larger.
The first significant setting is max_open_files. Some systems limit how many file descriptors can be open at one time. Linux defaults to 1024, so pyarrow defaults to ~900 (on the assumption that some file descriptors will be open for scanning, etc.). When this limit is exceeded, pyarrow closes the least recently used file. For some datasets this works well. However, if each batch has data for every file, it doesn't work well at all. In that case you probably want to increase max_open_files to be greater than your number of partitions (with some wiggle room, because you will also have some files open for reading). You may need to adjust OS-specific settings to allow this (generally these OS limits are pretty conservative, and raising them is fairly harmless).
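
For example, the call from the question could pass max_open_files explicitly. A minimal sketch reusing the question's variable names (the question's edit confirms that max_open_files=3000 produced one file per partition):

import pyarrow.dataset as ds

# Same call as in the question, but allow more open files than there are
# partitions (2467) so the writer never has to close a partition file early.
ds.write_dataset(scanner, newdatarootdir,
                 format="parquet", partitioning=new_part,
                 existing_data_behavior="overwrite_or_ignore",
                 max_partitions=2467,
                 max_open_files=3000)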
> I still wonder why the number of row groups is so different, and whether it stays that different when those parameters are set, but I'll defer that question.
In addition, the 7.0.0 release adds min_rows_per_group, max_rows_per_group and max_rows_per_file parameters to the write_dataset call. Setting min_rows_per_group to something like 1 million will cause the writer to buffer rows in memory until it has enough to write. This will allow you to create files with 1 row group instead of 188 row groups. This should bring down your file size and fix your performance issues.
However, there is a memory cost associated with this, which is going to be min_rows_per_group * num_files * size_of_row_in_bytes.
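
Building on the call above, a sketch with the row-group parameters added (again reusing the question's variable names; the exact values are only illustrative):

import pyarrow.dataset as ds

# Buffer up to ~1M rows per open file before flushing a row group.  With only
# ~25k rows per partition, each partition file ends up with a single row group.
ds.write_dataset(scanner, newdatarootdir,
                 format="parquet", partitioning=new_part,
                 existing_data_behavior="overwrite_or_ignore",
                 max_partitions=2467,
                 max_open_files=3000,
                 min_rows_per_group=1_000_000,
                 max_rows_per_group=1_000_000)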
> The remaining question is, how do I set version="2.6" and data_page_version="2.0" in write_dataset?
The write_dataset call works with several different formats (e.g. csv, ipc, orc), so it only exposes generic options that apply regardless of format.
Format-specific settings can instead be set using the file_options parameter:
# It does not appear to be documented but make_write_options
# should accept most of the kwargs that write_table does
file_options = ds.ParquetFileFormat().make_write_options(version='2.6', data_page_version='2.0')
ds.write_dataset(..., file_options=file_options)
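
Putting it together, a sketch of the full one-step call with the Parquet-specific options applied (variable names and values are the ones from the question and the discussion above):

import pyarrow.dataset as ds

# Parquet-specific settings go through file_options...
file_options = ds.ParquetFileFormat().make_write_options(
    version="2.6", data_page_version="2.0")

# ...while the generic dataset-writer settings are plain keyword arguments.
ds.write_dataset(scanner, newdatarootdir,
                 format="parquet", partitioning=new_part,
                 existing_data_behavior="overwrite_or_ignore",
                 max_partitions=2467,
                 max_open_files=3000,
                 min_rows_per_group=1_000_000,
                 max_rows_per_group=1_000_000,
                 file_options=file_options)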
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
