How to improve the performance of a PySpark Pandas loop in Databricks

I have what I assume should be a very parallelisable problem, but I cannot seem to make it work. I am using Azure Databricks with the 10.4 LTS ML runtime.

I have a dataset that contains sets of test results. It is around 4 GB in size and holds around 20,000 tests. Each test result contains around 5,000-10,000 data points, which trace out several peaks (the number of peaks and their locations differ for each test). For a single test, I want to remove the space between the peaks and separate them out into different dataframes. Here is some pseudocode for what I apply to a single test's results:

def peak_finder(pandas_dataframe):
    # code to find peaks
    return list_of_Pandas_dataframes

In this function I use pyspark.pandas.DataFrame.truncate, which I am not sure how to replicate in pure PySpark. It returns a list of PySpark Pandas dataframes, each containing one peak from the test. Running it on a single test's dataframe takes around 0.02 seconds.
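
For reference, here is a minimal sketch of the kind of thing peak_finder does, written against plain pandas. It is not my actual code: the scipy.signal.find_peaks call, the "Value" column name, and the fixed window width are placeholders for my real peak-detection logic.

import pandas as pd
from scipy.signal import find_peaks  # stand-in for my real peak detection

def peak_finder(pandas_dataframe, window=100):
    # Detect peak positions in the signal column (placeholder logic)
    positions, _ = find_peaks(pandas_dataframe["Value"].to_numpy())
    list_of_pandas_dataframes = []
    for pos in positions:
        # Keep only the rows around this peak, dropping the flat regions
        start = pandas_dataframe.index[max(pos - window, 0)]
        end = pandas_dataframe.index[min(pos + window, len(pandas_dataframe) - 1)]
        list_of_pandas_dataframes.append(
            pandas_dataframe.truncate(before=start, after=end)
        )
    return list_of_pandas_dataframes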

My problem is then applying this to the other 20,000 tests in the entire set. This is my current method:

from pyspark.sql.functions import col

# list_test_ids = list of all test ids
# all_tests = the full dataset
all_peaks = []
for single_test_id in list_test_ids:
    # Filter the full dataset down to a single test
    single_test = all_tests.where(col("TestId") == single_test_id)

    # Pull that test's rows onto the driver as a pandas dataframe
    single_test = single_test.toPandas()

    # Split the test into one dataframe per peak
    peaks = peak_finder(single_test)

    all_peaks.extend(peaks)

This is incredibly slow, or it causes an out-of-memory error (I've already increased the size of the driver). I think using .toPandas() is partly to blame, as it seems slow. Generally, though, this looks like an incredibly parallelisable problem, yet it is currently not running in parallel at all.

My questions:

  • On large datasets like this, should I ever use PySpark Pandas? Is it good practice to always use the regular PySpark API?

  • I feel like using a loop here is a mistake, but I do not know what to replace it with. A map or a foreach seems more appropriate, yet I can't see how to make either work with my peak_finder function or with PySpark Pandas (a rough groupBy().applyInPandas() attempt is sketched after this list).

  • For problems like this, where I am trying to manipulate a 4 GB dataset, what worker/driver configuration do you recommend? Perhaps my current choice is not suitable?
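
For completeness, this is the sort of groupBy().applyInPandas() pattern I have been trying to adapt instead of the loop. It is only a sketch: the output schema, the PeakId column, and the Time/Value column names are guesses, and it assumes peak_finder is rewritten to take and return plain pandas dataframes.

import pandas as pd

# Assumed output schema: the input columns plus an index identifying each peak
output_schema = "TestId string, PeakId int, Time double, Value double"

def split_peaks(pdf):
    # pdf is a plain pandas DataFrame holding every row for one TestId
    peaks = peak_finder(pdf)
    if not peaks:
        # No peaks found: return an empty frame with the expected columns
        return pdf.head(0).assign(PeakId=0)
    # Tag each peak with its index so the peaks can be separated again later
    return pd.concat(p.assign(PeakId=i) for i, p in enumerate(peaks))

# One task per TestId, executed on the workers instead of the driver
all_peaks_df = (
    all_tests
    .groupBy("TestId")
    .applyInPandas(split_peaks, schema=output_schema)
)

My understanding is that this would push the per-test work out to the workers and avoid both the driver-side loop and the repeated .toPandas() calls, but I would like to confirm whether this is the right approach.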



Sources

Source: Stack Overflow, licensed under CC BY-SA 3.0.