Division Operation of Column Values in Spark DataFrame Based on Conditions

I have a DataFrame with the schema shown below. There are around 300 signals, so the DataFrame has 300+ columns.

+---+------------+---------+--------+---------+--------+...........................+---------+--------+
| id|readout_date| signal01|signal02| signal03|signal04| ......................... |signalX  |signalY |
+---+------------+---------+--------+---------+--------+...........................+---------+--------+

I need to divide some of these columns by others to derive new columns.

Since around 50+ new columns need to be derived, I have listed the new column name, the dividend column and the divisor column for each one in a CSV file, as shown below.

new_col_name,dividend_col,divisor_col
new_col01,signal01,signalX
new_col02,signal02,signalX
new_col03,signal03,signalY
new_col04,signal04,signalY
...
new_col50,signal50,signalY
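Since this mapping file is small, it could also be parsed on the driver without going through a DataFrame. The following is only a minimal plain-Scala sketch, assuming a header row followed by unquoted `new_col_name,dividend_col,divisor_col` lines; `Formula` and `parseFormulas` are hypothetical names, not part of the original code:

```scala
// Hypothetical type: one derived column = dividend / divisor.
final case class Formula(newCol: String, dividend: String, divisor: String)

// Parses the formula CSV: skips the header row and blank lines.
// Assumes plain comma-separated values with no quoted fields.
def parseFormulas(lines: Seq[String]): List[Formula] =
  lines
    .drop(1)
    .map(_.trim)
    .filter(_.nonEmpty)
    .map { line =>
      line.split(",").map(_.trim) match {
        case Array(newCol, dividend, divisor) => Formula(newCol, dividend, divisor)
        case _ => throw new IllegalArgumentException(s"Malformed formula line: $line")
      }
    }
    .toList
```

If the file is on the driver's local filesystem, the lines could come from `scala.io.Source.fromFile(...).getLines().toList`; for a file on HDFS or similar storage, reading it with `spark.read` is simpler.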

I am using the code below to perform the division, and it works fine.

    import org.apache.spark.sql.functions.col

    // Each CSV row: (new_col_name, dividend_col, divisor_col)
    val csvDF = spark.read.option("header", "true").csv(DerivedSignalCsv)
    val formulaList: List[(String, String, String)] = csvDF.collect()
      .map(row => (row.getString(0), row.getString(1), row.getString(2)))
      .toList

    // Add one derived column per formula: new_col = dividend / divisor
    val finalSignalDf = formulaList.foldLeft(signalDf) {
      case (tempdf, (newCol, dividend, divisor)) =>
        tempdf.withColumn(newCol, col(dividend) / col(divisor))
    }

The output for one of the derived columns looks like this:

+---+------------+---------+-------+------------------+
| id|readout_date| dividend|divisor|           new_col|
+---+------------+---------+-------+------------------+
|454|  2020-01-09|      0.0|    0.0|              null|
|454|  2020-01-10|  45177.0|   55.0|             821.4|
|454|  2020-01-27|  33613.0|   39.0| 861.8717948717949|
|454|  2020-01-28|  27514.0|   32.0|          859.8125|
|454|  2020-02-11|     null|  116.0|              null|
|454|  2020-02-12|     null|   42.0|              null|
|454|  2020-02-13|     null|   32.0|              null|
|454|  2020-02-14|     null|   35.0|              null|
|454|  2020-02-15| 656256.0|   53.0|12382.188679245282|
|454|  2020-02-17|  13354.0|   16.0|           834.625|
|454|  2020-02-18|  69069.0|   85.0| 812.5764705882353|
|454|  2020-04-02|      0.0|    0.0|              null|
|454|  2020-04-03|  73831.0|   87.0| 848.6321839080459|
|454|  2020-04-04|  52910.0|   69.0| 766.8115942028985|
|454|  2020-07-02|     null|  105.0|              null|
|454|  2020-07-04| 117870.0|   40.0|           2946.75|
|454|  2020-07-08|     null|  173.0|              null|
|454|  2020-07-10| 284248.0|  198.0|1435.5959595959596|
|454|  2020-07-11|  93828.0|  127.0| 738.8031496062993|
|454|  2020-07-27|  12532.0|   16.0|            783.25|
|454|  2020-07-31|     null|  130.0|              null|
|454|  2020-08-03| 141846.0|   53.0|2676.3396226415093|
|454|  2020-08-10| 306715.0|  390.0| 786.4487179487179|
|454|  2020-11-04|     null|  134.0|              null|
|454|  2020-11-07|     null|   79.0|              null|
|454|  2020-11-09|     null|   92.0|              null|
|454|  2020-11-10|     null|   26.0|              null|
|454|  2020-11-11|     null|   75.0|              null|
|454|  2020-11-21|3407859.0|  163.0|20907.110429447854|
|454|  2020-11-23|  41897.0|   55.0| 761.7636363636364|
|454|  2022-01-27| 353874.0|  447.0| 791.6644295302013|
+---+------------+---------+-------+------------------+
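The `null` entries follow Spark's division semantics: `/` returns `null` when either operand is `null`, and, when ANSI mode is off (`spark.sql.ansi.enabled=false`), also when the divisor is zero, which is what the `0.0 / 0.0` rows show (with ANSI mode on, division by zero raises an error instead). A small plain-Scala model of the non-ANSI rule, using `Option[Double]` for a nullable value; `sparkDiv` is a hypothetical helper, not a Spark API:

```scala
// Models Spark's non-ANSI `/` on doubles:
// a null operand gives null, and a zero divisor gives null.
def sparkDiv(dividend: Option[Double], divisor: Option[Double]): Option[Double] =
  for {
    a <- dividend
    b <- divisor
    if b != 0.0
  } yield a / b
```

For example, `sparkDiv(Some(45177.0), Some(55.0))` gives `Some(821.4)`, matching the 2020-01-10 row, while the 2020-01-09 and 2020-02-11 rows give `None`.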

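Looking at the result, the rows that directly follow a null dividend or divisor have unexpectedly large values, for example 2020-02-15 (12382.19), 2020-07-04 (2946.75), 2020-07-10 (1435.60), 2020-08-03 (2676.34) and 2020-11-21 (20907.11), whereas most other rows are in the 700s and 800s.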

To bring these values back into the normal range, I want to apply the following rule: if the previous dividend is null, the divisor should be the sum of the current divisor and the divisors of all immediately preceding rows whose dividend is null.

For example:

For the date 2020-02-15:  new_col = 656256 / (116 + 42 + 32 + 35 + 53)
For the date 2020-07-04:  new_col = 117870 / (105 + 40)

If only one null value precedes the record (for example, the dates 2020-07-04 and 2020-08-03 in the table above), the following update to the code works:

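    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, lag, when}

    val w2 = Window.partitionBy("id").orderBy("readout_date")
    val finalSignalDiffDf = formulaList.foldLeft(signalDiffDf) {
      case (tempdf, (newCol, dividend, divisor)) => tempdf.withColumn(newCol,
        // Previous dividend is null: add the previous divisor to the current divisor
        when(lag(col(dividend), 1).over(w2).isNull, col(dividend) / (col(divisor) + lag(col(divisor), 1).over(w2)))
        // Previous divisor is null: add the previous dividend to the current dividend
        .when(lag(col(divisor), 1).over(w2).isNull, (col(dividend) + lag(col(dividend), 1).over(w2)) / col(divisor))
        .otherwise(col(dividend) / col(divisor)))
    }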

However, this does not work for records preceded by multiple null records (for example, the dates 2020-02-15 and 2020-11-21 in the table above), because `lag(..., 1)` only looks one row back.
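The target rule can first be pinned down on plain Scala collections, independent of Spark. The sketch below is an interpretation rather than the author's code: it treats every row with a null dividend or a null divisor as belonging to the next row where both values are present, and divides the summed dividends by the summed divisors of that group. The null-divisor half mirrors the second `when` branch above and is an assumption about the intended behaviour. `Reading` and `adjustedRatios` are hypothetical names, and the rows are assumed to belong to one `id` and to be sorted by `readout_date`:

```scala
// Hypothetical row type for a single id, sorted by readout_date.
final case class Reading(date: String, dividend: Option[Double], divisor: Option[Double])

// Carries dividends/divisors of incomplete rows forward into the next
// complete row (both values present) and returns one ratio per row.
def adjustedRatios(rows: Seq[Reading]): List[(String, Option[Double])] = {
  // State: (carried dividend, carried divisor, results in reverse order)
  val start = (0.0, 0.0, List.empty[(String, Option[Double])])
  val (_, _, out) = rows.foldLeft(start) { case ((carryDividend, carryDivisor, acc), r) =>
    (r.dividend, r.divisor) match {
      case (Some(a), Some(b)) =>
        val num = a + carryDividend
        val den = b + carryDivisor
        // Like Spark with ANSI mode off, a zero divisor gives null (None).
        val ratio: Option[Double] = if (den == 0.0) None else Some(num / den)
        (0.0, 0.0, (r.date, ratio) :: acc)
      case (a, b) =>
        // Incomplete row: no ratio, carry whatever value is present.
        (carryDividend + a.getOrElse(0.0), carryDivisor + b.getOrElse(0.0), (r.date, None) :: acc)
    }
  }
  out.reverse
}
```

On the sample rows this yields 656256 / 278 for 2020-02-15 and 117870 / 145 for 2020-07-04, matching the two example formulas, and it handles any number of preceding nulls because the carry keeps accumulating until a complete row appears.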

How can I implement this logic when multiple null values precede a record? Any leads appreciated!
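One possible approach, sketched here under stated assumptions rather than as a tested answer, replaces the chained `lag` calls with a running-count group id: for each row, count the complete rows (dividend and divisor both non-null) strictly before it within the same `id`. A run of incomplete rows then shares its group id with the complete row that follows it, so summing the dividend and divisor over each group applies the rule for any number of preceding nulls. It uses only standard `Window` and `org.apache.spark.sql.functions` calls; `addRatioColumns` and the `__grp_*` temporary column are hypothetical names, and the signal columns are assumed to be numeric:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, lit, sum, when}

// Adds one ratio column per (newCol, dividendCol, divisorCol) formula.
// Rows where the dividend or divisor is null are folded into the next row
// (per id, by readout_date) where both are present.
def addRatioColumns(df: DataFrame, formulas: Seq[(String, String, String)]): DataFrame = {
  val byDate = Window.partitionBy("id").orderBy("readout_date")
  // Frame covering every earlier row of the same id, excluding the current row.
  val earlierRows = byDate.rowsBetween(Window.unboundedPreceding, -1)

  formulas.foldLeft(df) { case (acc, (newCol, dividendCol, divisorCol)) =>
    val complete = col(dividendCol).isNotNull && col(divisorCol).isNotNull
    val grpCol = s"__grp_$newCol" // hypothetical temporary column name

    // Group id = number of complete rows before this one. A run of incomplete
    // rows therefore shares its id with the complete row that follows it.
    val withGrp = acc.withColumn(
      grpCol,
      coalesce(sum(when(complete, 1).otherwise(0)).over(earlierRows), lit(0L)))

    // No ordering on this window, so the frame is the whole group.
    val group = Window.partitionBy(col("id"), col(grpCol))
    val num = sum(col(dividendCol)).over(group) // sum ignores nulls
    val den = sum(col(divisorCol)).over(group)

    withGrp
      // Only complete rows get a value; a zero divisor sum becomes null
      // instead of being divided by.
      .withColumn(newCol, when(complete, num / when(den =!= 0, den)))
      .drop(grpCol)
  }
}
```

It would be called as `addRatioColumns(signalDf, formulaList)`. Because which rows count as incomplete depends on each dividend/divisor pair, the group id is recomputed per formula, so with 50+ formulas each one adds its own window passes; checking the plan with `explain()` may be worthwhile.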



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
