Merge two dataframes with conditions in pyspark
Df1:
+-------------------+
| Date|
+-------------------+
|2020-07-01 00:00:00|
|2020-07-02 00:00:00|
|2020-07-03 00:00:00|
|2020-07-04 00:00:00|
|2020-07-05 00:00:00|
|2020-07-06 00:00:00|
|2020-07-07 00:00:00|
|2020-07-08 00:00:00|
|2020-07-09 00:00:00|
|2020-07-10 00:00:00|
+-------------------+
Df2:
+-------------------+----------+--------+--------+
| Date| ID| Val| f_val|
+-------------------+----------+--------+--------+
|2022-03-19 00:00:00| 34| 0.0| 0.0|
|2022-03-19 00:00:00| 108| 0.0| 0.0|
|2022-03-19 00:00:00| 155| 3070.61| 3070.61|
|2022-03-19 00:00:00| 193|22920.73|22920.73|
|2022-03-19 00:00:00| 211| 446.0| 446.0|
|2022-03-19 00:00:00| 321| 9314.16| 9314.16|
|2022-03-19 00:00:00| 362| 391.01| 391.01|
|2022-03-19 00:00:00| 368| 1.81| 1.81|
|2022-03-19 00:00:00| 375| 5.08| 5.08|
|2022-03-19 00:00:00| 530| 2444.76| 2444.76|
+-------------------+----------+--------+--------+
Both DataFrames have a Date column spanning from 2022-03-19 back to 2020-07-01. Df1 contains a series of unique dates from the end date to the start date, whereas Df2 is a much larger dataset in which the same dates repeat over multiple rows down to 2020-07-01. The Date column of Df2 has only 186 distinct values, while the Date column of Df1 has 626.
With PySpark in Databricks, what I am trying to achieve is to merge the Date columns of Df1 and Df2 so that all the dates missing from Df2 are included, and Df2 should fill the newly added rows with the values of the previous row (a forward fill).
Here is a sample of the data when filtered on a single ID:
+-------------------+----------+--------+--------+
| Date| ID| Val| f_val|
+-------------------+----------+--------+--------+
|2022-03-11 00:00:00| Ax3838J| -8.0|81111.73|
|2022-03-07 00:00:00| Ax3838J| 17.94|81129.67|
|2022-02-27 00:00:00| Ax3838J| 20.0|81149.67|
|2021-01-25 00:00:00| Ax3838J| 40.0|81189.67|
|2021-10-22 00:00:00| Ax3838J| 89.06|81278.73|
|2021-10-18 00:00:00| Ax3838J| 10.89|81289.62|
|2021-10-15 00:00:00| Ax3838J| 60.0|81349.62|
|2021-09-22 00:00:00| Ax3838J| -250.0|81099.62|
+-------------------+----------+--------+--------+
And final expectation is:
+-------------------+----------+--------+--------+
| Date| ID| Val| f_val|
+-------------------+----------+--------+--------+
|2022-03-11 00:00:00| Ax3838J| -8.0|81111.73|
|2022-03-10 00:00:00| Ax3838J| -8.0|81111.73|
|2022-03-09 00:00:00| Ax3838J| -8.0|81111.73|
|2022-03-08 00:00:00| Ax3838J| -8.0|81111.73|
|2022-03-07 00:00:00| Ax3838J| 17.94|81129.67|
|2022-03-06 00:00:00| Ax3838J| 17.94|81129.67|
|2022-03-05 00:00:00| Ax3838J| 17.94|81129.67|
|2022-03-04 00:00:00| Ax3838J| 17.94|81129.67|
|2022-03-03 00:00:00| Ax3838J| 17.94|81129.67|
|2022-03-02 00:00:00| Ax3838J| 17.94|81129.67|
. . . .
. . . .
+-------------------+----------+--------+--------+
Solution 1:[1]
from pyspark.sql import Window
from pyspark.sql.functions import last
import sys
# First join the dataframes with a left join. Keep df1 on the left side as it
# contains rows for all the dates. Joining on the column name keeps a single
# Date column instead of two ambiguous ones.
joined_df = df1.join(df2, on="Date", how="left")

# Define the window: order by Date and look back over all preceding rows.
window = Window.orderBy('Date')\
               .rowsBetween(-sys.maxsize, 0)

# Define the forward-fill columns: last() with ignorenulls=True carries the
# most recent non-null value forward.
filled_column_id = last(joined_df['ID'], ignorenulls=True).over(window)
filled_column_Val = last(joined_df['Val'], ignorenulls=True).over(window)
filled_column_fval = last(joined_df['f_val'], ignorenulls=True).over(window)

# Replace the existing columns with the filled columns.
joined_filled_df = joined_df.withColumn('ID', filled_column_id).withColumn('Val', filled_column_Val).withColumn('f_val', filled_column_fval)
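Note that the window above orders all rows by Date globally; when Df2 contains many IDs (as in the filtered sample), the fill should happen within each ID. The variant below is an adaptation, not part of the original answer: it assumes a joined DataFrame (such as the scaffold join sketched earlier) in which ID is already populated for every date, and it uses Window.unboundedPreceding instead of -sys.maxsize.

from pyspark.sql import Window
from pyspark.sql.functions import last

# Forward fill per ID: partition by ID, order by Date, and look back over
# all preceding rows.
w = (Window.partitionBy('ID')
           .orderBy('Date')
           .rowsBetween(Window.unboundedPreceding, Window.currentRow))

filled_df = (joined_df
             .withColumn('Val', last('Val', ignorenulls=True).over(w))
             .withColumn('f_val', last('f_val', ignorenulls=True).over(w)))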
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution   | Source        |
|------------|---------------|
| Solution 1 | supratim saha |