merge rows keeping latest values for each couple (element, timestamp)
I have a PySpark dataframe with ids, values and their related timestamps; here is a sample of it:
+---+-----+-------------------+------+-------------------+------+-------------------+
| id|value| value_ts|value2| value2_ts|value3| value3_ts|
+---+-----+-------------------+------+-------------------+------+-------------------+
| 1| 0.5|2022-03-15 00:00:00| null| null| 7|2022-03-15 00:00:00|
| 2| 0.2|2022-03-18 00:00:00| null| null| 5|2022-03-18 00:00:00|
| 3| null| null| null| null| 12|2022-03-15 00:00:00|
| 1| 1.2|2022-03-18 00:00:00| null| null| null| null|
| 1| null| null| 124|2022-03-10 00:00:00| 6|2022-03-10 00:00:00|
| 3| null| null| 413|2022-03-18 00:00:00| null| null|
+---+-----+-------------------+------+-------------------+------+-------------------+
For this data I would like to obtain, grouped by id, the latest value for each couple (value, value_ts). In this example we have:
- id=1:
  - latest value_ts is 2022-03-18 00:00:00, the corresponding value is 1.2
  - latest value2_ts is 2022-03-10 00:00:00, the corresponding value2 is 124
  - latest value3_ts is 2022-03-15 00:00:00, the corresponding value3 is 7
- id=2:
  - latest value_ts is 2022-03-18 00:00:00, the corresponding value is 0.2
  - latest value2_ts is null, the corresponding value2 is null
  - latest value3_ts is 2022-03-18 00:00:00, the corresponding value3 is 5
- id=3:
  - latest value_ts is null, the corresponding value is null
  - latest value2_ts is 2022-03-18 00:00:00, the corresponding value2 is 413
  - latest value3_ts is 2022-03-15 00:00:00, the corresponding value3 is 12
Since there are 3 distinct ids in the input, I expect 3 rows in the output, like this:
+---+-----+-------------------+------+-------------------+------+-------------------+
| id|value| value_ts|value2| value2_ts|value3| value3_ts|
+---+-----+-------------------+------+-------------------+------+-------------------+
| 1| 1.2|2022-03-18 00:00:00| 124|2022-03-10 00:00:00| 7|2022-03-15 00:00:00|
| 2| 0.2|2022-03-18 00:00:00| null| null| 5|2022-03-18 00:00:00|
| 3| null| null| 413|2022-03-18 00:00:00| 12|2022-03-15 00:00:00|
+---+-----+-------------------+------+-------------------+------+-------------------+
Can you please help me obtain this result using PySpark?
Note that if the ts is null, the corresponding value is null as well. Here is Python code to reproduce the input dataframe:
from datetime import datetime
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("local_test").getOrCreate()
df = spark.createDataFrame(
    [
        ["1", "0.5", datetime(2022, 3, 15), None, None, "7", datetime(2022, 3, 15)],
        ["2", "0.2", datetime(2022, 3, 18), None, None, "5", datetime(2022, 3, 18)],
        ["3", None, None, None, None, "12", datetime(2022, 3, 15)],
        ["1", "1.2", datetime(2022, 3, 18), None, None, None, None],
        ["1", None, None, "124", datetime(2022, 3, 10), "6", datetime(2022, 3, 10)],
        ["3", None, None, "413", datetime(2022, 3, 18), None, None],
    ],
    ["id", "value", "value_ts", "value2", "value2_ts", "value3", "value3_ts"],
)
Solution 1
This can be solved by:
- Identifying the latest ts for each value and ts column combination.
- Copying the values identified in step 1 to all rows belonging to the same id.
- Finally, de-duplicating by keeping the first row in each group.
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql import DataFrame
def latest_ts(idf: DataFrame, val_col_name: str, ts_col_name: str) -> DataFrame:
    ws = (
        W.partitionBy("id")
        .orderBy(F.desc(ts_col_name))
        .rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
    )
    # Find the latest ts within each id
    max_ts = F.max(ts_col_name).over(ws)
    # Keep the value corresponding to the latest ts and make the others null
    latest_val = F.when(F.col(ts_col_name) == max_ts, F.col(val_col_name)).otherwise(F.lit(None))
    # Overwrite the value and ts columns with the latest values for this id
    return (
        idf.withColumn(val_col_name, F.first(latest_val, ignorenulls=True).over(ws))
           .withColumn(ts_col_name, max_ts)
    )

df_latest_ts = df
for val_col, ts_col in [("value", "value_ts"), ("value2", "value2_ts"), ("value3", "value3_ts")]:
    df_latest_ts = latest_ts(df_latest_ts, val_col, ts_col)
ws_rn = W.partitionBy("id").orderBy(F.desc("value_ts"), F.desc("value2_ts"), F.desc("value3_ts"))
(df_latest_ts.withColumn("rn", F.row_number().over(ws_rn))
.where("rn == 1")
.drop("rn")
).show()
"""
+---+-----+-------------------+------+-------------------+------+-------------------+
| id|value| value_ts|value2| value2_ts|value3| value3_ts|
+---+-----+-------------------+------+-------------------+------+-------------------+
| 1| 1.2|2022-03-18 00:00:00| 124|2022-03-10 00:00:00| 7|2022-03-15 00:00:00|
| 2| 0.2|2022-03-18 00:00:00| null| null| 5|2022-03-18 00:00:00|
| 3| null| null| 413|2022-03-18 00:00:00| 12|2022-03-15 00:00:00|
+---+-----+-------------------+------+-------------------+------+-------------------+
"""
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Nithish |
