Count the duration between two events in Scala (streaming)

I have a list of events and I need to compute the time elapsed between two consecutive events. The previous event may fall in a different window. How can I do this? I am using Scala with Spark streaming (mini-batches). I wrote the following, but I get this error: "Non-time-based windows are not supported on streaming DataFrames/Datasets;"

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val ttt = tester
  .withWatermark("time", "1 minutes")
  .groupBy(
    window($"time", "1 minutes", "10 seconds"),
    $"nodeId"
  )
  .count()
  .withColumn("WindowSize",
    (col("window.end").cast("Long") - col("window.start").cast("Long")) / 60)
  .withColumn("prev_time", lead(col("window.start"), 1)
    .over(Window.partitionBy("nodeId").orderBy("window.start")))
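
The error comes from the `lead(...).over(Window.partitionBy(...).orderBy(...))` call: that is a row-based window function, and streaming DataFrames only accept time-based `window(...)` grouping. One possible workaround is to keep the last event time per `nodeId` in state with `flatMapGroupsWithState`, so the previous event can come from an earlier mini-batch. The following is a minimal sketch, not a definitive solution. The `Event` and `Gap` case classes, the `EventGaps` object, and the assumption that `nodeId` is a `String` are all hypothetical, since the question only shows the column names `time` and `nodeId`.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical shapes: the question only shows the columns `time` and `nodeId`.
case class Event(nodeId: String, time: Timestamp)
case class Gap(nodeId: String, prevTime: Timestamp, time: Timestamp, seconds: Long)

object EventGaps {
  // Pure core: pair each timestamp (epoch millis) with its predecessor.
  // `carried` is the last timestamp seen in earlier batches, if any.
  // Timestamps are sorted within the batch only; an event that arrives
  // later than `carried` but is older than it would yield a negative gap.
  def pairs(carried: Option[Long], times: Seq[Long]): Seq[(Long, Long)] = {
    val all = carried.toList ++ times.sorted
    all.zip(all.drop(1))
  }

  // Called once per nodeId and mini-batch; `state` survives across batches.
  def durations(nodeId: String,
                events: Iterator[Event],
                state: GroupState[Long]): Iterator[Gap] = {
    val times = events.map(_.time.getTime).toList
    val result = pairs(state.getOption, times)
    // Remember the newest timestamp of this batch for the next batch.
    if (times.nonEmpty) state.update(times.max)
    result.iterator.map { case (prev, cur) =>
      Gap(nodeId, new Timestamp(prev), new Timestamp(cur), (cur - prev) / 1000)
    }
  }

  // Wires the stateful function into a streaming Dataset.
  def withGaps(events: Dataset[Event]): Dataset[Gap] = {
    val spark = events.sparkSession
    import spark.implicits._
    events
      .groupByKey(_.nodeId)
      .flatMapGroupsWithState[Long, Gap](
        OutputMode.Append(), GroupStateTimeout.NoTimeout())(durations _)
  }
}
```

Assuming `tester` has matching column types, it could be passed in as `EventGaps.withGaps(tester.select($"nodeId", $"time").as[Event])`. With `NoTimeout`, the state for each `nodeId` is kept indefinitely, so a timeout may be worth adding if the set of nodes is unbounded.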


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
