How to improve the performance of a PySpark Pandas loop in Databricks
I have what I assume should be a very parallelisable problem, but I cannot seem to make it work. I am using Azure Databricks with the 10.4 LTS ML runtime.
I have a dataset that contains sets of test results. It is around 4 GB in size and covers around 20,000 tests. Each test result contains around 5,000-10,000 data points, which form several peaks (the number of peaks and their locations differ from test to test). For a single test, I want to remove the space between the peaks and separate them out into different dataframes. Here is some pseudocode for the function I apply to a single test's results:
```python
def peak_finder(pandas_dataframe):
    # code to find peaks
    return list_of_Pandas_dataframes
```
In this function I use `pyspark.pandas.DataFrame.truncate`, which I am unsure how to replicate in pure PySpark. I return a list of pyspark.pandas dataframes, each containing one peak from the test. Running this on a single test's dataframe takes around 0.02 seconds.
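For reference, here is a minimal sketch of what such a function might look like in plain pandas (not the original implementation). The peak detection via `scipy.signal.find_peaks`, the column name `Value`, the `prominence` threshold, and the fixed `window` width are all assumptions made for illustration; `pandas.DataFrame.truncate` plays the role that `pyspark.pandas.DataFrame.truncate` plays in the original.

```python
from typing import List

import pandas as pd
from scipy.signal import find_peaks  # assumed peak detector; the real logic may differ


def peak_finder(pandas_dataframe: pd.DataFrame) -> List[pd.DataFrame]:
    # Hypothetical column name: "Value" holds the measured signal.
    values = pandas_dataframe["Value"].to_numpy()

    # Locate peak positions; the prominence threshold is a placeholder.
    peak_positions, _ = find_peaks(values, prominence=1.0)

    window = 50  # assumed number of rows to keep on each side of a peak
    peak_frames = []
    for pos in peak_positions:
        # truncate keeps the rows between two index labels, cutting away the
        # space between peaks.
        peak_frames.append(
            pandas_dataframe.truncate(
                before=max(pos - window, 0),
                after=min(pos + window, len(pandas_dataframe) - 1),
            )
        )
    return peak_frames
```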
My problem is then applying this to all ~20,000 tests in the dataset. This is my current method:
```python
from pyspark.sql.functions import col

# list_test_ids = list of all test ids
# all_tests = the full dataset

all_peaks = []
for single_test_id in list_test_ids:
    # filter one test out of the full dataset, then pull it onto the driver
    single_test = all_tests.where(col("TestId") == single_test_id)
    single_test = single_test.toPandas()
    peaks = peak_finder(single_test)
    all_peaks.extend(peaks)
```
This is incredibly slow, or causes an out-of-memory error (I've already increased the size of the driver). I think the use of `.toPandas()` is partly to blame, as it seems slow. More generally, this looks like an incredibly parallelisable problem that is currently not parallel at all.
My questions:
1. On large sets like this, should I ever use PySpark Pandas? Is it good practice to always use the regular PySpark API instead?
2. I feel like using a loop here is a mistake, but I do not know what to replace it with. Using a `map` or a `forEach` seems more appropriate, but I can't see how to make either work with my `peak_finder` function or with PySpark Pandas (see the sketch after this list).
3. For problems like this, where I am trying to manipulate a roughly 4 GB dataset, what worker/driver configuration do you recommend? Perhaps my current choice is not suitable?
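One direction to explore for question 2 is a grouped pandas UDF via `groupBy(...).applyInPandas(...)`, which hands each test to `peak_finder` as a plain pandas dataframe on the executors instead of collecting it to the driver. This is only a minimal sketch: the `Value` column, the added `PeakNumber` column, and the schema string are assumptions, and the real schema would have to list the dataset's actual columns.

```python
import pandas as pd

# Hypothetical output schema: in practice, list the dataset's real columns plus
# the added PeakNumber column.
result_schema = "TestId long, Value double, PeakNumber int"


def explode_peaks(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf already holds every row for one TestId as a plain pandas DataFrame,
    # so the existing per-test peak-finding logic can be reused here.
    frames = [peak.assign(PeakNumber=i) for i, peak in enumerate(peak_finder(pdf))]
    if not frames:
        # no peaks found: return an empty frame with the expected columns
        return pdf.iloc[:0].assign(PeakNumber=0)
    return pd.concat(frames)


# Spark runs explode_peaks once per TestId, in parallel across the executors,
# replacing the driver-side loop and the ~20,000 toPandas() calls.
all_peaks_df = (
    all_tests
    .groupBy("TestId")
    .applyInPandas(explode_peaks, schema=result_schema)
)
```

With this shape, the per-test work is distributed across the cluster and the result stays a single Spark dataframe rather than a Python list of pandas dataframes on the driver.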
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow