Efficient off-diagonal cogroup-like operations in PySpark

BACKGROUND: PySpark has a feature called Pandas CoGrouped Operations, which takes two DataFrames, dfA and dfB, groups them both by the same key column (e.g. "username"), performs an arbitrary Python/pandas operation on the corresponding pairs of groups from dfA and dfB, and (abstractly) unions the result. For example, if dfA has username groups "Alice", "Bob", and "Carlos", and dfB has username groups "Alice", "Bob", and "Carlos", and f(left, right) is an arbitrary function on two pandas DataFrames returning a pandas DataFrame with a fixed schema, then f can be "lifted" to an operation on dfA.groupBy("username") and dfB.groupBy("username") which returns the union of f(dfA|Alice, dfB|Alice), f(dfA|Bob, dfB|Bob), and f(dfA|Carlos, dfB|Carlos).

Note that CoGrouped Operations could also be achieved by doing a kind of "outer union" of dfA and dfB, grouping that unioned dataframe by "username", and then applying, as a regular grouped pandas UDF, a modified version of the Python/pandas function f which splits each group back into its "left" and "right" halves. However, the cogrouped approach is more efficient because it avoids the needless unioning and splitting.

QUESTION: Suppose that instead of an "on-diagonal" cogrouped operation, where we perform an operation on groups which share the same key (e.g. "username"), we want to do an "off-diagonal" cogrouped operation, where we apply a function to each pair of groups (g1, g2) from dfA.groupBy("username") and dfB.groupBy("username") whose keys are unequal. For example, the "off-diagonal" version of the above would be to take the union of:

f(dfA|Alice, dfB|Bob), f(dfA|Alice, dfB|Carlos), f(dfA|Bob, dfB|Alice), f(dfA|Bob, dfB|Carlos), f(dfA|Carlos, dfB|Alice), and f(dfA|Carlos, dfB|Bob)

A use case would be doing pairwise comparisons of data for different people, e.g. comparing Bob's test answers to Alice's test answers to see if there is a suspicious correlation.

Obviously this operation scales badly (quadratically in the number of groups), but assuming we have a reasonable bound on the number of groups (e.g. 100 or 1000), what is the most efficient way to do this?

ATTEMPT 1: Create a list of all ordered pairs of keys (e.g. (Alice, Bob), (Alice, Carlos), (Bob, Alice), ...), and then apply f to each pair of groups in an explicit for-loop over these pairs. This keeps peak memory usage low, but doesn't take advantage of Spark's parallelism.

ATTEMPT 2: Join dfA to a "group map table" T with records ((left=Alice, right=Bob), (left=Alice, right=Carlos), (left=Bob, right=Alice), ...) on dfA.username = T.left, and likewise join dfB to T on dfB.username = T.right. Then take the outer union of these two, U, and finally do U.groupBy('left', 'right').applyInPandas(g), where g is a modified version of f which splits the records of each group in U back into two dataframes (one from dfA, one from dfB) before applying the operation.

Is there a more efficient solution than Attempt 2? I'm worried that the outer union will obliterate any chance of Spark optimizing this query.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
