PySpark mapPartitions: how to craft the function?

We are using Databricks on Azure with a reasonably large cluster (20 cores, 70GB memory across 5 executors). I have a Parquet file with 4 million rows, which Spark reads without trouble; call that DataFrame sdf.

The problem I am hitting is that the data must be converted to a pandas DataFrame. Taking the easy/obvious route, pdf = sdf.toPandas(), causes an out-of-memory error.

So I want to apply my function to subsets of the Spark DataFrame instead. The sdf itself sits in 19 partitions, so what I want to do is write a function and apply it to each partition separately. Here's where mapPartitions comes in. I was trying to write my own function like this:

def example_function(sdf):
    pdf = sdf.toPandas()
    # apply some pandas and Python functions we've written to handle pdf
    output = great_function(pdf)
    return output

Then I'd use mapPartitions to run that.

sdf.rdd.mapPartitions(example_function)

That fails with all kinds of errors.

Looking back at the instructions, I realize I was clueless and too optimistic/simplistic about what they expect from me. They don't seem to anticipate me applying my own functions to the whole chunk of the Spark DataFrame that lives in a partition. They seem to plan only for code that handles the rows of the Spark DataFrame one at a time, and the parameters are iterators.
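For reference, this is roughly the shape I gather mapPartitions wants, as a sketch assuming great_function accepts and returns a pandas DataFrame (partition_function is just an illustrative name):

import pandas as pd

def partition_function(rows):
    # mapPartitions hands the function an iterator of Row objects,
    # not a Spark DataFrame, so the pandas conversion is done by hand.
    pdf = pd.DataFrame([row.asDict() for row in rows])
    output = great_function(pdf)
    # mapPartitions expects an iterable back, e.g. one dict per output row.
    return output.to_dict("records")

result_rdd = sdf.rdd.mapPartitions(partition_function)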

Can you please share your thoughts on this?



Solution 1:[1]

In your example case it might be counterproductive to start from a Spark DataFrame and fall back to an RDD if you're aiming at using pandas. Under the hood, toPandas() triggers collect(), which retrieves all the data onto the driver node and will fail on large data.

If you want to use pandas code on Spark, you can use pandas UDFs, which work like regular UDFs but are designed and optimized for vectorized pandas code.

https://docs.databricks.com/spark/latest/spark-sql/udf-python-pandas.html
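As a rough sketch of both flavors, assuming Spark 3.0+ (the column names here are made up, and great_function is the pandas routine from the question):

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Scalar pandas UDF: one column in, one column out, as pandas Series.
@pandas_udf("double")
def times_two(col: pd.Series) -> pd.Series:
    return col * 2

sdf_out = sdf.withColumn("doubled", times_two("some_numeric_column"))

# On Spark 3.0+, mapInPandas feeds the function an iterator of pandas
# DataFrames (chunks of each partition) and expects pandas DataFrames back,
# which is closest to the per-partition pandas processing the question wants.
def apply_great_function(batches):
    for pdf in batches:
        yield great_function(pdf)

# The schema here assumes great_function keeps the same columns; adjust otherwise.
sdf_out2 = sdf.mapInPandas(apply_great_function, schema=sdf.schema)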

Solution 2:[2]

I did not find a solution using Spark's map or similar. Here is the best option I've found.

  1. The Parquet folder has lots of smaller Parquet files inside it. As long as the default settings were used, these files have the extension .snappy.parquet. Use Python's os.listdir and filter the file list down to the ones with the correct extension.

  2. Use Python and pandas tools, NOT Spark, to read the individual Parquet files. Loading a Parquet file with a few hundred thousand rows is much faster with pandas than with Spark.

  3. For each loaded dataframe, run the function I described in the first message, where the dataframe gets put through the wringer. (A short sketch combining these steps follows the function below.)

def example_function(pdf):
    # apply some pandas and Python functions we've written to handle pdf
    output = great_function(pdf)
    return output
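
Putting steps 1 through 3 together, a minimal sketch (the folder path is a placeholder; on Databricks the DBFS mount under /dbfs is one way to reach the files):

import os
import pandas as pd

parquet_dir = "/dbfs/path/to/parquet_folder"  # placeholder path

# Step 1: list the individual part files Spark wrote out.
paths = [
    os.path.join(parquet_dir, name)
    for name in os.listdir(parquet_dir)
    if name.endswith(".snappy.parquet")
]

# Steps 2 and 3: read each file with pandas and push it through the function.
outputs = []
for path in paths:
    pdf = pd.read_parquet(path)  # pandas reader, no Spark involved
    outputs.append(example_function(pdf))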

Since the work for each data section has to happen in Pandas anyway, there's no need to keep fighting with Spark tools.

One other bit worth mentioning is that joblib's Parallel tool can be used to distribute this work among the cluster nodes.
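A sketch of the single-machine version (plain joblib parallelizes over local cores; fanning the work out across cluster nodes would need a suitable joblib backend on top of this; process_one is just an illustrative helper and paths comes from the sketch above):

import pandas as pd
from joblib import Parallel, delayed

def process_one(path):
    # read one part file with pandas and run the pandas pipeline on it
    return example_function(pd.read_parquet(path))

# n_jobs=-1 uses all available local cores
outputs = Parallel(n_jobs=-1)(delayed(process_one)(path) for path in paths)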

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Gaarv
Solution 2 pauljohn32