'convert df.apply to spark to run parallely iusing all the cores

We have a panda dataframe that are using. We have a function we use in retail data which runs on a daily basis row by row to calculate the item to item difference like below

for itemiter in range(len(RetDf)):
    column = RetDf.loc[itemiter , "username"]
    RetDf[column] = RetDf.apply(lambda row: ItemDiff(RetDf.loc[itemiter, 'Val'], row['Val']), axis=1)

is there a way to convert it to sparkcontext rdd parallelize to use all cores

Sample Data with dummy values for retdf: 

username    UserId     Val
abc75757    1234       [0.0 , 0.0, 1.0, 2.0] 
abcraju     4567       [0.0 , 0.0, 1.0, 2.0]
xyzuser     4343       [0.0 , 0.0, 1.0, 2.0]
user4abc    2323       [0.0 , 0.0, 1.0, 2.0]


FinalOutput: 

username    UserId     Val                     abc75757  abcraju        xyzuser     user4abc
abc75757    1234       [0.0 , 0.0, 1.0, 2.0]   2.0       0.0            0.0         1.0
abcraju     4567       [0.0 , 0.0, 1.0, 2.0]   2.0       0.0            0.0         1.0
xyzuser     4343       [0.0 , 0.0, 1.0, 2.0]   2.0       0.0            0.0         1.0
user4abc    2323       [0.0 , 0.0, 1.0, 2.0]   2.0       0.0            4.0         1.0

ItemDiff

def ItemDiff(z1,z2):
    distance_t = 0.0
    path_t = [(0,0)]
    distance_t, path_t = fastdtw(z1,z2)
    return(distance_t)


Solution 1:[1]

You've turned a combinations problem in to a product problem, more than doubling the necessary calculations, I'm not sure of a good pure pandas way of doing this... but this should still be much faster even without parallelization.

a = [["abc75757", 1234, [4.0, 0.0, 1.0, 4.0]],
     ["abcraju", 4567, [0.0, 0.0, 3.0, 2.0]],
     ["xyzuser", 4343, [0.0, 1.0, 1.0, 2.0]],
     ["user4abc", 2323, [0.0, 0.0, 1.0, 3.0]]]
RetDf = pd.DataFrame(a, columns=['username', 'UserId', 'Val'])
from itertools import combinations

combos = combinations(RetDf[['username', 'Val']].to_numpy(), r=2)
combos = [(x[0][0], x[1][0], fastdtw(x[0][1], x[1][1])[0]) for x in combos]
permuts = [(x[0], x[1], x[2]) for x in combos] + [(x[1], x[0], x[2]) for x in combos]

df = pd.DataFrame(permuts, columns=['username', 'pair', 'value']).pivot(index='username', columns='pair').droplevel(0, axis=1).reset_index()
output = RetDf.merge(df).fillna(0)
print(output)

Output:

    username  UserId                   Val  abc75757  abcraju  user4abc  xyzuser                                                                 
0   abc75757    1234  [4.0, 0.0, 1.0, 4.0]       0.0      8.0       5.0      6.0
1   abcraju     4567  [0.0, 0.0, 3.0, 2.0]       8.0      0.0       2.0      3.0
2   xyzuser     4343  [0.0, 1.0, 1.0, 2.0]       6.0      3.0       1.0      0.0
3   user4abc    2323  [0.0, 0.0, 1.0, 3.0]       5.0      2.0       0.0      1.0

Solution 2:[2]

There is Pandas on Spark which should look into but here it won't help as they don't have a direct translation for pd.loc

It seems your doing a cartesian join which is expensive but this is what I suggest you do:

from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import udf
import pyspark.sql.functions as f
from pyspark.sql.types import *
from fastdtw import fastdtw

#Create PySpark SparkSession
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()
#Create PySpark DataFrame from Pandas
raw_data = { 'username' :[ 'abc75757', 'abcraju', 'xyzuser', 'user4abc'], 'UserId':[ 1234, 4567,4343,2323], 'Val': [[0.0 , 0.0, 1.0, 2.0] ,[0.0 , 0.0, 1.0, 2.0],[0.0 , 0.0, 1.0, 2.0],[0.0 , 0.0, 1.0, 2.0]]}
RetDf = pd.DataFrame(raw_data)

def ItemDiff(z1,z2):
 distance_t = 0.0
 path_t = [(0,0)]
 distance_t, path_t = fastdtw(z1,z2)
 return float(distance_t)

itemDiff = udf(ItemDiff, FloatType()) # create UDF to do work.


sparkDF=spark.createDataFrame(RetDf) 
cartesianJoin = sparkDF.crossJoin(sparkDF)\ # this is expensive but necessary
.toDF("UserId","Val","username","myUserId","myVal","my_username")\ #renaming the columns for convience
.select( itemDiff( "Val", "myVal" ).alias("dist"), f.col("*") )\ # run UDF 
.groupBy( "Val","UserId","username" )\ # this enables us to pivot 
.pivot("my_username")\ #exposes the calculation  be careful to uses the 'exploded' column.  
.max("dist").show() #handy tick as their is only 1 value so max is just the number.
+--------------------+------+--------+--------+-------+--------+-------+        
|                 Val|UserId|username|abc75757|abcraju|user4abc|xyzuser|
+--------------------+------+--------+--------+-------+--------+-------+
|[0.0, 0.0, 1.0, 2.0]|  1234|abc75757|     0.0|    0.0|     0.0|    0.0|
|[0.0, 0.0, 1.0, 2.0]|  4343| xyzuser|     0.0|    0.0|     0.0|    0.0|
|[0.0, 0.0, 1.0, 2.0]|  4567| abcraju|     0.0|    0.0|     0.0|    0.0|
|[0.0, 0.0, 1.0, 2.0]|  2323|user4abc|     0.0|    0.0|     0.0|    0.0|
+--------------------+------+--------+--------+-------+--------+-------+

UDF documentation here. If you can rework your logic to avoid using a UDF, it would run faster, but you'd need to learn what spark sql functions you could use to do the same things. This might be a good time to review if you really need all the data to calculate the columns or if you can simplify your logic.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1
Solution 2