Spark Window function using more than one column

I have this dataframe that shows the send time and the open time for each user:

val df = Seq(("user1", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
             ("user1", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
             ("user1", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
             ("user1", "2018-04-05 18:00:00", "2018-04-05 18:50:00"),
             ("user2", "2018-04-05 15:00:00", "2018-04-05 15:50:00"),
             ("user2", "2018-04-05 16:00:00", "2018-04-05 16:50:00"),
             ("user2", "2018-04-05 17:00:00", "2018-04-05 17:50:00"),
             ("user2", "2018-04-05 17:30:00", "2018-04-05 17:40:00"),             
             ("user2", "2018-04-05 18:00:00", null),
             ("user2", "2018-04-05 19:00:00", null)              
            ).toDF("id", "sendTime", "openTime")

+-----+-------------------+-------------------+
|   id|           sendTime|           openTime|
+-----+-------------------+-------------------+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|
|user2|2018-04-05 18:00:00|               null|
|user2|2018-04-05 19:00:00|               null|
+-----+-------------------+-------------------+

Now I want to count, for each send, the number of opens that occurred in the two hours before that send time, per user. I used a window function partitioned by user, but I couldn't figure out how to compare values in the sendTime column against the openTime column. The result dataframe should look like this:

+-----+-------------------+-------------------+-----+
|   id|           sendTime|           openTime|count|
+-----+-------------------+-------------------+-----+
|user1|2018-04-05 15:00:00|2018-04-05 15:50:00|    0|
|user1|2018-04-05 16:00:00|2018-04-05 16:50:00|    1|
|user1|2018-04-05 17:00:00|2018-04-05 17:50:00|    2|
|user1|2018-04-05 18:00:00|2018-04-05 18:50:00|    2|
|user2|2018-04-05 15:00:00|2018-04-05 15:50:00|    0|
|user2|2018-04-05 16:00:00|2018-04-05 16:50:00|    1|
|user2|2018-04-05 17:00:00|2018-04-05 17:50:00|    2|
|user2|2018-04-05 17:30:00|2018-04-05 17:40:00|    2|
|user2|2018-04-05 18:00:00|               null|    3|
|user2|2018-04-05 19:00:00|               null|    2|
+-----+-------------------+-------------------+-----+

This is as far as I have got, but it doesn't give me what I need:

import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.expressions.Window

var df2 = df
  .withColumn("sendUnix", F.unix_timestamp($"sendTime"))
  .withColumn("openUnix", F.unix_timestamp($"openTime"))
val w = Window.partitionBy($"id").orderBy($"sendUnix").rangeBetween(-2 * 60 * 60, 0)
df2 = df2.withColumn("count", F.count($"openUnix").over(w))


Solution 1:[1]

The idea: use a range window over the send timestamps to collect the candidate open timestamps, explode them into one row per (send, open) pair, and then count only the opens that fall within the two hours before each send.

val df1 = df
  .withColumn("sendTimeStamp", unix_timestamp(col("sendTime")))
  .withColumn("openTimeStamp", unix_timestamp(col("openTime")))

// Collect the open timestamps of all sends in the two hours up to this send
// (collect_list skips nulls).
val w = Window.partitionBy('id).orderBy('sendTimeStamp).rangeBetween(-7200, 0)
val df2 = df1.withColumn("list", collect_list('openTimeStamp).over(w))

// One row per (send, candidate open) pair. Note that explode drops rows with
// an empty list; use explode_outer if a send can have no candidates at all.
val df3 = df2.select('*, explode('list).as("prevTimeStamp"))

// Keep only the opens that actually happened within the two hours before the send.
df3.groupBy('id, 'sendTime)
  .agg(max('openTime).as("openTime"),
       sum(when(col("sendTimeStamp").minus(col("prevTimeStamp")).between(0, 7200), 1)
         .otherwise(0)).as("count"))
  .show
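If collecting and exploding lists feels heavy, the same counts can also be obtained with a non-equi self-join. A minimal sketch (the boundary handling, `openTs >= sendTs - 7200 && openTs < sendTs` here, is a judgment call and should be adjusted to match your definition of "past two hours"):

```scala
import org.apache.spark.sql.functions._

// One row per send, one row per non-null open.
val sends = df.select($"id", $"sendTime", unix_timestamp($"sendTime").as("sendTs"))
val opens = df.select($"id".as("oid"), unix_timestamp($"openTime").as("openTs"))
  .where($"openTs".isNotNull)

// A left join keeps sends with zero matching opens; count() ignores nulls.
val counted = sends
  .join(opens, $"id" === $"oid" &&
               $"openTs" >= $"sendTs" - 7200 && $"openTs" < $"sendTs", "left")
  .groupBy($"id", $"sendTime")
  .agg(count($"openTs").as("count"))
  .orderBy($"id", $"sendTime")
```

Unlike the window approach, the join also catches an open that falls within two hours of a send even when its own send was much earlier; the trade-off is that range joins like this cannot use a hash join and can be expensive on large data.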

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: General Grievance