Count multiple aggregates in a sliding window in Spark Structured Streaming

I have a streaming source that sends events where every record consists of 3 fields (CreatedTime, FP, Detected). Here, 'FP' stands for false positive. The 'FP' and 'Detected' fields can each have the value 1 or 0. I want to calculate the following values over a sliding window: FPR1 = Count(FP) / Count(Detected) and FPR2 = Count(FP) / Count(Total records in window).

I am able to aggregate Count(FP) using the following query. I want to compute the other two aggregates as well, i.e. DetectedCount and TotalCount, then calculate FPR1 and FPR2 and write the result to a file sink. How do I do this? Thanks in advance.

val aggDF = finaldata
  .withWatermark("CreatedTime", "2 minute")
  .groupBy(col("FP"),
    window(col("CreatedTime"), "5 minute", "1 minute"))
  .agg(sum("FP").alias("FPCount"))


Solution 1:[1]

Figured it out finally. I was using groupBy incorrectly: grouping by FP as well as the window splits each window into a separate group per FP value, so the aggregates have to be computed over the window alone. Here is the final query.

val aggDF = finaldata
  .withWatermark("CreatedTime", "2 minute")
  .groupBy(window(col("CreatedTime"), "5 minute", "1 minute"))
  .agg(
    sum("FP").alias("FPCount"),
    sum("Detected").alias("DetectedCount"),
    count(lit(1)).alias("TotalCount"))
  .withColumn("FPR1", col("FPCount") / col("DetectedCount"))
  .withColumn("FPR2", col("FPCount") / col("TotalCount"))
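To cover the "write to a file sink" part of the question, a minimal sketch follows; the output path and checkpoint location are placeholders, not values from the original post. Note that the file sink only supports append output mode, so with a watermark each window's row is written only after the watermark passes the end of that window.

```scala
// Sketch: write aggDF to a Parquet file sink.
// "/output/fpr" and "/checkpoints/fpr" are placeholder paths.
val query = aggDF
  .writeStream
  .format("parquet")                                // file sink; "csv", "json", "orc" also work
  .option("path", "/output/fpr")                    // placeholder output directory
  .option("checkpointLocation", "/checkpoints/fpr") // required for the file sink
  .outputMode("append")                             // the only mode the file sink supports
  .start()

query.awaitTermination()
```

With a 2-minute watermark and 5-minute windows, expect each window's single finalized row to appear in the output directory roughly 2 minutes after the window closes, once a later batch advances the watermark.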

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Arun Satyarth