Group by overlapping date ranges using PySpark

I am trying to group users who have used the same IP in overlapping date ranges.

This will let me know if two users share the same house because they have the same IP at the same time.

I've been trying to implement this, but I can't find a way to do it with PySpark SQL. In fact, I suspect it can't be done with PySpark alone and probably requires a graph-oriented library.

The problem is the following:

| ip          | user       | start_date | end_date   |
| ----------- | ---------- | ---------- | ---------- |
| 192.168.1.1 | a          | 2022-01-01 | 2022-01-03 |
| 192.168.1.1 | a          | 2022-01-05 | 2022-01-07 |
| 192.168.1.1 | b          | 2022-01-06 | 2022-01-09 |
| 192.168.1.1 | c          | 2022-01-08 | 2022-01-11 |
| 192.168.1.2 | d          | 2022-01-08 | 2022-01-11 |
| 192.168.1.2 | e          | 2022-01-10 | 2022-01-11 |
| 192.168.1.2 | f          | 2022-01-16 | 2022-01-18 |

As we can see:

  • users a and b overlap in date range and share the same ip.
  • users b and c overlap in date range and share the same ip.
  • indirectly, users a and c are therefore in the same group.
  • users d and e overlap in date range and share the same ip.
  • user f does not overlap with any other user.

Expected output:

| ip          | users       | date_ranges |
| ----------- | ----------- | ----------- |
| 192.168.1.1 | {a, b, c}   | {2022-01-01 - 2022-01-03, 2022-01-05 - 2022-01-07, 2022-01-06 - 2022-01-09, 2022-01-08 - 2022-01-11} |
| 192.168.1.2 | {d, e}      | {2022-01-08 - 2022-01-11, 2022-01-10 - 2022-01-11} |
| 192.168.1.2 | {f}         | {2022-01-16 - 2022-01-18} |

Do you have any ideas on how to implement this?

I thought about using GraphFrames, but I don't even know where to start :S



Solution 1:[1]

One way to identify overlapping date intervals is to use a cumulative conditional sum over a window partitioned by ip and ordered by start_date. For each row, if its start_date is greater than the maximum end_date of the rows before it in the partition, then it doesn't overlap with any earlier interval (i.e. it starts a new group):

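from pyspark.sql import functions as F, Window

# df is the input DataFrame from the question: ip, user, start_date, end_date.
# Within each ip, rows are processed in order of start_date.
w = Window.partitionBy('ip').orderBy('start_date')

df1 = df.withColumn(
    # Running maximum of end_date up to the current row (default window frame).
    "previous_end", F.max("end_date").over(w)
).withColumn(
    "group",
    # Increment the group id whenever the running max at the previous row
    # is earlier than this row's start_date, i.e. there is a gap.
    F.sum(F.when(F.lag("previous_end").over(w) < F.col("start_date"), 1).otherwise(0)).over(w)
).groupBy("ip", "group").agg(
    # Collect every session (user + date range) that falls in the same group.
    F.collect_list(
        F.struct("user", F.struct("start_date", "end_date").alias("date_ranges"))
    ).alias("sessions")
).select(
    "ip", "sessions.user", "sessions.date_ranges"
)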

df1.show(truncate=False)
#+-----------+---------+------------------------------------------------------------------------------+
#|ip         |user     |date_ranges                                                                   |
#+-----------+---------+------------------------------------------------------------------------------+
#|192.168.1.1|[a]      |[{2022-01-01, 2022-01-03}]                                                    |
#|192.168.1.1|[a, b, c]|[{2022-01-05, 2022-01-07}, {2022-01-06, 2022-01-09}, {2022-01-08, 2022-01-11}]|
#|192.168.1.2|[d, e]   |[{2022-01-08, 2022-01-11}, {2022-01-10, 2022-01-11}]                          |
#|192.168.1.2|[f]      |[{2022-01-16, 2022-01-18}]                                                    |
#+-----------+---------+------------------------------------------------------------------------------+
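
To make the grouping rule easier to follow, here is a minimal plain-Python sketch of the same idea, run on the sample rows from the question without Spark. The helper names `assign_groups` and `collect_users` are hypothetical and only for illustration. It assumes dates are ISO-formatted strings (so string comparison matches date order) and that end_date is never earlier than start_date.

```python
from itertools import groupby
from operator import itemgetter

# Sample rows from the question: (ip, user, start_date, end_date).
rows = [
    ("192.168.1.1", "a", "2022-01-01", "2022-01-03"),
    ("192.168.1.1", "a", "2022-01-05", "2022-01-07"),
    ("192.168.1.1", "b", "2022-01-06", "2022-01-09"),
    ("192.168.1.1", "c", "2022-01-08", "2022-01-11"),
    ("192.168.1.2", "d", "2022-01-08", "2022-01-11"),
    ("192.168.1.2", "e", "2022-01-10", "2022-01-11"),
    ("192.168.1.2", "f", "2022-01-16", "2022-01-18"),
]


def assign_groups(rows):
    """Tag each row with a group id that increases at every gap within an ip."""
    out = []
    # Equivalent of partitionBy('ip').orderBy('start_date').
    ordered = sorted(rows, key=lambda r: (r[0], r[2]))
    for ip, partition in groupby(ordered, key=itemgetter(0)):
        group = 0
        max_end_so_far = None  # max(end_date) over the rows seen before this one
        for _, user, start, end in partition:
            # A gap: every earlier interval ended before this one starts.
            if max_end_so_far is not None and max_end_so_far < start:
                group += 1
            out.append((ip, group, user, start, end))
            max_end_so_far = end if max_end_so_far is None else max(max_end_so_far, end)
    return out


def collect_users(assigned):
    """Collect the users of each (ip, group) pair, like groupBy + collect_list."""
    groups = {}
    for ip, group, user, _start, _end in assigned:
        groups.setdefault((ip, group), []).append(user)
    return groups


result = collect_users(assign_groups(rows))
for key, users in sorted(result.items()):
    print(key, users)
```

As in the Spark output above, the first session of user a (2022-01-01 to 2022-01-03) ends up in its own group, because it does not overlap with any other interval on that ip.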

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 blackbishop