'Rewind a kafka consumer group offset to a given timestamp

We want to expose an endpoint that enables rewinding a kafka consumer group offset to a supplied timestamp. I know this functionality is provided in ConsumerSeekCallback.seekToTimestamp.

My issue is trying to get access to ConsumerSeekCallback in the controller to make the seek call.

I had two approaches.

My first approach was to use KafkaListenerEndpointRegistry

In my endpoint/controller I would do something like the following

        registry.getAllListenerContainers().forEach(c -> {
            ConcurrentMessageListenerContainer<?, ?> c2 = (ConcurrentMessageListenerContainer<?, ?>) c;
            c2.getContainers().forEach(kafkaMessageListenerContainer -> {
                kafkaMessageListenerContainer

I was hoping I would be able to access kafkaMessageListenerContainer.listenerConsumer.seekCallback.seekToTimestamp(...);

Not so, listenerConsumer protected - probably for good reason.

My next approach was to expose a method called getConsumerSeekCallbacks. That way I could access this in my controller

@Component
public class KafkaConsumer extends AbstractConsumerSeekAware {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private CountDownLatch latch = new CountDownLatch(1);
    private String payload = null;


    @KafkaListener(id = "getting-started", topics = "getting-started")
    public void receive(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String obj,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp,
                        ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("received payload='{}'", consumerRecord.toString());
        LOGGER.info("Received message timestamp: {}, date: {}", timestamp,
                Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC).toLocalDateTime());

    }

    public Map<TopicPartition, ConsumerSeekCallback> getConsumerSeekCallbacks() {
        return this.getSeekCallbacks();
    }

}

and simplified controller code:

@RestController
public class Controller {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @Autowired
    public KafkaListenerEndpointRegistry registry;

    @Autowired
    private KafkaConsumer kafkaConsumer;


    @PostMapping(path = "/admin/reset/{timestamp}")
    public void send(@PathVariable String timestamp) {

        Map<TopicPartition, ConsumerSeekAware.ConsumerSeekCallback> seekCallbacks = kafkaConsumer.getConsumerSeekCallbacks();
        long offsetTimestamp = System.currentTimeMillis() - 129000 * 1000;
        seekCallbacks.forEach((tp, callback) -> {
            callback.seekToTimestamp(tp.topic(), tp.partition(), offsetTimestamp);
        });
    }
}

This works, though not every time and not entirely sure of threadsafety with this approach. Plus it feels very hacky to me.

I'm sure there is a better way?



Solution 1:[1]

What you are doing is correct. Although I would have a List<AbstractConsumerSeekAware> autowired in your Controller instead. And just call their seekToTimestamp(long time). It does that iteration internally for you.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Artem Bilan