Pyspark performance tuning - to cache or not to cache?

I am trying to speed up calculations from multiple operations that I am adding as columns to a PySpark data frame, and I came across the sparkbyexamples article on performance tuning. I am now considering how to use the cache and spark.sql.shuffle.partitions options.
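
For reference, this is roughly how I would apply the shuffle-partitions setting; the value 64 is only a placeholder I would experiment with, not a recommendation:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# the default is 200 shuffle partitions; 64 is just an example value to tune against the data size
spark.conf.set('spark.sql.shuffle.partitions', 64)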

  1. Would cache be appropriate for code that first joins multiple data frames and then adds calculations over different windows?
  2. What happens when reassigning the cached data frame (see below)?

Example:

df = dfA.join(dfB, on=['key'], how='left')  # should I add .cache here?

w_u = Window.partitionBy('user')
w_m = Window.partitionBy(['user','month']).orderBy('month')\
             .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

MLAB = ['val1','val2'] # example to indicate that I run similar operations multiple times 
for mlab in MLAB:
    percent_50 = F.expr('percentile_approx('+mlab+',0.5)')
    df = df.withColumn(mlab+'_md', percent_50.over(w_u))  # what happens with the cache when I reassign it?
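
To make question 2 concrete, this is the pattern I have in mind, using dfA/dfB from above; keeping a separate reference to the cached join is my own guess at how to unpersist it later:

from pyspark.sql import functions as F, Window

base = dfA.join(dfB, on=['key'], how='left').cache()  # cache the join result once
base.count()                                          # action that materializes the cache

df = base
for mlab in ['val1', 'val2']:
    percent_50 = F.expr('percentile_approx(' + mlab + ', 0.5)')
    # each withColumn returns a new DataFrame; 'df' now points to that new plan,
    # while the cached join ('base') stays in storage until it is unpersisted
    df = df.withColumn(mlab + '_md', percent_50.over(Window.partitionBy('user')))

base.unpersist()  # only after df has actually been computed/written, otherwise the cache never pays off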

Afterwards I am adding additional operations that include aggregations, such as:

w = Window.partitionBy('userId')  # per-user window used in the aggregations below

radius_df = (df
             # number of visits per stop
             .groupby('userId', 'locationId').agg(F.count(F.lit(1)).alias('n_i'), 
                                                  F.first('locationLongitude').alias('locationLongitude'),
                                                  F.first('locationLatitude').alias('locationLatitude'))
             # compute center of mass (lat/lon) per user
             .withColumn('center_lon', F.avg(F.col('locationLongitude')).over(w))
             .withColumn('center_lat', F.avg(F.col('locationLatitude')).over(w))
             # compute total visits
             .withColumn('N', F.sum(F.col('n_i')).over(w))    
             # compute (r_i - r_cm); 'distance' here is my helper/UDF returning the distance between two lat/lon points
             .withColumn('distance', distance(F.col('locationLatitude'), F.col('locationLongitude'), F.col('center_lat'), F.col('center_lon')))
             # compute n_i(r_i - r_cm)^2 / N
             .withColumn('distance2', F.col('n_i') * (F.col('distance') * F.col('distance')) / F.col('N'))
             # compute sum(n_i(r_i - r_cm)^2)
             .groupBy('userId').agg(F.sum(F.col('distance2')).alias('sum_dist2'))
             # square root
             .withColumn('radius_gyr', F.sqrt(F.col('sum_dist2')))
             .select('userId','radius_gyr')
            )

df_f = df.join(radius_df.dropDuplicates(), on='userId', how='left')
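
Because df is read twice at this point (once to build radius_df and once in the final join), my current guess is that caching it just before that branch could avoid recomputing the join and the window columns; a minimal sketch of what I mean:

df = df.cache()  # keep the joined + windowed result around, since it is reused twice
df.count()       # materialize it before the two branches below read it

# radius_df is built from the cached df exactly as above, then:
df_f = df.join(radius_df.dropDuplicates(), on='userId', how='left')
df_f.count()     # any action; the cache only helps once something is actually executed

df.unpersist()   # free the cached blocks once df_f has been computed/written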

I am open to any suggestions on how to speed up the code. Many thanks.


