'How to read Kafka record ingestion timestamp in Apache Beam

I am new to Apache Beam and struggling with this problem for a while. I am using KafkaIO as a source of my pipeline in Apache Beam Java . I want to fetch Kafka record ingestion timestamp along with every record and write that as an additional column to my output. The timestamp at which the record was ingested in Kafka and not the event time.

I am not able to figure out how to use kafkaIOReader without using the function withoutMetadata() . As far as I understand the Kafka record ingestion timestamp should be part of the metadata for each record ?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source