Tuning `CrossValidator` Spark job performance

I am running a 3-fold cross validation of an ML pipeline that uses GBTClassifier as the final stage. It takes 18 hours to run, which is far longer than I would expect, and I am looking for feedback on how to improve the performance.

For context here is the cluster configuration:

{
"autoscale": {
    "min_workers": 1,
    "max_workers": 4
},
"cluster_name": "model_training",
"spark_version": "10.3.x-cpu-ml-scala2.12",
"spark_conf": {
    "spark.conf.set(\"spark.databricks.io.cache.enabled\",": "\"true\")",
    "spark.databricks.delta.preview.enabled": "true"
},
"azure_attributes": {
    "first_on_demand": 1,
    "availability": "ON_DEMAND_AZURE",
    "spot_bid_max_price": -1
},
"node_type_id": "Standard_E8_v3",
"driver_node_type_id": "Standard_E8_v3",
"ssh_public_keys": [],
"custom_tags": {},
"spark_env_vars": {},
"autotermination_minutes": 60,
"enable_elastic_disk": true,
"cluster_source": "UI",
"init_scripts": [],
"cluster_id": "0224-222219-94q7zutd"

}
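
Side note on the config: the `spark_conf` entry for the IO cache above is written as a pasted `spark.conf.set(...)` call rather than a plain key/value pair, so I am not certain it actually takes effect. A minimal runtime check (just a sketch; `spark` is the active session) would be:

# Should print "true" if the cluster-level setting was applied
print(spark.conf.get("spark.databricks.io.cache.enabled", "not set"))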

The pipeline is defined as follows:

# Imports for the pipeline stages
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, PolynomialExpansion, StandardScaler
from pyspark.ml.classification import GBTClassifier

# Define feature assembler
assembler = VectorAssembler(inputCols=feature_layers, outputCol="assembled", handleInvalid='keep')

# Define polynomial expansion on features
px = PolynomialExpansion(degree=3, inputCol="assembled", outputCol="expanded")

# Define standard scaler
standardScaler = StandardScaler(withMean=True, withStd=True, inputCol="expanded", outputCol="features")

# Define gradient boosted tree classifier
gbt = GBTClassifier(stepSize=0.3, maxIter=50)

# Define pipeline
pipeline = Pipeline(stages=[assembler, px, standardScaler, gbt])
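
Because PolynomialExpansion with degree=3 can blow up the feature count quickly, a quick way to see how wide the expanded vector actually gets is the following (a minimal sketch, assuming `train` and the stages above are already defined):

# Push one row through the first two stages and inspect the expanded vector size
sample = px.transform(assembler.transform(train.limit(1)))
print(sample.select("expanded").first()["expanded"].size)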

The parameter grid contains three values for `maxDepth`, and the `CrossValidator` uses 3 folds with a parallelism of 3. That means it trains 3 × 3 = 9 candidate pipelines and then refits the best one on the full training set.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [7, 10, 15])
             .build())

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(metricName='areaUnderPR'),
                          numFolds=3, collectSubModels=False, parallelism=3)
 
gbt_pipe = crossval.fit(train)
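
Once the fit finishes, the mean PR-AUC per candidate is available on the fitted model; a minimal sketch for inspecting it:

# avgMetrics is ordered the same way as paramGrid
for params, metric in zip(paramGrid, gbt_pipe.avgMetrics):
    print(params[gbt.maxDepth], metric)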

`train` comes from a dataset that has been repartitioned into 500 partitions and then cached:

from pyspark.sql import functions as F

df = df.na.drop().repartition(500)
df.cache()

df_label = df \
    .withColumn("label", F.when(df["threshold_runoff"] < runoff_amount, 1).otherwise(0)) \
    .drop("threshold_runoff")

train, test = df_label.randomSplit([0.95, 0.05])
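
Before fitting, a couple of quick sanity checks on `train` (just a sketch):

# Partitions the CrossValidator will see, plus a count that materializes the cache
print(train.rdd.getNumPartitions())
print(train.count())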

Here is a screenshot of one of the jobs:

[Job screenshot]

Job summary:

[Job summary screenshot]

Top half of the storage tab:

[Spark storage tab screenshot]

How would you go about diagnosing this issue? Any tips for performance improvements here?

I have tried increasing and decreasing the number of partitions, but only for a different algorithm (Logistic Regression), where 500 partitions seemed to work quite well on the same dataset. I appreciate your input!
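
A minimal way to compare partition counts on a single fit, before paying for the full cross-validation, would be something like this (just a sketch; the partition counts are arbitrary):

import time

# Time one pipeline fit per partition count (no cross-validation)
for n in (64, 200, 500):
    t0 = time.time()
    pipeline.fit(train.repartition(n))
    print(n, round(time.time() - t0, 1))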


