How to ensure outer-join NULL results are output in Spark Structured Streaming if future events are delayed

Consider a Spark stream-stream outer join:

val left = spark.readStream.format("delta").load("...")
  .withWatermark("enqueuedTime", "1 hour")

val right = spark.readStream.format("delta").load("...")
  .withWatermark("enqueuedTime", "1 hour")


val res = left.as("left").join(
  right.as("right"),
  expr("left.key = right.key AND (left.enqueuedTime BETWEEN right.enqueuedTime - INTERVAL 1 hour AND right.enqueuedTime + INTERVAL 1 hour)"),
  "left_outer")

res.writeStream....

And given this data in the left and right streams:

(screenshot: sample rows in the left table)

(screenshot: sample rows in the right table)

How can I ensure that a record like:

2, left_value1, 2022-04-18T12:39:49.370+0000, NULL, NULL, NULL

is emitted after a given period of time, even if new events aren't flowing through the stream?

I'm only able to get it if new events arrive in both tables, for example:

INSERT into left_df  VALUES ("004", "left_df_value", current_timestamp() + INTERVAL 5 hours);
INSERT into right_df VALUES ("004", "right_df_value", current_timestamp() + INTERVAL 5 hours);

With those inserts, Spark advances the watermarks on both sides and understands that it is now safe to emit the nullable record. But how can I still emit it after some kind of timeout, without new records arriving in both streams?
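One commonly suggested workaround (not part of the original question; a sketch under assumptions) is to union a low-volume "heartbeat" stream, e.g. Spark's built-in "rate" source, into each input. Its rows carry the current time as event time, so the watermark keeps advancing even when the real sources are idle. The column names (key, value, enqueuedTime) and the sentinel key "__heartbeat__" below are assumptions for illustration:

```scala
import org.apache.spark.sql.functions._

// Hypothetical heartbeat stream: one row per second whose timestamp
// acts as the event time. The sentinel key never matches a real join
// key, so heartbeat rows never produce actual join output; they only
// push the watermark forward.
val heartbeat = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load()
  .select(
    lit("__heartbeat__").as("key"),
    lit(null).cast("string").as("value"),
    col("timestamp").as("enqueuedTime"))

// Assumes both raw inputs have columns (key, value, enqueuedTime);
// the watermark is applied after the union so heartbeat rows count.
val leftWithHeartbeat = left.unionByName(heartbeat)
  .withWatermark("enqueuedTime", "1 hour")

val rightWithHeartbeat = right.unionByName(heartbeat)
  .withWatermark("enqueuedTime", "1 hour")
```

The join is then performed on leftWithHeartbeat and rightWithHeartbeat instead of the raw streams. The trade-off is extra dummy rows in the state store, which stay small as long as the heartbeat rate is low.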



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
