PySpark: how to compare rows by hash from two data frames and group the result

I have the two data frames below, with a hash added as an additional column to identify differences for the same id across both data frames.

df1=
name | department| state | id|hash
-----+-----------+-------+---+---
James|Sales      |NY     |101| c123
Maria|Finance    |CA     |102| d234
Jen  |Marketing  |NY     |103| df34



df2=
name | department| state | id|hash
-----+-----------+-------+---+----
James|  Sales1   |null   |101|4df2
Maria|  Finance  |       |102|5rfg
Jen  |           |NY2    |103|2f34
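
For reference, a minimal sketch that reproduces the two frames (assuming an active SparkSession named spark; the hash values are copied literally from the tables above, although in practice they would usually be derived from the row, e.g. with F.sha2(F.concat_ws("||", "name", "department", "state"), 256)):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local").getOrCreate()

cols = ["name", "department", "state", "id", "hash"]

df1 = spark.createDataFrame([
    ("James", "Sales",     "NY",  "101", "c123"),
    ("Maria", "Finance",   "CA",  "102", "d234"),
    ("Jen",   "Marketing", "NY",  "103", "df34"),
], cols)

df2 = spark.createDataFrame([
    ("James", "Sales1",  None,  "101", "4df2"),
    ("Maria", "Finance", "",    "102", "5rfg"),
    ("Jen",   "",        "NY2", "103", "2f34"),
], cols)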

# Identify unmatched rows for the same id from both data frames

df1_un_match_indf2 = df1.join(df2, df1.hash == df2.hash, "leftanti")

df2_un_match_indf1 = df2.join(df1, df2.hash == df1.hash, "leftanti")

# The above returns every row of both data frames, since all hashes for the same id are different
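
With the sample data above, for example, the first anti-join returns all of df1, since no hash matches:

df1_un_match_indf2.show()

+-----+----------+-----+---+----+
| name|department|state| id|hash|
+-----+----------+-----+---+----+
|James|     Sales|   NY|101|c123|
|Maria|   Finance|   CA|102|d234|
|  Jen| Marketing|   NY|103|df34|
+-----+----------+-----+---+----+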

Now I am trying to find the differences in row values for the same id between df1_un_match_indf2 and df2_un_match_indf1, so that the differences show up row by row:

df3 = df1_un_match_indf2
df4 = df2_un_match_indf1
common_diff = df3.join(df4, df3.id == df4.id, "inner")
common_diff.show()

but the result shows the differences like this:

+-----+----------+-----+---+----+-----+----------+-----+---+----+
|name |department|state|id |hash|name |department|state|id |hash|
+-----+----------+-----+---+----+-----+----------+-----+---+----+
|James|Sales     |NY   |101|c123|James|Sales1    |null |101|4df2|
|Maria|Finance   |CA   |102|d234|Maria|Finance   |     |102|5rfg|
|Jen  |Marketing |NY   |103|df34|Jen  |          |NY2  |103|2f34|
+-----+----------+-----+---+----+-----+----------+-----+---+----+

What I am expecting is:

+-----------------+---------------------+------------+-------------+---------------+
|name             |department           |state       |id           |hash           |
+-----------------+---------------------+------------+-------------+---------------+
|['James','James']|['Sales','Sales1']   |['NY',null] |['101','101']|['c123','4df2']|
|['Maria','Maria']|['Finance','Finance']|['CA','']   |['102','102']|['d234','5rfg']|
|['Jen','Jen']    |['Marketing','']     |['NY','NY2']|['103','103']|['df34','2f34']|
+-----------------+---------------------+------------+-------------+---------------+

I tried different ways but didn't find the right solution to produce this expected format.

Can anyone give a solution or an idea for this? Thanks.



Solution 1:[1]

What you want to use is likely collect_list, or maybe collect_set (collect_set drops duplicates; both ignore NULL values, which is why only name2 shows up in the output below).

This is well illustrated by the following example:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local").getOrCreate()

df = spark.createDataFrame([
    ("a", None, None),
    ("a", "code1", None),
    ("a", "code2", "name2"),
], ["id", "code", "name"])

df.show()

+---+-----+-----+
| id| code| name|
+---+-----+-----+
|  a| null| null|
|  a|code1| null|
|  a|code2|name2|
+---+-----+-----+

(df
  .groupby("id")
  .agg(F.collect_set("code"),
       F.collect_list("name"))
  .show())

+---+-----------------+------------------+
| id|collect_set(code)|collect_list(name)|
+---+-----------------+------------------+
|  a|   [code1, code2]|           [name2]|
+---+-----------------+------------------+

In your case you need to change your join into a union, which lets you group the data by id.

df3 = df1_un_match_indf2
df4 = df2_un_match_indf1
common_diff = df3.union(df4)

(common_diff
  .groupby("id")
  .agg(F.collect_set("name"),
       F.collect_list("department"))
  .show())
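
To get the exact layout from the question, collect every column after the union. A sketch (note: collect_list ignores NULLs, so James's null state would vanish from its array; coalesce NULLs to a placeholder first if you need them kept, and be aware the element order inside the arrays is not guaranteed):

(common_diff
  .groupby("id")
  .agg(F.collect_list("name").alias("name"),
       F.collect_list("department").alias("department"),
       F.collect_list("state").alias("state"),
       F.collect_list("hash").alias("hash"))
  .show(truncate=False))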

If you can't do a union, just use an array on the joined frame:

from pyspark.sql.functions import array

# Assumes common_diff is the join result, with the two sides' columns
# renamed to this*/that* beforehand so they are unambiguous
common_diff.select(
    common_diff.id,
    array(
        common_diff.thisState,
        common_diff.thatState
    ).alias("State"),
    array(
        common_diff.thisDept,
        common_diff.thatDept
    ).alias("Department")
)
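
For completeness, here is a fuller sketch of that array route, assuming the second frame's columns are renamed with an illustrative that_ prefix before the join so nothing is ambiguous. Unlike collect_list, array keeps NULL elements, so ['NY', null] comes out as asked:

df4_renamed = df4.select([F.col(c).alias("that_" + c) for c in df4.columns])

joined = df3.join(df4_renamed, df3.id == df4_renamed.that_id, "inner")

joined.select(
    F.array("name", "that_name").alias("name"),
    F.array("department", "that_department").alias("department"),
    F.array("state", "that_state").alias("state"),
    F.array("id", "that_id").alias("id"),
    F.array("hash", "that_hash").alias("hash"),
).show(truncate=False)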

It's a lot more typing and a little more fragile. I suggest that renaming the columns and using the groupby approach is likely cleaner and clearer.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
