How to emit a processing delay metric using Kafka record timestamps in Flink?

We have a use case where we want to emit the exact processing delay using Kafka record timestamps. We tried the Kafka consumer lag metrics, but they are too noisy and do not reflect the actual delay.

The idea is to read the timestamp from each Kafka record, subtract it from the current processing time, and emit the difference as the delay. We are only interested in the maximum delay, so we can track the running maximum and expose it as a gauge-type metric.
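The max-tracking part is independent of Flink, so here is a minimal, Flink-free sketch of what we have in mind (class and method names are ours, not Flink API); in a real job the value returned by currentMax would back a Gauge&lt;Long&gt; registered on the operator's metric group:

```java
import java.util.concurrent.atomic.AtomicLong;

// Tracks the maximum observed processing delay across records.
// In Flink, currentMax() would be exposed through a Gauge<Long>.
class MaxDelayTracker {
    private final AtomicLong maxDelayMs = new AtomicLong(0);

    // Call once per record: delay = processing time - Kafka record timestamp.
    void record(long recordTimestampMs, long processingTimeMs) {
        long delayMs = Math.max(0, processingTimeMs - recordTimestampMs);
        maxDelayMs.accumulateAndGet(delayMs, Math::max);
    }

    long currentMax() {
        return maxDelayMs.get();
    }
}
```

AtomicLong.accumulateAndGet keeps the update thread-safe in case the consumer thread and the metric reporter thread touch the value concurrently.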

We have a direct reference to KafkaTableSource since we are using the Table API, but I need to inject this metric logic into createKafkaConsumer somehow. So if I can figure out how to extend FlinkKafkaConsumer to emit this metric, I can achieve my goal.

import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumerBase, KafkaTableSource}
import org.apache.flink.types.Row

class ModifiedKafkaTableSource extends KafkaTableSource {
  override protected def createKafkaConsumer(...): FlinkKafkaConsumerBase[Row] = {
    // Return a FlinkKafkaConsumer that emits a metric whenever it receives a message from Kafka.
    // Kafka record timestamps are in roughly chronological order, so
    // (processing time - record timestamp) gives us an idea of the delay.
  }
}

Is it possible to call assignTimestampsAndWatermarks to assign Kafka record timestamps, and then somehow compare them against the current processing time for reporting?
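One detail we are unsure about: gauges are polled periodically by the metric reporter, so a plain running maximum never goes down. A variant we are considering is "max delay since the last report", where reading the gauge resets the accumulator. A Flink-free sketch of that read-and-reset pattern (names are ours, not Flink API):

```java
import java.util.concurrent.atomic.AtomicLong;

// "Max delay since last report": the gauge read resets the accumulator,
// so each reporting interval shows the worst delay seen in that interval.
class ResettingMaxDelay {
    private final AtomicLong maxMs = new AtomicLong(0);

    // Called once per consumed record with the computed delay.
    void update(long delayMs) {
        maxMs.accumulateAndGet(delayMs, Math::max);
    }

    // In Flink this would be the body of Gauge<Long>.getValue(),
    // registered on the operator's metric group.
    long readAndReset() {
        return maxMs.getAndSet(0);
    }
}
```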

Note: We are on Flink 1.11



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow