Kafka Consumer - Point to specific offset in Spring Boot Kafka
I'm trying to seek to a specific offset. It seems I need to use seekToTimestamp, because there is no seekToOffset or anything similar. I've researched a lot but couldn't find an exact implementation. For now I need to consume a previous event that is already in the topic, which is why I've used:
topicPartitions = {@org.springframework.kafka.annotation.TopicPartition(topic = "some_topic",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}
But I have no idea how to do it. How can I point the listener to a specific offset and start consuming from there?
I would be grateful if anyone can help me!
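import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Optional;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@Builder
public class KafkaConsumer extends AbstractConsumerSeekAware {
    private final Optional<Long> seekToTimestamp;

    @KafkaListener(id = "some_id", topicPartitions = {@org.springframework.kafka.annotation.TopicPartition(topic = "some_topic",
            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
    public SomeAvroClass listen(SomeAvroClass someAvroClass) throws URISyntaxException, IOException, InterruptedException {
        log.info("Consuming event from Kafka: " + someAvroClass);
        log.info("BEFORE Seek : " + someAvroClass.getCode());
        KafkaConsumer.builder().seekToTimestamp(Optional.of(System.currentTimeMillis() - 6_000_000));
        log.info("AFTER Seek : " + someAvroClass.getCode()); //result is same as BEFORE
        return someAvroClass;
    }
}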
Solution 1:[1]
I'm not sure what KafkaConsumer.builder() is, but there is indeed an API to seek to an arbitrary offset in Apache Kafka's KafkaConsumer (org.apache.kafka.clients.consumer.KafkaConsumer):
/**
* Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
* is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
* you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
*
* @throws IllegalArgumentException if the provided offset is negative
* @throws IllegalStateException if the provided TopicPartition is not assigned to this consumer
*/
@Override
public void seek(TopicPartition partition, long offset) { /* implementation omitted */ }
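To show that plain client API in isolation, here is a minimal sketch that bypasses Spring entirely. It assumes a broker at localhost:9092, String keys and values, and an arbitrary target offset of 42; these are illustrative values, not taken from the question. Because seek() throws IllegalStateException for a partition that is not assigned, the sketch uses manual assign() instead of subscribe().

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekExample {

    // Consumer settings; the broker address passed in is an assumption.
    static Properties consumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // No group is used with manual assignment, so do not try to commit offsets.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return props;
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("some_topic", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps("localhost:9092"))) {
            // seek() requires the partition to be assigned to this consumer.
            consumer.assign(List.of(tp));
            // Overrides the fetch position used by the next poll().
            consumer.seek(tp, 42L);
            for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofSeconds(5))) {
                System.out.printf("offset=%d value=%s%n", rec.offset(), rec.value());
            }
        }
    }
}
```

In a Spring listener you normally do not own the KafkaConsumer, which is why Spring exposes the seek through a callback instead.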
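AbstractConsumerSeekAware delegates to KafkaConsumer.seek() through the ConsumerSeekCallback returned by its getSeekCallbackFor(TopicPartition topicPartition) method; that callback's seek method is documented as follows: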
/**
* Perform a seek operation. When called from
* {@link ConsumerSeekAware#onPartitionsAssigned(Map, ConsumerSeekCallback)} or
* from {@link ConsumerSeekAware#onIdleContainer(Map, ConsumerSeekCallback)}
* perform the seek immediately on the consumer. When called from elsewhere,
* queue the seek operation to the consumer. The queued seek will occur after any
* pending offset commits. The consumer must be currently assigned the specified
* partition.
* @param topic the topic.
* @param partition the partition.
* @param offset the offset (absolute).
*/
void seek(String topic, int partition, long offset);
See the Spring for Apache Kafka reference documentation on seeking for more information: https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek
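Since the question's class already extends AbstractConsumerSeekAware, one way to start from a specific offset is to seek inside onPartitionsAssigned, where (per the Javadoc above) the seek is performed immediately on the consumer. This is a sketch under assumptions: the offset 42, the listener id and String payloads are illustrative, not from the question. (For explicitly assigned partitions like in the question, a non-negative initialOffset in @PartitionOffset is already treated as an absolute starting offset.)

```java
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;

@Component
public class StartAtOffsetListener extends AbstractConsumerSeekAware {

    // Hypothetical absolute offset to start from on every assigned partition.
    static final long START_OFFSET = 42L;

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                                     ConsumerSeekCallback callback) {
        // Let the base class record the callback so getSeekCallbackFor(...) keeps working.
        super.onPartitionsAssigned(assignments, callback);
        // Called from onPartitionsAssigned, so each seek is performed immediately.
        assignments.keySet().forEach(tp -> callback.seek(tp.topic(), tp.partition(), START_OFFSET));
    }

    // Assumes String values; the question uses an Avro class instead.
    @KafkaListener(id = "start_at_offset", topics = "some_topic")
    public void listen(String value) {
        System.out.println("Received: " + value);
    }
}
```

Note that onPartitionsAssigned runs on every rebalance, so with group management this seek is repeated whenever partitions are reassigned.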
UPDATE
Here is a possible listener that seeks to an arbitrary offset:
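import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class Listener implements ConsumerSeekAware {
    // The callback is registered on the consumer thread, which is also the thread that invokes the listener.
    private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
    @KafkaListener(topics = "some_topic")
    public void listen2(String data,
            // In spring-kafka 3.x this header constant is KafkaHeaders.RECEIVED_PARTITION.
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        if (shouldSeek(data)) {
            // Queue a seek to absolute offset 101 on this topic/partition.
            this.seekCallBack.get().seek(topic, partition, 101);
        }
    }
    // Hypothetical stand-in for SOME_CONDITION; replace with your own logic.
    private boolean shouldSeek(String data) {
        return false;
    }
    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.seekCallBack.set(callback);
    }
}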
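Regarding the seekToTimestamp attempt in the question: KafkaConsumer.builder().seekToTimestamp(...) only sets a field on a new Lombok builder (build() is never even called), so the running consumer is never told to seek. In recent spring-kafka versions, AbstractConsumerSeekAware itself offers a seekToTimestamp(long) convenience method that seeks all currently assigned partitions. The sketch below assumes that method is available in your version; the listener id, String payload and the once-only guard are illustrative choices, and the 6,000,000 ms (100 minute) window is copied from the question.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;

@Component
public class RewindingListener extends AbstractConsumerSeekAware {

    // Rewind window from the question: 6,000,000 ms = 100 minutes.
    static final long REWIND_MS = 6_000_000L;

    // Hypothetical guard so the rewind happens once instead of on every redelivered record.
    private final AtomicBoolean rewound = new AtomicBoolean();

    @KafkaListener(id = "rewinding", topics = "some_topic")
    public void listen(String value) {
        System.out.println("Received: " + value);
        if (rewound.compareAndSet(false, true)) {
            // Queues a seek on every partition assigned to this listener. It takes
            // effect on a later poll, so records already fetched may arrive first.
            seekToTimestamp(System.currentTimeMillis() - REWIND_MS);
        }
    }
}
```

Without the guard, every record received after the rewind would trigger another rewind, and the listener would keep reprocessing the same window.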
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 |
