Spark Scala percentile_approx with weights

How can I compute the 15th and 50th percentiles of the students column, weighting each value by the occ column, without using array_repeat and explode? My input DataFrame is huge, and the explosion blows out the memory.

My DF is:

name | occ | students
aaa  | 1   | 1
aaa  | 3   | 7
aaa  | 6   | 11
...

For example, if I treat students and occ as arrays, then to compute the 50th percentile of students weighted by occ I would normally compute it like this:

val students = Array(1,7,11)
val occ = Array(1,3,6)

Repeating each student value occ times gives:
val student_repeated = Array(1,7,7,7,11,11,11,11,11,11)

and student_50th is then the 50th percentile of student_repeated => 11.
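For reference, the repeat-then-rank logic above can be expressed without ever materializing the repeated array: walk the values in ascending order and stop once the cumulative weight reaches the target fraction. A minimal plain-Scala sketch of this nearest-rank definition (`weightedPercentile` is a name made up for illustration):

```scala
// Weighted percentile without building the repeated array:
// sort values ascending and return the first value whose
// cumulative weight share reaches the quantile q.
def weightedPercentile(values: Array[Int], weights: Array[Int], q: Double): Int = {
  val sorted = values.zip(weights).sortBy(_._1)
  val total = weights.sum.toDouble
  var cum = 0.0
  sorted.find { case (_, w) => cum += w; cum / total >= q }.map(_._1).get
}

val students = Array(1, 7, 11)
val occ = Array(1, 3, 6)
weightedPercentile(students, occ, 0.5)  // 11
weightedPercentile(students, occ, 0.15) // 7
```

This reproduces the hand-computed example: cumulative shares are 0.1, 0.4, 1.0, so the 50th percentile is 11 and the 15th is 7.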

My current code:

import spark.implicits._
import org.apache.spark.sql.functions._

val inputDF = Seq(
  ("aaa", 1, 1),
  ("aaa", 3, 7),
  ("aaa", 6, 11)
).toDF("name", "occ", "student")

// Solution 1
inputDF
  .withColumn("student", array_repeat(col("student"), col("occ")))
  .withColumn("student", explode(col("student")))
  .groupBy("name")
  .agg(
    percentile_approx(col("student"), lit(0.5), lit(10000)).alias("student_50"),
    percentile_approx(col("student"), lit(0.15), lit(10000)).alias("student_15")
  )
  .show(false)

which outputs:

+----+----------+----------+
|name|student_50|student_15|
+----+----------+----------+
|aaa |11        |7         |
+----+----------+----------+

EDIT: I am looking for scala equivalent solution: https://stackoverflow.com/a/58309977/4450090
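For what it's worth, the approach from that answer translates to Scala with window functions: take the running sum of occ per name over students in ascending order, divide by the per-name total, and pick the smallest student whose cumulative share reaches the quantile. A hedged sketch against the example DataFrame (exact, nearest-rank semantics, not an approximation):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

val inputDF = Seq(("aaa", 1, 1), ("aaa", 3, 7), ("aaa", 6, 11))
  .toDF("name", "occ", "student")

// running sum of occ per name, values ordered ascending
val ordered = Window.partitionBy("name").orderBy("student")
// total occ per name (whole-partition frame, no ordering)
val whole = Window.partitionBy("name")

val withShare = inputDF.withColumn(
  "cum_share", sum("occ").over(ordered) / sum("occ").over(whole))

// smallest student whose cumulative weight share reaches quantile q
def weightedPct(q: Double): Double = withShare
  .filter(col("cum_share") >= q)
  .groupBy("name")
  .agg(min("student").alias("pct"))
  .collect().head.getAs[Int]("pct").toDouble

weightedPct(0.5)  // 11.0
weightedPct(0.15) // 7.0
```

No row is ever duplicated, so memory stays proportional to the input; the trade-off is a shuffle plus a sort per window.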

EDIT2: I am proceeding with sketches-java https://github.com/DataDog/sketches-java



Solution 1:

I have decided to use DDSketch, which has an accept method that allows the sketch to be updated incrementally.

"com.datadoghq" % "sketches-java" % "0.8.2"

First, I initialize an empty sketch. Then I accept (value, weight) pairs. After all values have been accepted, I call the DDSketch method getValueAtQuantile.

I execute all of this as a Spark Scala Aggregator.
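Concretely, the accept-with-weight API looks like this (a small sketch assuming sketches-java 0.8.x on the classpath; the 0.01 relative accuracy and the values are taken from the question's example):

```scala
import com.datadoghq.sketch.ddsketch.{DDSketch, DDSketches}

// dense sketch with 1% relative accuracy (chosen here for illustration)
val sketch: DDSketch = DDSketches.unboundedDense(0.01)

// accept(value, count): adds `value` with weight `count`,
// replacing the array_repeat + explode step from the question
sketch.accept(1.0, 1.0)
sketch.accept(7.0, 3.0)
sketch.accept(11.0, 6.0)

val p50 = sketch.getValueAtQuantile(0.5) // close to 11, within relative accuracy
```

Memory stays bounded by the number of distinct buckets, not by the total weight, which is what makes this viable for a huge input.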

import com.datadoghq.sketch.ddsketch.{DDSketch, DDSketches}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

// input record: one value with its weight (assumed field order)
case class ValueWithWeigth(value: Double, weight: Double)

class DDSInitAgg(pct: Double, accuracy: Double) extends Aggregator[ValueWithWeigth, SketchData, Double] {
  private val precision: String = "%.6f"

  // empty sketch, serialized into the buffer representation
  override def zero: SketchData = DDSUtils.sketchToTuple(DDSketches.unboundedDense(accuracy))

  // add one (value, weight) pair to the sketch
  override def reduce(b: SketchData, a: ValueWithWeigth): SketchData = {
    val s = DDSUtils.sketchFromTuple(b)
    s.accept(a.value, a.weight)
    DDSUtils.sketchToTuple(s)
  }

  // combine two partial sketches from different partitions
  override def merge(b1: SketchData, b2: SketchData): SketchData = {
    val s1: DDSketch = DDSUtils.sketchFromTuple(b1)
    val s2: DDSketch = DDSUtils.sketchFromTuple(b2)
    s1.mergeWith(s2)
    DDSUtils.sketchToTuple(s1)
  }

  // extract the requested quantile, rounded to six decimal places
  override def finish(reduction: SketchData): Double = {
    val percentile: Double = DDSUtils.sketchFromTuple(reduction).getValueAtQuantile(pct)
    precision.format(percentile).toDouble
  }

  override def bufferEncoder: Encoder[SketchData] = ExpressionEncoder()
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

You can execute it as a udaf taking two columns as input. Additionally, I developed methods for encoding/decoding back and forth between DDSketch and Array[Byte]:

import java.util.function.Supplier
import com.datadoghq.sketch.ddsketch.{DDSketch, DDSketches}
import com.datadoghq.sketch.ddsketch.encoding.{ByteArrayInput, GrowingByteArrayOutput}
import com.datadoghq.sketch.ddsketch.store.{Store, UnboundedSizeDenseStore}

case class SketchData(backingArray: Array[Byte], numWrittenBytes: Int)

object DDSUtils {
  val emptySketch: DDSketch = DDSketches.unboundedDense(0.01)

  val supplierStore: Supplier[Store] = () => new UnboundedSizeDenseStore()

  // serialize a sketch into a byte-array-backed case class
  def sketchToTuple(s: DDSketch): SketchData = {
    val o = GrowingByteArrayOutput.withDefaultInitialCapacity()
    s.encode(o, false) // false: do not omit the index mapping
    SketchData(o.backingArray(), o.numWrittenBytes())
  }

  // rebuild the sketch from the serialized bytes
  def sketchFromTuple(sketchData: SketchData): DDSketch = {
    val i: ByteArrayInput = ByteArrayInput.wrap(sketchData.backingArray, 0, sketchData.numWrittenBytes)
    DDSketch.decode(i, supplierStore)
  }
}
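As a quick sanity check of these helpers (assuming SketchData and DDSUtils above are compiled together with sketches-java), a sketch should survive the encode/decode round trip:

```scala
import com.datadoghq.sketch.ddsketch.DDSketches

val original = DDSketches.unboundedDense(0.01)
original.accept(7.0, 3.0)
original.accept(11.0, 6.0)

// serialize to bytes and back via the helpers above
val restored = DDSUtils.sketchFromTuple(DDSUtils.sketchToTuple(original))

// the decoded sketch answers quantile queries like the original
val diff = math.abs(restored.getValueAtQuantile(0.5) - original.getValueAtQuantile(0.5))
```

This is exactly what happens on every reduce/merge step of the Aggregator, so a cheap round trip like this catches serialization mistakes early.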

This is how I create it as a udaf:

  val ddsInitAgg50UDAF: UserDefinedFunction = udaf(new DDSInitAgg(0.50, 0.50), ExpressionEncoder[ValueWithWeigth]())

and finally, in the aggregation:

ddsInitAgg50UDAF(col("weigthCol"), col("valueCol")).alias("value_pct_50")

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1: Dariusz Krynicki