Dot product between two Vector columns of a DataFrame

I'm stuck on the following situation and am looking for guidance, please. I'm aware of many of the limitations of performing linear algebra operations on Spark, one being distributed scientific computing at scipy/numpy scale, plus serialization and deserialization costs. I thought of joining the two columns and computing every combination, and I took a look at that approach, but the index of each vector within its column is very important to me. I also looked at a UDF for the dot product of DataFrame columns, but it operates element-wise per row rather than over all combinations of col1 with col2:

"Looking to compute the dot product between two SparseVector columns, one SparseVector column from df1 and another SparseVector column from df2, while preserving the index of each vector." As you already know, this is purely a big-data problem, with millions to billions of vectors; collecting everything and using plain numpy and scipy is not a solution for me at this moment, only after filtering down to a small dataset.

Here is a sample of my data. Every vector has the same length, but the number of vectors in each DataFrame differs:

> df1:

col1
|(11128,[0,1,2,3,5...|
|(11128,[11,22,98,...|
|(11128,[51,90,218...|

> df2:

col1
|(11128,[21,23,24,...|
|(11128,[0,1,2,3,5...|
|(11128,[0,1,2,3,4...|
|(11128,[28,59,62,...|
...

Adding more info on the vectors part: maybe .withColumn() could be modified to use a .map() function to process all vectors in parallel at once, since they do have an index? I know this is not the best approach, but it's all I can think of right now (this is not only about solving the .dot() product, but more about using a UDF/pandas_udf to extend math operations to the Vector level):

I bring everything into an RDD with an index; is there a way for me to modify the approach so that the index becomes a column name?

[[0, SparseVector(11128, {0: 0.0485, 1: 0.0515, 2: 0.0535, 3: 0.0536, 5: 0.0558, 6: 0.0545, 7: 0.0558, 59: 0.1108, 62: 0.1114, 65: 0.1123, 68: 0.1126, 70: 0.113, 82: 0.121, 120: 0.1414, 149: 0.149, 189: 0.1685, 271: 0.1876, 275: 0.1891, 303: 0.1919, 478: 0.2193, 634: 0.2359, 646: 0.2383, 1017: 0.2626, 1667: 0.2943, 1821: 0.3006, 2069: 0.3095, 2313: 0.3191, 3104: 0.347})], 
    [1, SparseVector(11128, {11: 0.0621, 22: 0.0776, 98: 0.1167, 210: 0.155, 357: 0.1811, 360: 0.1818, 466: 0.1965, 475: 0.1962, 510: 0.2005, 532: 0.2033, 597: 0.2092, 732: 0.2178, 764: 0.2198, 1274: 0.2489, 1351: 0.2519, 1353: 0.2522, 1451: 0.2562, 1577: 0.2608, 2231: 0.2841, 2643: 0.2969, 3107: 0.3114})]]

So I did try an approach with a UDF, but so far I can only get it working against a static vector (I convert to an RDD and take each vector individually, which is not the best approach for me; I want to do everything at once and in parallel with a map, keeping the index of each vector in place while doing it):

from pyspark.ml.linalg import SparseVector  # ml, not mllib, for DataFrame vector columns
from pyspark.sql.functions import udf, col, array, lit
from pyspark.sql.types import FloatType
import numpy as np

# write our UDF for the .dot product; the array column arrives as a plain
# Python list, so convert it before Vector.dot, and cast for FloatType
def dot_prod(a, b):
    return float(a.dot(np.array(b)))

# apply the UDF to the column (static_array is a plain Python list of floats)
df = df.withColumn(
    "dotProd",
    udf(dot_prod, FloatType())(col("col2"), array([lit(v) for v in static_array])),
)


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
