Kafka Consumer - Point to specific offset in Spring Boot Kafka

I'm trying to seek to a specific offset. It seems I need to use seekToTimestamp, since there is no seekToOffset or anything similar. I've researched a lot but couldn't find a concrete implementation. For now I need to consume previous events that are already in the topic, which is why I've used:

topicPartitions = {@org.springframework.kafka.annotation.TopicPartition(topic = "some_topic",
            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}

But I have no idea how to do it: how can I point the listener at a specific offset and start consuming from there?

I would be grateful if anyone can help me!

@Component
@Slf4j
@Builder
public class KafkaConsumer extends AbstractConsumerSeekAware {

    private final Optional<Long> seekToTimestamp; 

    @KafkaListener(id = "some_id", topicPartitions = {@org.springframework.kafka.annotation.TopicPartition(topic = "some_topic",
            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}) 

    public SomeAvroClass listen(SomeAvroClass someAvroClass) throws URISyntaxException, IOException, InterruptedException {
        log.info("Consuming event from Kafka: " + someAvroClass);
        log.info("BEFORE Seek : " + someAvroClass.getCode());
        KafkaConsumer.builder().seekToTimestamp(Optional.of(System.currentTimeMillis() - 6_000_000));
        log.info("AFTER Seek : " + someAvroClass.getCode()); // result is same as BEFORE
        return someAvroClass;

    }
}


Solution 1:[1]

I'm not sure what KafkaConsumer.builder() is in your code, but there is indeed an API for seeking to an arbitrary offset in the Apache Kafka KafkaConsumer:

/**
 * Overrides the fetch offsets that the consumer will use on the next {@link #poll(Duration) poll(timeout)}. If this API
 * is invoked for the same partition more than once, the latest offset will be used on the next poll(). Note that
 * you may lose data if this API is arbitrarily used in the middle of consumption, to reset the fetch offsets
 *
 * @throws IllegalArgumentException if the provided offset is negative
 * @throws IllegalStateException if the provided TopicPartition is not assigned to this consumer
 */
@Override
public void seek(TopicPartition partition, long offset) { ... }
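For reference, a minimal sketch of that API with the plain kafka-clients consumer, outside Spring. It assumes a broker at localhost:9092, String keys and values (the question's Avro deserializer is left out), and a hypothetical group id and target offset 101. The partition is assigned manually with assign() first, because seek() throws IllegalStateException for a partition that is not assigned, as the Javadoc above says.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PlainSeekExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed local broker; adjust to your environment.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Hypothetical group id; with manual assign() it only matters for commits.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "seek-demo");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        TopicPartition partition = new TopicPartition("some_topic", 0);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // seek() requires the partition to be assigned to this consumer.
            consumer.assign(List.of(partition));
            // Override the fetch position: the next poll() starts at offset 101.
            consumer.seek(partition, 101L);
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}
```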

Spring's AbstractConsumerSeekAware delegates to that method through the ConsumerSeekCallback it returns from getSeekCallbackFor(TopicPartition topicPartition), whose seek method is documented as:

    /**
     * Perform a seek operation. When called from
     * {@link ConsumerSeekAware#onPartitionsAssigned(Map, ConsumerSeekCallback)} or
     * from {@link ConsumerSeekAware#onIdleContainer(Map, ConsumerSeekCallback)}
     * perform the seek immediately on the consumer. When called from elsewhere,
     * queue the seek operation to the consumer. The queued seek will occur after any
     * pending offset commits. The consumer must be currently assigned the specified
     * partition.
     * @param topic the topic.
     * @param partition the partition.
     * @param offset the offset (absolute).
     */
    void seek(String topic, int partition, long offset);

See the Spring for Apache Kafka reference documentation on seeking for more details: https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek
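Since the question's class already extends AbstractConsumerSeekAware, the natural place to seek is onPartitionsAssigned, where (per the Javadoc above) the callback performs the seek immediately on the consumer. A minimal sketch, assuming String values and a hypothetical fixed start offset of 101; the class name SeekOnAssignmentListener is made up for illustration:

```java
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;

@Component
public class SeekOnAssignmentListener extends AbstractConsumerSeekAware {

    // Hypothetical start offset; in practice inject it from configuration.
    private static final long START_OFFSET = 101L;

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {
        // Keep the base class's callback bookkeeping (used by getSeekCallbackFor).
        super.onPartitionsAssigned(assignments, callback);
        // Runs on the consumer thread, so each seek is performed immediately.
        // With group management this also runs after every rebalance.
        assignments.keySet().forEach(tp ->
                callback.seek(tp.topic(), tp.partition(), START_OFFSET));
        // Time-based alternative (what seekToTimestamp is meant for):
        // callback.seekToTimestamp(assignments.keySet(), System.currentTimeMillis() - 6_000_000);
    }

    @KafkaListener(id = "some_id", topics = "some_topic")
    public void listen(String value) {
        // Records arrive starting at START_OFFSET on each assigned partition.
    }
}
```

Because the seek happens in the container's callback, there is no need to create another instance inside listen(): the KafkaConsumer.builder() call in the question only configures a builder that is never built, so the running consumer's position is unaffected.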

UPDATE

Here is a possible listener that seeks to an arbitrary offset:

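import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class Listener implements ConsumerSeekAware {

    // registerSeekCallback() is called on the consumer thread, and so is the
    // listener, so a ThreadLocal keeps one callback per consumer thread.
    private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    @KafkaListener(topics = "some_topic")
    public void listen2(String data,
            // Renamed to KafkaHeaders.RECEIVED_PARTITION in spring-kafka 3.x.
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        if (shouldRewind(data)) {
            // Called outside onPartitionsAssigned/onIdleContainer, so the seek is queued.
            this.seekCallBack.get().seek(topic, partition, 101);
        }
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.seekCallBack.set(callback);
    }

    // Hypothetical stand-in for SOME_CONDITION; it must not stay true
    // forever, or the listener will keep rewinding to offset 101.
    private boolean shouldRewind(String data) {
        return false;
    }
}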
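If the start offset is known before the application starts, the question's own topicPartitions attribute is already enough: initialOffset is not limited to 0, and a non-negative value is treated as an absolute offset (negative values are relative to the end of the partition). A minimal sketch, assuming String values; the listener id fixed_offset and the offset 101 are hypothetical:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class FixedOffsetListener {

    // Explicit partition assignment: when the container starts, it seeks
    // partition 0 of some_topic to the absolute offset 101.
    @KafkaListener(id = "fixed_offset", topicPartitions = @TopicPartition(topic = "some_topic",
            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "101")))
    public void listen(String value) {
        // The first record delivered is at offset 101, if it still exists on the broker.
    }
}
```

The trade-off is that the offset is fixed when the container starts (it can come from a property placeholder such as "${start.offset}"), whereas the seek callback lets you move the position at runtime.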

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1