Spark binary file and Delta Table
I receive binary files (~3 MB each) in batches of ~20,000 files at a time. These files are used downstream for further processing, but I want to process them and store them in Delta tables.
I can do this easily:
```python
from pyspark.sql.functions import expr
from delta.tables import DeltaTable

df = spark.read.format("binaryFile").load(<path-to-batch>)
df = df.withColumn("id", expr("uuid()"))

dt = DeltaTable.forName(spark, "myTable")
dt.alias("a").merge(
    df.alias("b"),
    "a.path = b.path"
).whenNotMatchedInsert(
    values={"id": "b.id", "content": "b.content"}
).execute()
```
This already makes the table quite slow, but later I also need to query certain IDs, collect the rows, and write them back out as individual binary files.
Questions:
- Would my table benefit from a batch column and partitioning?
- Should I partition by id? I know this is not ideal, but it might make querying individual rows easier.
- Is there a better way to write the files out again, rather than .collect()? When I select about 1000 specific IDs and write them out, roughly 10 minutes are spent on the collect alone and then less than a minute on the actual write. I do something like:
```python
for row in df.collect():
    with open(row.id, "wb") as fw:
        fw.write(row.content)
```
Solution 1:[1]
- As uuid() returns random values, I'm afraid we cannot use it to compare existing data with new records. (Sorry if I misunderstood the idea)
- I don't think partitioning by id will help, as the id column obviously has very high cardinality.
- Instead of using collect(), which loads all records onto the driver, it would be better to write the records from the Spark dataframe directly and simultaneously from all the worker nodes into a temporary location on ADLS first, and then aggregate the data files from that location (see the sketch after this list).
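As a rough illustration of that approach, here is a minimal sketch that pushes the writes to the executors with `foreachPartition`. The table name, the id filter, and the output directory `/mnt/adls/tmp_output` are hypothetical placeholders; the sketch assumes the output path is reachable via local file APIs from every worker (e.g. an ADLS mount) and that `content` holds the raw bytes loaded by `binaryFile`.

```python
# Minimal sketch: write each row's bytes on the executors instead of
# collecting everything onto the driver. Names and paths are placeholders.
def write_partition(rows):
    # Runs on each executor; the driver never materialises the binary data.
    for row in rows:
        with open(f"/mnt/adls/tmp_output/{row.id}", "wb") as fw:
            fw.write(row.content)

selected = (
    spark.table("myTable")
         .where("id IN ('id-1', 'id-2')")  # whatever id selection you need
         .select("id", "content")
)
selected.foreachPartition(write_partition)
```

Because each partition is written by the worker that holds it, the expensive collect step is avoided entirely; the files can then be moved or aggregated from the temporary location as a cheap follow-up step.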
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source | 
|---|---|
| Solution 1 | PhuriChal | 
