AggregateFunction with SessionWindow - understand how merge works
While implementing an AggregateFunction in Flink with EventTimeSessionWindows, I cannot work out when merge is called for a session window with a dynamic gap.
Code snippet:
SingleOutputStreamOperator<Tuple1<String>> aggregateData = parsedData
        .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(20)))
        .keyBy(new ZeusRawKeyByFunction())
        .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<ZeusEvent>() {
            @Override
            public long extract(ZeusEvent event) {
                // A closing event gets a very short gap (100 ms); every other event keeps the session open for 30 minutes.
                if (event.getEventTypeName().equals("PlaybackSessionClosed")) {
                    return 100;
                } else {
                    return Time.minutes(30).toMilliseconds();
                }
            }
        }))
        .allowedLateness(Time.minutes(10))
        .trigger(ContinuousEventTimeTrigger.of(Time.minutes(1)))
        .sideOutputLateData(lateEvents)
        .aggregate(new ZeusAggregateFunction())
        .setParallelism(parameterTool.getInt("zeus-aggregator-parallelism"))
        .name("Zeus Aggregator");
I have defined four functions in the aggregator:
createAccumulator: This creates a new accumulator
add: This will keep on adding all the new events in the 1 min trigger time to the accumulator
getResult: This will get the final row to write to sink for that trigger
merge: When does this work? Does the merge happen for every trigger?
I am trying to understand whether merge is called every minute when the trigger fires, with a new accumulator being created each time and merged into the previous one.
Solution 1:[1]
Consider an event e with timestamp e.t and a dynamic gap computed as gap(e).
As each event e arrives at the window operator, it is initially assigned to a new session window extending from e.t to e.t + gap(e). Then the window operator iterates over all of the sessions (independently for each key), and whenever two sessions overlap (in time), they are merged to form a new, longer session covering the union of the timespans of both sessions. This continues until no further merging is possible.
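The assign-then-merge step described above can be sketched in plain Java. This is only an illustration under stated assumptions, not Flink's actual implementation: the class SessionMergeSketch, its Window type, and the methods add, mergeCount and sessionCount are hypothetical names, the sketch tracks the sessions of a single key, and it treats windows that merely touch as overlapping.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of per-key session merging; not Flink's actual implementation. */
final class SessionMergeSketch {

    /** A session window from start to end, in event-time milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** True if the two windows overlap or touch (an assumption of this sketch). */
        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        /** Smallest window covering both this window and the other one. */
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    private final List<Window> sessions = new ArrayList<>(); // sessions of one key
    private int mergeCount = 0;

    /** Gives the event its own window [t, t + gap], then merges until nothing overlaps. */
    Window add(long timestamp, long gap) {
        Window current = new Window(timestamp, timestamp + gap);
        boolean mergedSomething = true;
        while (mergedSomething) {
            mergedSomething = false;
            for (int i = 0; i < sessions.size(); i++) {
                if (sessions.get(i).intersects(current)) {
                    // Absorb the existing session; each absorption counts as one merge.
                    current = current.cover(sessions.remove(i));
                    mergeCount++;
                    mergedSomething = true;
                    break;
                }
            }
        }
        sessions.add(current);
        return current;
    }

    int mergeCount() {
        return mergeCount;
    }

    int sessionCount() {
        return sessions.size();
    }

    public static void main(String[] args) {
        long thirtyMinutes = 30 * 60 * 1000L;
        SessionMergeSketch key = new SessionMergeSketch();
        key.add(0L, thirtyMinutes);                       // first session
        Window merged = key.add(600_000L, thirtyMinutes); // overlaps, so it merges
        key.add(10_000_000L, 100L);                       // far away, stays separate
        System.out.println(merged.start + ".." + merged.end
                + ", merges=" + key.mergeCount()
                + ", sessions=" + key.sessionCount());
    }
}
```

In this run the second event extends the first session to cover 0 through 2,400,000 ms with exactly one merge, while the third event starts a separate session. Merges are counted per arriving event; no trigger or timer appears anywhere in the sketch.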
As each merge occurs, the onMerge method of the trigger is called, as well as the merge method of the accumulator.
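To make the accumulator side concrete, here is a minimal sketch of what a merge implementation has to do when two sessions are combined. It makes several assumptions: the nested Aggregate interface is only a stand-in that mirrors the four methods of Flink's AggregateFunction so that the example compiles without Flink on the classpath, and CountingAggregate is a hypothetical aggregate that counts events, unrelated to the ZeusAggregateFunction in the question.

```java
/** Sketch of accumulator merging for two sessions that are combined into one. */
final class AccumulatorMergeSketch {

    /** Stand-in mirroring the four methods of Flink's AggregateFunction (assumption: no Flink dependency). */
    interface Aggregate<IN, ACC, OUT> {
        ACC createAccumulator();

        ACC add(IN value, ACC accumulator);

        OUT getResult(ACC accumulator);

        ACC merge(ACC a, ACC b);
    }

    /** Hypothetical aggregate: counts the events seen in a session. */
    static final class CountingAggregate implements Aggregate<String, long[], Long> {
        @Override
        public long[] createAccumulator() {
            return new long[] {0L};
        }

        @Override
        public long[] add(String value, long[] accumulator) {
            accumulator[0]++;
            return accumulator;
        }

        @Override
        public Long getResult(long[] accumulator) {
            return accumulator[0];
        }

        @Override
        public long[] merge(long[] a, long[] b) {
            // Combine the partial results of two sessions into one accumulator.
            a[0] += b[0];
            return a;
        }
    }

    /** Two sessions with 2 and 1 events are merged; the result reflects all events. */
    static long demo() {
        CountingAggregate agg = new CountingAggregate();

        long[] sessionA = agg.createAccumulator();
        agg.add("e1", sessionA);
        agg.add("e2", sessionA);

        long[] sessionB = agg.createAccumulator();
        agg.add("e3", sessionB);

        // Called when the two session windows are found to overlap.
        long[] merged = agg.merge(sessionA, sessionB);
        return agg.getResult(merged);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is that merge must produce the same accumulator that would have resulted from adding every event of both sessions to a single accumulator, since getResult is later called on the merged state.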
Then the onElement method of the trigger is called, passing in the event e. This will ensure that the appropriate timer is in place so that when the watermark passes the end of the session window (which includes the appropriate gap) the window will FIRE.
Thus the merging is done as each event is processed, and is not coupled to the timing of the continuous/periodic trigger.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | David Anderson |
