PySpark performance tuning - cache or not to cache?
I am trying to speed up a series of calculations that I add as columns to a PySpark data frame, and I came across the sparkbyexamples article on performance tuning. I am considering how to use cache and the spark.sql.shuffle.partitions setting.
- Would cache be appropriate for code that first joins multiple data frames and then adds calculations over different windows?
- What happens when the cached data frame is reassigned (see below)?
Example:
from pyspark.sql import Window, functions as F

df = dfA.join(dfB, on=['key'], how='left')  # should I add .cache() here?
w_u = Window.partitionBy('user')
w_m = (Window.partitionBy(['user', 'month']).orderBy('month')
       .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing))
MLAB = ['val1', 'val2']  # example to indicate that I run similar operations multiple times
for mlab in MLAB:
    percent_50 = F.expr('percentile_approx(' + mlab + ', 0.5)')
    df = df.withColumn(mlab + '_md', percent_50.over(w_u))  # what happens to the cache when I reassign df?
Afterwards I add further operations that include aggregations, such as:
# Note: w (a window) and distance() are defined elsewhere and are not shown here.
radius_df = (df
    # number of visits per stop
    .groupBy('userId', 'locationId').agg(
        F.count(F.lit(1)).alias('n_i'),
        F.first('locationLongitude').alias('locationLongitude'),
        F.first('locationLatitude').alias('locationLatitude'))
    # compute center of mass (lat/lon) per user
    .withColumn('center_lon', F.avg(F.col('locationLongitude')).over(w))
    .withColumn('center_lat', F.avg(F.col('locationLatitude')).over(w))
    # compute total visits
    .withColumn('N', F.sum(F.col('n_i')).over(w))
    # compute (r_i - r_cm)
    .withColumn('distance', distance(F.col('locationLatitude'), F.col('locationLongitude'),
                                     F.col('center_lat'), F.col('center_lon')))
    # compute n_i(r_i - r_cm)^2 / N
    .withColumn('distance2', F.col('n_i') * (F.col('distance') * F.col('distance')) / F.col('N'))
    # compute sum(n_i(r_i - r_cm)^2 / N)
    .groupBy('userId').agg(F.sum(F.col('distance2')).alias('sum_dist2'))
    # square root
    .withColumn('radius_gyr', F.sqrt(F.col('sum_dist2')))
    .select('userId', 'radius_gyr')
)
df_f = df.join(radius_df.dropDuplicates(), on='userId', how='left')
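The arithmetic in the radius_df pipeline above can be checked on a tiny input with plain Python. This is only a sketch for a single user: the visit data is made up, and planar_distance is a Euclidean stand-in for the distance() helper, which is an assumption because that helper is not shown. As in the pipeline, the center is the unweighted average over distinct locations.

```python
import math
from collections import Counter

# Hypothetical visit log for one user: (locationId, longitude, latitude).
visits = [
    ("home", 0.0, 0.0),
    ("home", 0.0, 0.0),
    ("home", 0.0, 0.0),
    ("work", 4.0, 0.0),
]

def planar_distance(lat1, lon1, lat2, lon2):
    """Euclidean stand-in for the distance() helper (an assumption)."""
    return math.hypot(lat1 - lat2, lon1 - lon2)

def radius_of_gyration(visits):
    # n_i: number of visits per location (the count in the first groupBy).
    n = Counter(loc for loc, _, _ in visits)
    # First coordinates seen per location (mirrors F.first).
    coords = {}
    for loc, lon, lat in visits:
        coords.setdefault(loc, (lon, lat))
    # Center: plain average over distinct locations (mirrors F.avg over w).
    center_lon = sum(lon for lon, _ in coords.values()) / len(coords)
    center_lat = sum(lat for _, lat in coords.values()) / len(coords)
    total = sum(n.values())  # N: total number of visits
    # sum over locations of n_i * (r_i - r_cm)^2 / N
    sum_dist2 = sum(
        n[loc] * planar_distance(lat, lon, center_lat, center_lon) ** 2 / total
        for loc, (lon, lat) in coords.items()
    )
    return math.sqrt(sum_dist2)

print(radius_of_gyration(visits))  # 2.0
```

Both locations lie 2 units from the center (2.0, 0.0), so the sum is 3*4/4 + 1*4/4 = 4 and the radius is 2.0.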
I am open to any suggestions on how to speed up the code. Many thanks.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow