Rewind a Kafka consumer group offset to a given timestamp
We want to expose an endpoint that rewinds a Kafka consumer group's offset to a supplied timestamp. I know this functionality is provided by ConsumerSeekCallback.seekToTimestamp.
My issue is getting access to the ConsumerSeekCallback in the controller so I can make the seek call.
I tried two approaches.
My first approach was to use KafkaListenerEndpointRegistry.
In my endpoint/controller I would do something like the following:
registry.getAllListenerContainers().forEach(c -> {
ConcurrentMessageListenerContainer<?, ?> c2 = (ConcurrentMessageListenerContainer<?, ?>) c;
c2.getContainers().forEach(kafkaMessageListenerContainer -> {
kafkaMessageListenerContainer
I was hoping I would be able to access:
kafkaMessageListenerContainer.listenerConsumer.seekCallback.seekToTimestamp(...);
Not so: listenerConsumer is protected, probably for good reason.
My next approach was to expose a method called getConsumerSeekCallbacks on the listener class, so that I could access the callbacks from my controller:
@Component
public class KafkaConsumer extends AbstractConsumerSeekAware {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1);
private String payload = null;
@KafkaListener(id = "getting-started", topics = "getting-started")
public void receive(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String obj,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp,
ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
LOGGER.info("Received message timestamp: {}, date: {}", timestamp,
Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC).toLocalDateTime());
}
public Map<TopicPartition, ConsumerSeekCallback> getConsumerSeekCallbacks() {
return this.getSeekCallbacks();
}
}
And the simplified controller code:
@RestController
public class Controller {
@Autowired
private KafkaTemplate<Object, Object> template;
@Autowired
public KafkaListenerEndpointRegistry registry;
@Autowired
private KafkaConsumer kafkaConsumer;
@PostMapping(path = "/admin/reset/{timestamp}")
public void send(@PathVariable String timestamp) {
Map<TopicPartition, ConsumerSeekAware.ConsumerSeekCallback> seekCallbacks = kafkaConsumer.getConsumerSeekCallbacks();
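// Simplified: rewinds a fixed 129,000 seconds (about 36 hours); the timestamp path variable is not used yet.
long offsetTimestamp = System.currentTimeMillis() - 129000 * 1000;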
seekCallbacks.forEach((tp, callback) -> {
callback.seekToTimestamp(tp.topic(), tp.partition(), offsetTimestamp);
});
}
}
This works, though not every time, and I'm not entirely sure this approach is thread-safe. It also feels very hacky to me.
Surely there is a better way?
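One gap in the simplified controller is that the {timestamp} path variable arrives as a String and is never converted into the epoch milliseconds that seekToTimestamp takes. The following is a minimal sketch of that conversion, assuming the caller sends either a plain millisecond count or an ISO-8601 instant (the question does not say which, so both formats are assumptions). SeekTimestamps and toEpochMillis are hypothetical names, not part of Spring Kafka.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

// Hypothetical helper; not part of Spring Kafka or the original post.
final class SeekTimestamps {

    private SeekTimestamps() {
    }

    /**
     * Converts the raw path variable to epoch milliseconds.
     * Accepts a plain number of milliseconds or an ISO-8601 instant such as
     * "2021-03-01T12:00:00Z" (both accepted formats are assumptions).
     */
    static long toEpochMillis(String raw) {
        String value = raw.trim();
        try {
            // First try the simplest form: a millisecond count.
            return Long.parseLong(value);
        } catch (NumberFormatException notANumber) {
            try {
                // Fall back to an ISO-8601 instant in UTC.
                return Instant.parse(value).toEpochMilli();
            } catch (DateTimeParseException notAnInstant) {
                throw new IllegalArgumentException("Unsupported timestamp: " + raw, notAnInstant);
            }
        }
    }
}
```

In the controller, the hard-coded offsetTimestamp could then be replaced with SeekTimestamps.toEpochMillis(timestamp), with an IllegalArgumentException mapped to a 400 response if desired.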
Solution 1:[1]
What you are doing is correct. However, I would autowire a List<AbstractConsumerSeekAware> into your controller instead and just call seekToTimestamp(long time) on each element. It does that iteration over the callbacks internally for you.
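The shape of that suggestion can be sketched in plain Java. To keep the sketch compilable without Spring on the classpath, TimestampSeekable below is a stand-in for the seekToTimestamp(long time) method the answer refers to on AbstractConsumerSeekAware, and RewindService is a hypothetical name. In a real application the list would be the injected List<AbstractConsumerSeekAware>, and no getConsumerSeekCallbacks accessor would be needed.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the seekToTimestamp(long) method described in the answer;
// declared here only so the sketch compiles without Spring (an assumption).
interface TimestampSeekable {
    void seekToTimestamp(long time);
}

// Hypothetical service that a controller could delegate to.
final class RewindService {

    private final List<TimestampSeekable> listeners;

    RewindService(List<TimestampSeekable> listeners) {
        // Defensive copy so later changes to the caller's list are not observed.
        this.listeners = new ArrayList<>(listeners);
    }

    /**
     * Asks every listener to seek to the given time and returns how many were asked.
     * Each listener is responsible for iterating over its own partitions.
     */
    int rewindAll(long epochMillis) {
        for (TimestampSeekable listener : listeners) {
            listener.seekToTimestamp(epochMillis);
        }
        return listeners.size();
    }
}
```

The controller then only passes a timestamp along and never touches the per-partition callback map itself.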
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Artem Bilan |
