Kafka messages from topic are being replayed after consumer restart

I am facing a strange problem in Kafka: all messages from the topic are being replayed after the consumer application restarts. Can anyone tell me what I am doing wrong here?

Here is my configuration:

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.enable.auto.commit=false

My Producer configuration:

        producerConfigs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
        producerConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "client.id");
        producerConfigs.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        producerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "V1-" + UUID.randomUUID().toString());
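For reference, the same producer settings can be expressed with the literal Kafka config keys the `ProducerConfig` constants resolve to. This is only a sketch using a plain map (no `kafka-clients` dependency, class and method names made up for illustration); it also documents the constraints idempotence imposes, which the values above happen to satisfy:

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerConfigSketch {

    // Builds the same settings as above, using the literal config key
    // strings that the ProducerConfig constants stand for.
    static Map<String, Object> producerConfigs(String transactionalId) {
        Map<String, Object> cfg = new HashMap<>();
        cfg.put("enable.idempotence", "true");   // de-duplicates retried sends
        cfg.put("acks", "all");                  // required when idempotence is on
        cfg.put("client.id", "client.id");
        cfg.put("retries", Integer.toString(Integer.MAX_VALUE));
        cfg.put("max.in.flight.requests.per.connection", 5); // must be <= 5 with idempotence
        cfg.put("transactional.id", transactionalId);        // enables transactions
        return cfg;
    }

    public static void main(String[] args) {
        Map<String, Object> cfg = producerConfigs("V1-demo");
        System.out.println(cfg.get("acks")); // all
    }
}
```

One thing worth noting in the original snippet: generating the `transactional.id` with `UUID.randomUUID()` gives the producer a brand-new transactional identity on every restart, which defeats Kafka's zombie-producer fencing.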

Consumer configuration:

        consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerConfigs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
        consumerConfigs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

Consumer code:

    @KafkaListener(topics = "TOPIC-1", groupId = "TOPIC-GRP",
            containerFactory = "kafkaListenerContainerFactory",
            concurrency = "10", autoStartup = "true")
    public String processMessage(@Payload String message,
            @Header(value = KafkaHeaders.CORRELATION_ID, required = false) String correlationId,
            @Header(value = KafkaHeaders.OFFSET, required = false) String offset) throws JsonProcessingException {
        // business logic goes here
    }

Container code:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryString());
        // Note: batch mode is enabled here, while the @KafkaListener above
        // consumes a single String record per call.
        factory.setBatchListener(true);
        return factory;
    }

Consumer config:

    Map<String, Object> getConsumerProperties() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                environment.getProperty("spring.kafka.consumer.bootstrap-servers"));
        config.put(ConsumerConfig.GROUP_ID_CONFIG,
                environment.getProperty("spring.kafka.consumer.group-id"));
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                environment.getProperty("spring.kafka.consumer.auto-offset-reset"));
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
                environment.getProperty("spring.kafka.enable.auto.commit"));
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                environment.getProperty("spring.kafka.consumer.key-deserializer"));
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                environment.getProperty("spring.kafka.consumer.value-deserializer"));
        config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        return config;
    }

application.properties

spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.consumer.group-id=consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.enable.auto.commit=false
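As an aside, `spring.kafka.enable.auto.commit` is a custom key that only has effect here because `getConsumerProperties()` reads it by hand through `Environment`. If you instead let Spring Boot auto-configure the consumer, the standard key (to the best of my knowledge of Boot's property namespace) would be:

```properties
spring.kafka.consumer.enable-auto-commit=false
```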


Solution 1:[1]

I am not sure whether this has already been answered, but the configuration itself appears to be causing the issue.

With auto commit disabled (and no manual commits in their place), the consumer group never stores its offsets in Kafka's internal `__consumer_offsets` topic, and `auto-offset-reset=earliest` tells a consumer that has no committed offset to start reading from the beginning of the topic. Together, these two settings make every restart replay the whole topic:

spring.kafka.enable.auto.commit= false
spring.kafka.consumer.auto-offset-reset=earliest
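The interaction can be sketched with a small stand-alone simulation (plain Java, no broker or kafka-clients dependency; the class and method names are made up for illustration). A consumer resumes from its committed offset when one exists; otherwise it falls back to the `auto.offset.reset` policy:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetResetDemo {

    // Simulates where a consumer group starts reading a partition:
    // with a committed offset it resumes there; without one it falls
    // back to the auto.offset.reset policy.
    static long startingOffset(Map<String, Long> committedOffsets,
                               String groupAndPartition,
                               String autoOffsetReset,
                               long logEndOffset) {
        Long committed = committedOffsets.get(groupAndPartition);
        if (committed != null) {
            return committed;  // resume where the group left off
        }
        // No committed offset: apply the reset policy.
        return "earliest".equals(autoOffsetReset) ? 0L : logEndOffset;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();

        // enable.auto.commit=false and no manual commit: nothing is ever
        // stored, so every restart resolves to offset 0 (full replay).
        long firstRun = startingOffset(committed, "TOPIC-GRP/TOPIC-1-0", "earliest", 500L);
        long afterRestart = startingOffset(committed, "TOPIC-GRP/TOPIC-1-0", "earliest", 500L);
        System.out.println(firstRun + " " + afterRestart); // 0 0

        // Once offsets are committed (auto or manual), a restart resumes.
        committed.put("TOPIC-GRP/TOPIC-1-0", 500L);
        System.out.println(startingOffset(committed, "TOPIC-GRP/TOPIC-1-0", "earliest", 500L)); // 500
    }
}
```

So the fix is to make sure offsets actually get committed: either enable auto commit, or keep it disabled and commit through the listener container (in Spring Kafka the container commits for you by default when `enable.auto.commit` is false, as long as the listener completes normally).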

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution source:
Solution 1: Shashi Shankar