Table API's OVER aggregation delays firing the trigger
Below is the code snippet:
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// inputStream, tableEnv (a StreamTableEnvironment) and LOG are defined elsewhere.
// First, the inputStream receives a batch of 420 records;
// second, the inputStream receives another batch of 420 records.
Table inputTable = tableEnv.fromDataStream(
        inputStream,
        Schema.newBuilder()
                // Event-time attribute derived from the eventTime field
                .columnByExpression("rowtime", "CAST(eventTime AS TIMESTAMP(3))")
                // Watermark equal to rowtime: no allowance for out-of-order records
                .watermark("rowtime", "rowtime")
                .build());
tableEnv.createTemporaryView("InputTable", inputTable);

// Per-user average of valueA over the preceding 10 seconds of event time
Table resultTable = tableEnv.sqlQuery(
        "SELECT " +
        " userId," +
        " eventTime," +
        " AVG(valueA) OVER w1 AS AVG_VALUE" +
        " FROM InputTable " +
        " WINDOW w1 AS (" +
        " PARTITION BY userId ORDER BY rowtime" +
        " RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)"
);

DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
resultStream.map((Row row) -> {
    LOG.warn("Print row: " + row.toString());
    // In response to the inputStream's first batch of 420 records, the resultStream receives only 69 records;
    // in response to the second batch of 420 records, it receives an additional 324 records.
    return row;
});
```
The observed behavior is:
- First, the inputStream receives a batch of 420 records, with 1 second between consecutive records;
- In response to that first batch, the resultStream receives only 69 records;
- Next, the inputStream receives a second batch of 420 records;
- In response to the second batch, the resultStream receives an additional 324 records.
I need the resultStream to receive the query results in real time. I suppose that under the hood there is a trigger that fires the SQL evaluation. Is there any way to customize or control it? Or what algorithm does it use?
Thanks
Solution 1:[1]
Your OVER window will be triggered for a particular row when these conditions are met:
- there's an earlier row for that same userId that's at least 10 seconds older
- the watermark has advanced up to the rowtime of that row (which will require seeing at least one row for rowtime+1)
All pending windows will be triggered at the end of the run if you execute it in batch mode, or with bounded streams.
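The firing rule above can be made concrete with a toy, single-threaded model of an event-time OVER window partitioned by userId with a 10-second RANGE frame. This is a sketch under stated assumptions, not Flink's operator: the names `OverWindowSketch`, `InputRow` and `OutputRow` are hypothetical, the caller supplies the watermark directly, only the watermark condition is modeled, and late-data handling and state cleanup are omitted. Rows are buffered until the watermark reaches their rowtime, then emitted with the average of valueA over the partition's rows whose rowtime lies in [rowtime - 10 s, rowtime].

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of an event-time OVER window:
//   PARTITION BY userId ORDER BY rowtime
//   RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
// Rows wait in a buffer until the watermark reaches their rowtime.
// Late-data handling and state cleanup are deliberately omitted.
class OverWindowSketch {
    static final long RANGE_MS = 10_000; // INTERVAL '10' SECOND

    record InputRow(String userId, long rowtime, double valueA) {}
    record OutputRow(String userId, long rowtime, double avgValue) {}

    // userId -> (rowtime -> valueA of every row seen at that rowtime), sorted by rowtime
    private final Map<String, TreeMap<Long, List<Double>>> history = new HashMap<>();
    // Rows that have arrived but whose rowtime the watermark has not reached yet
    private final List<InputRow> pending = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    void onRow(InputRow row) {
        history.computeIfAbsent(row.userId(), k -> new TreeMap<>())
               .computeIfAbsent(row.rowtime(), k -> new ArrayList<>())
               .add(row.valueA());
        pending.add(row);
    }

    // Advances the watermark and emits every buffered row with rowtime <= watermark,
    // in rowtime order, together with its 10-second RANGE average.
    List<OutputRow> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        pending.sort(Comparator.comparingLong(InputRow::rowtime));
        List<OutputRow> emitted = new ArrayList<>();
        Iterator<InputRow> it = pending.iterator();
        while (it.hasNext()) {
            InputRow row = it.next();
            if (row.rowtime() > currentWatermark) {
                break; // sorted by rowtime, so every remaining row is later still
            }
            it.remove();
            emitted.add(new OutputRow(row.userId(), row.rowtime(), rangeAverage(row)));
        }
        return emitted;
    }

    // Average of valueA over the row's partition for rowtimes in [rowtime - 10 s, rowtime];
    // rows sharing the same rowtime (peers) are included, as RANGE semantics require.
    private double rangeAverage(InputRow row) {
        double sum = 0;
        int count = 0;
        for (List<Double> values : history.get(row.userId())
                .subMap(row.rowtime() - RANGE_MS, true, row.rowtime(), true).values()) {
            for (double v : values) {
                sum += v;
                count++;
            }
        }
        return sum / count;
    }
}
```

For example, after rows with values 10, 20 and 30 arrive at 0 s, 1 s and 2 s, a watermark of 1 s releases only the first two rows (averages 10.0 and 15.0); the row at 2 s waits for a later watermark. When bounded input ends, Flink emits a final watermark of `Long.MAX_VALUE`; passing that value to `onWatermark` flushes everything still pending, which mirrors the batch/bounded behavior described above.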
Update:
With out-of-order records, you need a watermark strategy that allows for how far out of order they can arrive. With the current strategy, out-of-order records are dropped because they are late.
For example, if the records can be up to 15 seconds out of order, then specify the watermarking as
```sql
WATERMARK FOR rowtime AS rowtime - INTERVAL '15' SECOND
```
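To see why a larger delay helps, here is a simplified model of bounded-out-of-orderness watermarks: the watermark trails the largest timestamp seen so far by a fixed delay, and a record whose timestamp is at or below the current watermark counts as late. A delay of 0 corresponds to the question's `.watermark("rowtime", "rowtime")`; with the question's `Schema` builder, the 15-second version would be written as `.watermark("rowtime", "rowtime - INTERVAL '15' SECOND")`. The class name `WatermarkLatenessSketch` is hypothetical, and the model recomputes the watermark after every record, whereas Flink emits watermarks periodically, so treat it as an approximation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of bounded-out-of-orderness watermarks.
// Assumption: the watermark is recomputed after every record (Flink emits it periodically).
class WatermarkLatenessSketch {
    // Returns the timestamps (ms) that arrive at or behind the current watermark,
    // i.e. the records this model treats as late and therefore dropped.
    static List<Long> lateRecords(long[] timestampsInArrivalOrder, long maxOutOfOrdernessMs) {
        List<Long> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE; // no watermark before the first record
        for (long ts : timestampsInArrivalOrder) {
            if (ts <= watermark) {
                late.add(ts);
            }
            maxSeen = Math.max(maxSeen, ts);
            // Mirrors: WATERMARK FOR rowtime AS rowtime - INTERVAL '<delay>' SECOND
            watermark = maxSeen - maxOutOfOrdernessMs;
        }
        return late;
    }
}
```

With arrivals at 0, 1, 2, 1.5, 3 and 0.5 seconds, a delay of 0 ms flags the records at 1.5 s and 0.5 s as late, while a 15 s delay flags none. The trade-off is latency: because OVER results fire only once the watermark reaches a row's rowtime, each result is held back by roughly the configured delay.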
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Stack Overflow |
