Dropping duplicate Spark DataFrame rows based on the first occurrence of schema values
If I'm working with a table like this as a Spark DataFrame:
| received | userId |
|---|---|
| 2022-01-07 06:23:02 | se23289 |
| 2022-01-03 22:21:33 | se23289 |
| 2022-01-16 18:01:45 | se12355 |
| 2022-01-11 02:35:23 | se23289 |
| 2022-01-13 05:24:21 | se12355 |
How would I go about dropping duplicate or repeated occurrences of userId, keeping only the row with the first (earliest) received date for each user? I've been struggling with this for a bit.
I'm new to Spark. I understand I can do drop.where(), but that requires a specific comparison, and this case is not that simple. Any tips much appreciated.
Solution 1:[1]
I am not actually an expert in Spark DataFrames. From what I know, you can convert it to pandas and then apply the duplicated function:
pDataframe = sparkDf.toPandas()
# sort by date first so keep="first" keeps the earliest row per userId
pDataframe = pDataframe.sort_values("received")
pDataframe = pDataframe[~pDataframe["userId"].duplicated(keep="first")]
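As a quick check of the pandas route, here is a minimal sketch on the sample rows from the question, with a hand-built DataFrame standing in for the `sparkDf.toPandas()` result:

```python
import pandas as pd

# Stand-in for sparkDf.toPandas(), built from the sample table in the question
pDataframe = pd.DataFrame({
    "received": ["2022-01-07 06:23:02", "2022-01-03 22:21:33",
                 "2022-01-16 18:01:45", "2022-01-11 02:35:23",
                 "2022-01-13 05:24:21"],
    "userId": ["se23289", "se23289", "se12355", "se23289", "se12355"],
})

# Sort by date so that keep="first" keeps the earliest row per userId
sorted_df = pDataframe.sort_values("received")
deduped = sorted_df[~sorted_df["userId"].duplicated(keep="first")]
print(deduped)
```

This keeps one row per userId: the earliest received value for each user.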
Solution 2:[2]
Do you want to achieve this effect?
| received | userId |
|---|---|
| 2022-01-07 06:23:02 | se23289 |
| 2022-01-03 22:21:33 | se23289 |
| 2022-01-13 05:24:21 | se12355 |
- use source.createOrReplaceTempView("source")
- then spark.sql("sql")
The query:
SELECT p1.received, p1.userId
FROM (
  SELECT received, userId,
         ROW_NUMBER() OVER (PARTITION BY userId ORDER BY DATE(received) DESC) AS lineNum
  FROM source
) p1
WHERE p1.lineNum != 1
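In plain terms, the query above drops only the most recent row per userId and keeps everything else. A small stdlib sketch of that same logic on the sample data (a pure-Python stand-in for the Spark SQL, for illustration only):

```python
rows = [
    ("2022-01-07 06:23:02", "se23289"),
    ("2022-01-03 22:21:33", "se23289"),
    ("2022-01-16 18:01:45", "se12355"),
    ("2022-01-11 02:35:23", "se23289"),
    ("2022-01-13 05:24:21", "se12355"),
]

# Latest received per userId: the row ROW_NUMBER() ... DESC marks as lineNum = 1
# (ISO-formatted timestamps compare correctly as strings)
latest = {}
for received, user in rows:
    if user not in latest or received > latest[user]:
        latest[user] = received

# Keep every row whose lineNum would not be 1, i.e. drop the latest per user
kept = [(r, u) for r, u in rows if latest[u] != r]
print(kept)
```

The result matches the table above: both older se23289 rows survive, and only the newest row for each user is dropped.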
Solution 3:[3]
You can use a window to rank the dates in ascending or descending order, depending on whether you want to keep the first or the last of the duplicate rows. To achieve this you can use a helper function like the one below:
from pyspark.sql import Window as W, functions as F

def mydeduplicate(df, keep='first'):
    if keep == 'first':
        col = "received"
    elif keep == 'last':
        col = F.desc("received")
    else:
        raise ValueError("keep must be 'first' or 'last'")
    return (df.withColumn("Rnk", F.dense_rank().over(
                W.partitionBy("userId").orderBy(col)))
              .filter("Rnk == 1")
              .drop("Rnk"))
Sample Runs:
mydeduplicate(spark_df).show()
+-------------------+-------+
| received| userId|
+-------------------+-------+
|2022-01-13 05:24:21|se12355|
|2022-01-03 22:21:33|se23289|
+-------------------+-------+
mydeduplicate(spark_df,keep='last').show()
+-------------------+-------+
| received| userId|
+-------------------+-------+
|2022-01-16 18:01:45|se12355|
|2022-01-11 02:35:23|se23289|
+-------------------+-------+
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | Diiiiza |
| Solution 3 | |
