'Pyspark dataframe processing taller and wider datasets without OutOfMemory issue
We are trying to do some operations which involves all columns(For example finding top 5 values in each column)
from pyspark.sql.types import *
data = []
row_count = 1000
col_count = 50
header_structs = []
for i in range(row_count):
row_data = tuple()
req_header_name = len(header_structs) == 0
for j in range(col_count):
if req_header_name:
col_name = "col_{}".format(j)
header_structs.append(StructField(col_name, IntegerType()))
val = (i + 2) * (j + 2)
if (val) % 3 == 0:
val = 6
if val is not None and val % 2 == 0:
val = 10
row_data = row_data + (val,)
data.append(row_data)
schema = StructType(header_structs)
dataframe = spark.createDataFrame(data, schema)
row_count & col_count are given small values in example. In real data, row count can be minimum 10M and col_count can be 2000 columns.
from pyspark.sql import functions as F
import pandas as pd
from pyspark.sql.types import *
def find_freq_values(col_list):
df = pd.DataFrame(col_list, columns=["value"])
df = df[['value']].groupby(['value'])['value'] \
.count() \
.reset_index(name='count') \
.sort_values(['count'], ascending=False) \
.head(5)
res = df.to_dict(orient='records')
return res
find_freq_udf = F.udf(find_freq_values, ArrayType(MapType(StringType(), StringType(), True)))
freq_res_df = dataframe.select(
*[find_freq_udf(F.collect_list(c)).alias(c) for c in dataframe.columns]
)
freq_res = freq_res_df.collect()[0].asDict()
print(freq_res)
Initially we were processing with trivial loop approach and select each column inside it.
Sample of old approach
for current_col in dataframe.columns:
dataframe.select(current_col)
But the above approach took a toll when there are more columns(1000+ Columns).
So based on the suggestion (also this reference) & UDF aggregation,
for processing columns in parallel we used custom UDF aggregator function find_freq_udf on all columns.
This works well when number of rows is less. But when rows increase, we end of getting OutOfMemory error since pandas dataframe is used in the custom aggregate function.
Is it possible to process this huge amount of data without this out of memory error?
Can we have spark optimization inside that UDF aggregation function?
Azure Databricks cluster is used for this testing. 7.3 LTS (includes Apache Spark 3.0.1, Scala 2.12)
Standard F4s - 8GB Memory - 4 Cores(Min-Max 1-2 workers)
Similar to top 5 values in the example, we are also doing some additional processing using pandas inside aggregate functions.
Update 1: One approach that can be done - When there are more columns, doing operations for 'N' number of columns at a time, then performing next set of columns.
Example) process first 500 cols first, then process next set of columns.
Finally we can merge results for 1000 columns.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
