Merge two DataFrames with conditions in PySpark

Df1:
+-------------------+
|               Date|
+-------------------+
|2020-07-01 00:00:00|
|2020-07-02 00:00:00|
|2020-07-03 00:00:00|
|2020-07-04 00:00:00|
|2020-07-05 00:00:00|
|2020-07-06 00:00:00|
|2020-07-07 00:00:00|
|2020-07-08 00:00:00|
|2020-07-09 00:00:00|
|2020-07-10 00:00:00|
+-------------------+
Df2:
+-------------------+----------+--------+--------+
|               Date|        ID|     Val|   f_val|
+-------------------+----------+--------+--------+
|2022-03-19 00:00:00|        34|     0.0|     0.0|
|2022-03-19 00:00:00|       108|     0.0|     0.0|
|2022-03-19 00:00:00|       155| 3070.61| 3070.61|
|2022-03-19 00:00:00|       193|22920.73|22920.73|
|2022-03-19 00:00:00|       211|   446.0|   446.0|
|2022-03-19 00:00:00|       321| 9314.16| 9314.16|
|2022-03-19 00:00:00|       362|  391.01|  391.01|
|2022-03-19 00:00:00|       368|    1.81|    1.81|
|2022-03-19 00:00:00|       375|    5.08|    5.08|
|2022-03-19 00:00:00|       530| 2444.76| 2444.76|
+-------------------+----------+--------+--------+

Both DataFrames have dates starting at 2022-03-19 and ending at 2020-07-01. Df1 contains a continuous series of unique dates from the start date to the end date, while Df2 is a much larger dataset in which the same dates repeat across many rows down to 2020-07-01. The Date column of Df2 has only 186 distinct values, whereas the Date column of Df1 has 626. Using PySpark in Databricks, what I am trying to achieve is to merge the Date columns of Df1 and Df2 so that all the dates missing from Df2 are included, with the newly added rows in Df2 filled with the values from the previous existing row.

Here is a sample of the data when filtered on a single ID:

+-------------------+----------+--------+--------+
|               Date|        ID|     Val|   f_val|
+-------------------+----------+--------+--------+
|2022-03-11 00:00:00|   Ax3838J|    -8.0|81111.73|
|2022-03-07 00:00:00|   Ax3838J|   17.94|81129.67|
|2022-02-27 00:00:00|   Ax3838J|    20.0|81149.67|
|2021-01-25 00:00:00|   Ax3838J|    40.0|81189.67|
|2021-10-22 00:00:00|   Ax3838J|   89.06|81278.73|
|2021-10-18 00:00:00|   Ax3838J|   10.89|81289.62|
|2021-10-15 00:00:00|   Ax3838J|    60.0|81349.62|
|2021-09-22 00:00:00|   Ax3838J|  -250.0|81099.62|
+-------------------+----------+--------+--------+

And the final expected output is:

+-------------------+----------+--------+--------+
|               Date|        ID|     Val|   f_val|
+-------------------+----------+--------+--------+
|2022-03-11 00:00:00|   Ax3838J|    -8.0|81111.73|
|2022-03-10 00:00:00|   Ax3838J|    -8.0|81111.73|
|2022-03-09 00:00:00|   Ax3838J|    -8.0|81111.73|
|2022-03-08 00:00:00|   Ax3838J|    -8.0|81111.73|
|2022-03-07 00:00:00|   Ax3838J|   17.94|81129.67|
|2022-03-06 00:00:00|   Ax3838J|   17.94|81129.67|
|2022-03-05 00:00:00|   Ax3838J|   17.94|81129.67|
|2022-03-04 00:00:00|   Ax3838J|   17.94|81129.67|
|2022-03-03 00:00:00|   Ax3838J|   17.94|81129.67|
|2022-03-02 00:00:00|   Ax3838J|   17.94|81129.67|
       .                   .         .       .
       .                   .         .       .
+-------------------+----------+--------+--------+
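
(For context, a date spine shaped like Df1 can also be generated directly in Spark. The snippet below is only an illustrative sketch, not part of the original question; it assumes an existing SparkSession named `spark` and Spark 2.4+ for `sequence()`, with the bounds taken from the description above.)

from pyspark.sql import functions as F

# Illustrative only: one row per day between 2020-07-01 and 2022-03-19,
# producing a single timestamp column named Date, like Df1.
date_spine = spark.sql("""
    SELECT explode(sequence(to_timestamp('2020-07-01'),
                            to_timestamp('2022-03-19'),
                            interval 1 day)) AS Date
""")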


Solution 1:[1]

from pyspark.sql import Window
from pyspark.sql.functions import last

# First join the DataFrames with a left join. Keep df1 on the left side,
# since it contains a row for every date.
joined_df = df1.join(df2, on='Date', how='left')

# Define the window: all rows from the start of the frame up to the current row,
# ordered by Date. The sort direction decides which neighbouring row supplies
# the fill value.
window = Window.orderBy('Date')\
               .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Define the fill-forward columns: last() with ignorenulls=True returns the most
# recent non-null value seen so far within the window.
filled_column_id = last(joined_df['ID'], ignorenulls=True).over(window)
filled_column_Val = last(joined_df['Val'], ignorenulls=True).over(window)
filled_column_fval = last(joined_df['f_val'], ignorenulls=True).over(window)

# Replace the existing columns with the filled columns.
joined_filled_df = joined_df.withColumn('ID', filled_column_id)\
                            .withColumn('Val', filled_column_Val)\
                            .withColumn('f_val', filled_column_fval)
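
Note that in the expected output above the gaps are filled per ID, and a missing date takes its values from the nearest newer existing row. A variant of the same window approach that accounts for both points is sketched below; it is only an illustration under the assumption that df1 holds the full date spine and df2 the sparse data, and is not part of the original answer.

from pyspark.sql import Window
from pyspark.sql import functions as F

# Build one row per (Date, ID) combination, then attach the known values;
# (Date, ID) pairs absent from df2 come through as nulls.
date_spine = df1.select('Date').distinct()
ids = df2.select('ID').distinct()
full_grid = date_spine.crossJoin(ids)
joined = full_grid.join(df2, on=['Date', 'ID'], how='left')

# Fill within each ID, walking from the newest date downwards, so that a
# missing date picks up the value of the nearest newer existing row.
w = (Window.partitionBy('ID')
           .orderBy(F.col('Date').desc())
           .rowsBetween(Window.unboundedPreceding, Window.currentRow))

filled = (joined
          .withColumn('Val',   F.last('Val',   ignorenulls=True).over(w))
          .withColumn('f_val', F.last('f_val', ignorenulls=True).over(w)))

Ordering the window by Date descending makes last() carry values down to older missing dates, matching the expected output; with ascending order the fill would instead come from the nearest older row.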

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1: supratim saha