Flink SQL TUMBLE_END on event time not returning any result
I have a Flink job that consumes from a Kafka topic and tries to create windows based on a few columns, like eventId and eventName. The Kafka topic has eventTimestamp as its timestamp field, populated in epoch milliseconds.
DataStreamSource<String> kafkaStream = env.fromSource(
        kafkaSource, // kafkaSource is built via KafkaSource.builder()
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)),
        "KafkaSource");
// Some transformations to map the raw stream to a POJO class.
Table kafkaTable = tableEnv.fromDataStream(
        kafkaStream,
        Schema.newBuilder()
                .columnByExpression("proc_time", "PROCTIME()")
                // eventTimestamp is in epoch millis
                .columnByExpression("event_time", "TO_TIMESTAMP_LTZ(eventTimestamp, 3)")
                .watermark("event_time", "event_time - INTERVAL '20' SECOND")
                .build());
The TUMBLE_END window query returns rows when proc_time is used, but returns nothing when I use event_time.
-- This query returns nothing
SELECT TUMBLE_END(event_time, INTERVAL '1' MINUTE), COUNT(DISTINCT eventId)
FROM kafkaTable
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE)

-- This query gives some results
SELECT TUMBLE_END(proc_time, INTERVAL '1' MINUTE), COUNT(DISTINCT eventId)
FROM kafkaTable
GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE)
I tried setting env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime), but that API is deprecated in the 1.14.4 stable version I'm using (event time has been the default characteristic since Flink 1.12).
I tried adding a custom WatermarkStrategy as well, but nothing worked. I'm not able to explain this behaviour. Can someone help with this?
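For reference, by a custom WatermarkStrategy I mean something along these lines (just a sketch against the Kotlin code below; it assumes tuple field f2 carries the epoch-millis event timestamp):

// Extract the event timestamp from the payload (f2, epoch millis) instead of
// relying on the Kafka record timestamp, and mark partitions idle after one
// minute so an empty partition cannot hold the watermark back.
val customStrategy = WatermarkStrategy
    .forBoundedOutOfOrderness<Tuple4<String, String, Long, Float>>(Duration.ofSeconds(20))
    .withTimestampAssigner(
        SerializableTimestampAssigner<Tuple4<String, String, Long, Float>> { element, _ -> element.f2 })
    .withIdleness(Duration.ofMinutes(1))

val streamWithWatermarks = kafkaRowMapper.assignTimestampsAndWatermarks(customStrategy)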
David - Here is the code I'm using.
fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)

    val kafkaSource = KafkaSource.builder<String>()
        .setBootstrapServers("localhost:9092")
        .setTopics("an-topic")
        .setGroupId("testGroup")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(SimpleStringSchema())
        .build()

    val kafkaStream = env.fromSource(kafkaSource,
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "KafkaSource")

    val kafkaRowMapper = kafkaStream.map(RowMapper())

    val finalTable = tableEnv.fromDataStream(kafkaRowMapper,
        Schema.newBuilder()
            .columnByExpression("proc_time", "PROCTIME()")
            // f2 is the eventTimestamp in epoch millis
            .columnByExpression("event_time", "TO_TIMESTAMP_LTZ(f2, 3)")
            .watermark("event_time", "event_time - INTERVAL '20' SECOND")
            .build()
    ).renameColumns(
        `$`("f0").`as`("eventId"),
        `$`("f1").`as`("eventName"),
        `$`("f3").`as`("eventValue")
    )

    tableEnv.createTemporaryView("finalTable", finalTable)

    val sqlQuery = "SELECT eventId, eventName, TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS event_time_new, " +
        "LAST_VALUE(eventValue) AS eventValue FROM finalTable " +
        "GROUP BY eventId, eventName, TUMBLE(event_time, INTERVAL '1' MINUTE)"

    val resultTable = tableEnv.sqlQuery(sqlQuery)
    tableEnv.toDataStream(resultTable).print()
    env.execute("TestJob")
}
// Maps a CSV line "eventId,eventName,eventTimestamp,eventValue" to a Tuple4.
class RowMapper : MapFunction<String, Tuple4<String, String, Long, Float>> {
    override fun map(value: String): Tuple4<String, String, Long, Float> {
        val lineArray = value.split(",")
        return Tuple4(lineArray[0], lineArray[1], lineArray[2].toLong(), lineArray[3].toFloat())
    }
}
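To check whether the event-time watermark advances at all, I suppose I could tap kafkaRowMapper with a ProcessFunction before the fromDataStream call (a diagnostic sketch only; it prints the current watermark as each record passes):

// Diagnostic sketch: print the current watermark alongside each record's own
// timestamp (f2). If the watermark stays at Long.MIN_VALUE, no event-time
// window can ever fire.
val tapped = kafkaRowMapper.process(object :
    ProcessFunction<Tuple4<String, String, Long, Float>, Tuple4<String, String, Long, Float>>() {
    override fun processElement(
        value: Tuple4<String, String, Long, Float>,
        ctx: Context,
        out: Collector<Tuple4<String, String, Long, Float>>
    ) {
        println("ts=${value.f2} watermark=${ctx.timerService().currentWatermark()}")
        out.collect(value)
    }
})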
The Kafka topic has values like this:
event1,Util1,1647614467000,0.12
event1,Util1,1647614527000,0.26
event1,Util1,1647614587000,0.71
event2,Util2,1647614647000,0.08
event2,Util2,1647614707000,0.32
event2,Util2,1647614767000,0.23
event2,Util2,1647614827000,0.85
event1,Util1,1647614887000,0.08
event1,Util1,1647614947000,0.32
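For what it's worth, the first timestamp 1647614467000 is 2022-03-18 14:41:07 UTC, and the rows are 60 seconds apart. If I read the watermark semantics right, with a 20-second bounded-out-of-orderness strategy the window [14:41:00, 14:42:00) only fires once the watermark passes 14:42:00, i.e. once a record with timestamp >= 14:42:20 arrives (here that would be the 14:43:07 row), and the last windows in this finite sample never see such a successor.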
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow