Kafka messages from topic are being replayed after consumer restart
I am facing a strange problem in Kafka: all messages from a topic are replayed after the consumer application restarts. Can anyone help me see what I am doing wrong here?
Here is my configuration:
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.enable.auto.commit= false
My Producer configuration:
producerconfigs.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
producerconfigs.put(ACKS_CONFIG, "all");
producerconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "client.id");
producerconfigs.put(RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
producerconfigs.put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
producerconfigs.put(TRANSACTIONAL_ID_CONFIG, "V1-"+ UUID.randomUUID().toString());
Consumer configuration:
consumerconfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerconfigs.put(SESSION_TIMEOUT_MS_CONFIG, "10000");
consumerconfigs.put("isolation.level", "read_committed");
Consumer code:
@KafkaListener(topics = "TOPIC-1", groupId = "TOPIC-GRP", containerFactory = "kaListenerContainerFactory", concurrency = "10", autoStartup = "true")
public String processMesage(
        @Payload String message,
        @Header(value = KafkaHeaders.CORRELATION_ID, required = false) String correlationId,
        @Header(value = KafkaHeaders.OFFSET, required = false) String offset) throws JsonProcessingException {
    // business logic goes here
}
Container code:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactoryString());
    factory.setBatchListener(true);
    return factory;
}
Consumer config:
Map<String, Object> getConsumerProperties() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            environment.getProperty("spring.kafka.consumer.bootstrap-servers"));
    config.put(ConsumerConfig.GROUP_ID_CONFIG,
            environment.getProperty("spring.kafka.consumer.group-id"));
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
            environment.getProperty("spring.kafka.consumer.auto-offset-reset"));
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
            environment.getProperty("spring.kafka.enable.auto.commit"));
    config.put(KEY_DESERIALIZER_CLASS_CONFIG,
            environment.getProperty("spring.kafka.consumer.key-deserializer"));
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG,
            environment.getProperty("spring.kafka.consumer.value-deserializer"));
    config.put("isolation.level", "read_committed");
    return config;
}
application.properties
spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.consumer.group-id=consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.enable.auto.commit= false
Solution 1:[1]
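I am not sure whether someone has already answered this, but the configuration seems to be causing the issue.
With auto commit set to false, the Kafka client does not commit offsets on its own, so unless something else commits them, Kafka never records how far the consumer group has read. auto-offset-reset=earliest takes effect whenever the group has no committed offset, so in that situation every restart reads the topic from the beginning again.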
spring.kafka.enable.auto.commit= false
spring.kafka.consumer.auto-offset-reset=earliest
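To make the interaction between these two settings concrete, here is a minimal toy model in Java. It is not Kafka client code: the class OffsetResetSketch, the method startPosition, the ResetPolicy enum and the offset values are all hypothetical names chosen for illustration. It models only the rule that auto.offset.reset is consulted when the group has no committed offset for a partition, so a group that never commits starts from the beginning on every restart.

```java
import java.util.OptionalLong;

public class OffsetResetSketch {

    // Hypothetical stand-in for the two common auto.offset.reset values.
    enum ResetPolicy { EARLIEST, LATEST }

    // Toy model of where a consumer group starts reading a partition on startup.
    // A committed offset always wins; the reset policy is only a fallback.
    static long startPosition(OptionalLong committedOffset, ResetPolicy policy,
                              long logStartOffset, long logEndOffset) {
        if (committedOffset.isPresent()) {
            return committedOffset.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? logStartOffset : logEndOffset;
    }

    public static void main(String[] args) {
        long logStart = 0L;   // assumed first available offset
        long logEnd = 100L;   // assumed offset of the next record to be written

        // Nothing was ever committed: "earliest" replays the whole partition.
        System.out.println(startPosition(OptionalLong.empty(), ResetPolicy.EARLIEST, logStart, logEnd));

        // Offset 42 was committed before the restart: reading resumes there.
        System.out.println(startPosition(OptionalLong.of(42L), ResetPolicy.EARLIEST, logStart, logEnd));
    }
}
```

In this sketch the replay disappears as soon as a committed offset exists, whichever reset policy is configured. That suggests checking whether offsets are actually being committed for the group before changing auto-offset-reset.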
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Shashi Shankar |
