PySpark keeps hanging on a dataset while Pandas handles it fine
I am following this tutorial: https://towardsdatascience.com/how-to-develop-a-credit-risk-model-and-scorecard-91335fc01f03
I implemented it successfully in Google Colab. Now I'm trying to convert it to PySpark, but PySpark keeps hanging and running out of memory. This is my code:
Downloading data from drive:

```shell
%cd "/content/drive/MyDrive/"
!gdown --id "1xaF743cmUgI5kc76I86AeZDE84SMkMnt"
```
Initializing the Spark session:

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("program2") \
    .master("local[*]") \
    .config("spark.memory.offHeap.enabled", True) \
    .config("spark.memory.offHeap.size", "16g") \
    .getOrCreate()

# Note: the session already exists at this point, so getOrCreate() below
# ignores these settings; they would need to go on the builder instead.
conf = SparkConf()
conf.set('spark.executor.memory', '10G') \
    .set('spark.driver.memory', '10G') \
    .set('spark.driver.maxResultSize', '10G') \
    .set('spark.kryoserializer.buffer.max', '128m')  # the '.mb' variant is a deprecated alias
sc = SparkContext.getOrCreate(conf=conf)
sc
# config = SparkConf().setAll([('setMaster', '1')])
# sc.stop()
# sc = SparkContext(conf=config)
```
```python
df = spark.read.load("loan_data_2007_2014.csv", format="csv", sep=",",
                     inferSchema="true", header="true")
df.rdd.getNumPartitions()
```
This drops columns that have more than 80% null values:

```python
from pyspark.sql import functions as F

null_counts = df.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]
).collect()[0].asDict()
# to_drop = [k for k, v in null_counts.items() if v > 0]
# df = df.drop(*to_drop)
count = df.count()
to_drop = []
for k, v in null_counts.items():
    if v > 0.8 * count:
        to_drop.append(k)
df = df.drop(*to_drop)
```
This creates the label column:

```python
from pyspark.sql.functions import when

df = df.withColumn(
    'good_bad',
    when(df.loan_status.isin([
        'Charged Off', 'Default', 'Late (31-120 days)',
        'Does not meet the credit policy. Status:Charged Off'
    ]), 1).otherwise(0)  # was .otherwise('0'), which mixed int and string types
)
```
This separates categorical and numerical columns:

```python
# label column
colLabel = "good_bad"
# numerical columns
colNum = ["member_id", "loan_amnt", "funded_amnt", "funded_amnt_inv", "int_rate",
          "installment", "annual_inc", "dti", "delinq_2yrs", "inq_last_6mths",
          "mths_since_last_delinq", "open_acc", "pub_rec", "revol_bal",
          "revol_util", "total_acc", "out_prncp", "out_prncp_inv", "total_pymnt",
          "total_pymnt_inv", "total_rec_prncp", "total_rec_int",
          "total_rec_late_fee", "recoveries", "collection_recovery_fee",
          "last_pymnt_amnt", "collections_12_mths_ex_med",
          "mths_since_last_major_derog", "policy_code", "acc_now_delinq",
          "tot_coll_amt", "tot_cur_bal", "total_rev_hi_lim"]
# categorical columns: everything that is neither the label nor numerical
# (a list comprehension avoids the __builtin__.filter workaround needed when
# pyspark.sql.functions shadows the builtin filter)
colCat = [x for x in df.columns if x != colLabel and x not in colNum]
colCat
```
This imputes missing values:

```python
from pyspark.ml.feature import Imputer
from pyspark.sql.types import DoubleType

cols = colNum
for col in cols:
    df = df.withColumn(col, F.col(col).cast(DoubleType()))

imputer = Imputer().setInputCols(colNum).setOutputCols(colNum).setStrategy("median")
model = imputer.fit(df)
imputeddf = model.transform(df)
```
This normalizes the numerical values:

```python
from pyspark.ml.feature import VectorAssembler, StandardScaler

assembler = VectorAssembler(inputCols=cols, outputCol="features")
adf = assembler.transform(imputeddf)

# Normalize each feature to zero mean and unit standard deviation.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=True)
scalerModel = scaler.fit(adf)
sdf = scalerModel.transform(adf)
```
This label-encodes the categorical columns:

```python
from pyspark.sql.window import Window

# columns that are ids, dates or free text and should not be label-encoded
l1 = ["id", "issue_d", "url", "desc", "earliest_cr_line",
      "last_pymnt_d", "next_pymnt_d"]
colCat = [x for x in colCat if x not in l1]

for c in colCat:
    print(c)
    # assign back to sdf; the original `ndf = sdf.withColumn(...)` restarted
    # from sdf on every iteration, so only the last column's encoding survived
    sdf = sdf.withColumn(c + "_num", F.dense_rank().over(Window.orderBy(sdf[c])))

# MLlib alternative, left here for reference:
# from pyspark.ml.feature import StringIndexer
# indexer = StringIndexer(inputCols=colCat, outputCols=[k + "_num" for k in colCat])
# sdf = indexer.fit(sdf).transform(sdf)
colCat
```
After this my code stops working; it doesn't even execute df.show(). StringIndexer has a known memory-leak issue, but writing my own encoding doesn't work either. A 4M dataset isn't big. It works fine in Pandas, so why does it cause a problem in Spark, when the whole point of Spark is handling large amounts of data?
Solution 1:[1]
Spark doesn't work the way Pandas does. In a nutshell, all of your transformations run in a single go when an action triggers them (read up on lazy evaluation in Spark). A few suggestions:
- Avoid collect() at any point in your code. The null-column identification, for example, can be implemented without collecting the full result; search for a way, I am sure it will be easy.
- Since you are applying transformations with MLlib, I would recommend materializing interim tables at significant intervals, e.g. one after cleaning the data and one after normalization.
- Repartitioning/coalesce: you need to understand your data partitioning and how it is being shuffled. For a case like this, I would recommend repartitioning while writing your interim tables. A reasonable partition count is: partitions = executors × cores.

This will go a long way toward eliminating your problem and will make the pipeline more scalable and robust.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | jigyasu nayyar |
