Kafka consumer - how to recognize offset skipping/missing offsets?

Setup:
We have a Debezium/Kafka Connect setup with a Debezium Oracle producer and a Confluent JDBC consumer/sink.

Starting position / background / problem:
Due to high traffic we have decreased log.retention.minutes to 1h, which is sufficient 99% of the time. In some rare cases, however, one of the Kafka consumers slows down and can no longer keep up. Messages are then deleted in Kafka (due to the aforementioned retention period) before the consumer has picked them up and handled them. In the default config, the consumer then skips the missing records by choosing the earliest available offset. This leads to inconsistencies on the target side.

Question:
How can we handle such situations (if raising log.retention.minutes isn't an option)?
Note: We would be fine if the consumer just threw an exception, stopped, etc. whenever it can't find a message for its given offset.

What we've tried so far:
We tried setting auto.offset.reset to none for the consumer and expected the consumer to stop if it can't find an offset. In theory this should work. In practice it immediately throws an exception when the consumer gets instantiated, because there is no first/initial committed offset yet.
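
To make the wished-for behaviour concrete, here is a minimal sketch of the decision we are after. It is plain Java without the Kafka client, and the class and method names (OffsetStartPolicy, decide, skippedOffsets) are hypothetical. As far as we understand the Java consumer, auto.offset.reset=none raises NoOffsetForPartitionException when the group has no committed offset and OffsetOutOfRangeException when the committed offset no longer exists in the log; the sketch treats those two cases differently. In a real check the inputs would presumably come from KafkaConsumer.committed(...) and KafkaConsumer.beginningOffsets(...).

```java
import java.util.OptionalLong;

// Hypothetical helper: models "fail on lost offsets, but not on first start".
final class OffsetStartPolicy {

    enum Decision { START_FROM_EARLIEST, RESUME, FAIL_RECORDS_LOST }

    // committedOffset: next offset the group would read (empty if the group never committed).
    // logStartOffset: earliest offset still retained in the partition.
    static Decision decide(OptionalLong committedOffset, long logStartOffset) {
        if (!committedOffset.isPresent()) {
            // First start: nothing was consumed yet, so nothing can have been skipped.
            return Decision.START_FROM_EARLIEST;
        }
        if (committedOffset.getAsLong() < logStartOffset) {
            // Retention removed records the group had not read yet.
            return Decision.FAIL_RECORDS_LOST;
        }
        return Decision.RESUME;
    }

    // Upper bound on skipped offsets; assumes a non-compacted, non-transactional topic.
    static long skippedOffsets(OptionalLong committedOffset, long logStartOffset) {
        if (!committedOffset.isPresent()) {
            return 0L;
        }
        return Math.max(0L, logStartOffset - committedOffset.getAsLong());
    }

    public static void main(String[] args) {
        System.out.println(decide(OptionalLong.empty(), 0L));     // START_FROM_EARLIEST
        System.out.println(decide(OptionalLong.of(500L), 120L));  // RESUME
        System.out.println(decide(OptionalLong.of(500L), 900L));  // FAIL_RECORDS_LOST
        System.out.println(skippedOffsets(OptionalLong.of(500L), 900L)); // 400
    }
}
```

Since a stock sink connector does not expose its poll loop, such a check would probably have to run outside the connector, for example as a periodic watchdog that stops the connector when FAIL_RECORDS_LOST is returned for any partition.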

Final thoughts:
So is there another config parameter we could use? (Something like "throw exception if offset is missing/skipped, but not on first start"?) Or is there a JMX metric we could monitor in case a consumer is skipping messages?
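
For the JMX side, the closest thing we are aware of is the consumer lag metric records-lag-max, which the Java consumer publishes under the MBean kafka.consumer:type=consumer-fetch-manager-metrics,client-id=<client-id>. It does not report skipped offsets directly, but a steadily growing lag could serve as an early warning before retention catches up with the consumer. The sketch below reads that attribute for all consumers registered in an MBean server; the class name ConsumerLagProbe is hypothetical, and for Kafka Connect the probe would have to be pointed at the worker JVM (for example via a remote JMX connection) rather than at the local platform server used in main.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

// Hypothetical probe: reads records-lag-max for every Kafka consumer visible via JMX.
final class ConsumerLagProbe {

    static final String PATTERN =
            "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*";
    static final String ATTRIBUTE = "records-lag-max";

    // Returns client-id -> records-lag-max; empty if no consumer is registered.
    static Map<String, Double> maxLagByClient(MBeanServerConnection server) {
        Map<String, Double> result = new TreeMap<>();
        try {
            for (ObjectName name : server.queryNames(new ObjectName(PATTERN), null)) {
                Object value = server.getAttribute(name, ATTRIBUTE);
                if (value instanceof Number) {
                    result.put(name.getKeyProperty("client-id"),
                            ((Number) value).doubleValue());
                }
            }
        } catch (JMException | IOException e) {
            throw new IllegalStateException("Could not read consumer lag via JMX", e);
        }
        return result;
    }

    public static void main(String[] args) {
        // Local platform MBean server: only sees consumers running in this JVM.
        maxLagByClient(ManagementFactory.getPlatformMBeanServer())
                .forEach((client, lag) ->
                        System.out.println(client + " " + ATTRIBUTE + "=" + lag));
    }
}
```

An alert threshold on this value would have to be derived from the expected message rate and the 1h retention; that mapping is an assumption on our side, not something the metric provides.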



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
