Flink 1.14 TumblingEventTimeWindows using a strange period of time
I'm seeing some strange behaviour with Flink windows.
I'm reading events from Kafka, mapping them to a common class LogKPI (with message, count and timestamp as fields), and then I want to aggregate the mapped events that have the same message into windows of 1 minute, allowing late events of up to 20 seconds.
Flink aggregates the events with the same message correctly, and the count field is incremented as it should. The problem is that the window Flink is using is not 1 minute long; the window (plus late events) it actually uses seems to be more than 13 hours long... I don't know exactly how much time it is using.
I don't know why Flink is not using the right window length.
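For reference, LogKPI is roughly a plain POJO like the following (a minimal sketch; the exact field types are not shown here, so count and timestamp are assumed to be long values, with timestamp holding epoch milliseconds):
public class LogKPI {
    private String message;
    private long count;        // assumed numeric type
    private long timestamp;    // assumed to be epoch milliseconds, used as the event time

    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
    public long getCount() { return count; }
    public void setCount(long count) { this.count = count; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "LogKPI{message=" + message + ", count=" + count + ", timestamp=" + timestamp + "}";
    }
}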
This is what I expected to have:
"timestamp":"08:03:51.092" --> creates new window1, count = 1
"timestamp":"08:03:51.094" --> add event to window1, count = 2
"timestamp":"08:03:51.096" --> add event to window1, count = 3
"timestamp":"08:03:51.099" --> add event to window1, count = 4
"timestamp":"08:05:51.103" --> New window2 count = 1; triggers get result from aggregator->Window1 count= 5
"timestamp":"08:03:52.099" --> Late event, discarded "timestamp":"22:03:51.103" --> New window 3 count =1; triggers get result from aggregator->Window2 count=1
This is what I have:
"timestamp":"08:03:51.092" --> creates new window1, count = 1
"timestamp":"08:03:51.094" --> add event to window1, count = 2
"timestamp":"08:03:51.096" --> add event to window1, count = 3
"timestamp":"08:03:51.099" --> add event to window1, count = 4
"timestamp":"08:05:51.103" --> does not trigger get result from aggregator. New window2 count = 1
"timestamp":"08:03:52.099" --> add event to window1, count = 5
"timestamp":"22:03:51.103" --> triggers get result from aggregator --> window1 count= 5, window2 count=1
This is the code I have:
Watermark:
WatermarkStrategy<LogKPI> watermarkStrategy = WatermarkStrategy
        .<LogKPI>forBoundedOutOfOrderness(Duration.ofSeconds(60))
        .withTimestampAssigner((logKpi, timestamp) -> logKpi.getTimestamp());
Window aggregation:
SingleOutputStreamOperator<String> logKPIs = logsMapped.assignTimestampsAndWatermarks(watermarkStrategy)
        .keyBy(LogKPI::getMessage)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .aggregate(new GenericLogAggregateFunction());
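Note that the 20 seconds of allowed lateness mentioned above is not actually configured anywhere in this code; only the 60-second bounded out-of-orderness in the watermark strategy is. If explicit lateness were set on the window, it would look roughly like this (a sketch, not part of my current pipeline):
SingleOutputStreamOperator<String> logKPIs = logsMapped
        .assignTimestampsAndWatermarks(watermarkStrategy)
        .keyBy(LogKPI::getMessage)
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .allowedLateness(Time.seconds(20))   // explicit 20 s of lateness (not in my current code)
        .aggregate(new GenericLogAggregateFunction());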
Aggregator:
import org.apache.flink.api.common.functions.AggregateFunction;

public class GenericLogAggregateFunction implements AggregateFunction<LogKPI, LogKPI, String> {

    @Override
    public LogKPI createAccumulator() {
        return new LogKPI();
    }

    @Override
    public LogKPI add(LogKPI inputMetrics, LogKPI accumulator) {
        accumulator.setMessage(inputMetrics.getMessage());
        // keep the timestamp if it is greater than the accumulated one
        if (inputMetrics.getTimestamp() > accumulator.getTimestamp()) {
            accumulator.setTimestamp(inputMetrics.getTimestamp());
        }
        accumulator.setCount(accumulator.getCount() + inputMetrics.getCount());
        return accumulator;
    }

    @Override
    public String getResult(LogKPI accumulator) {
        return accumulator.toString();
    }

    @Override
    public LogKPI merge(LogKPI acc1, LogKPI acc2) {
        acc1.setCount(acc1.getCount() + acc2.getCount());
        // keep the most recent (greatest) timestamp of the two accumulators
        if (acc2.getTimestamp() > acc1.getTimestamp()) {
            acc1.setTimestamp(acc2.getTimestamp());
        }
        return acc1;
    }
}
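To confirm the aggregator itself behaves as intended, it can also be exercised outside Flink (a small sketch assuming the LogKPI fields above; "error-x" is just a made-up message value):
public class AggregatorSanityCheck {
    public static void main(String[] args) {
        GenericLogAggregateFunction agg = new GenericLogAggregateFunction();
        LogKPI acc = agg.createAccumulator();

        LogKPI event = new LogKPI();
        event.setMessage("error-x");                 // made-up message value
        event.setCount(1);
        event.setTimestamp(System.currentTimeMillis());

        acc = agg.add(event, acc);
        acc = agg.add(event, acc);

        // expected: count = 2 and the most recent timestamp seen
        System.out.println(agg.getResult(acc));
    }
}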
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow