Spring Boot Kafka consumer reads from the beginning offset on deserialization exception
My requirement is to catch a poison-pill message, publish an event to a custom Kafka DLQ topic (String key, Avro value), and proceed with the next good message. What I observe is that after the application restarts, the consumer starts consuming from the beginning. We use a CustomDeadLetterPublishingRecoverer to write an Avro event as the value, and the FixedBackOff is set to 0. Please let me know where the mistake is. I tried removing acks: all and retries, but the behavior is the same.
Config
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      topic: dlq-deserializer-event
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        transactional.id: tx-xxx-${random.uuid}
        enable.idempotence: true
        retries: 3
        acks: all
    properties:
      schema.registry.url: http://localhost:8081
      # Server host name verification is disabled by setting ssl.endpoint.identification.algorithm to an empty string
      ssl.endpoint.identification.algorithm: https
      sasl.jaas.enabled: false
      sasl.jaas.config: ""
      sasl.mechanism: PLAIN
      security.protocol: PLAINTEXT
      # Delegate deserializers
      spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
      spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
      specific.avro.reader: true
      isolation.level: read_committed
    ssl:
      protocol: SSL
    consumer:
      group-id: rp-xx
      auto-offset-reset: earliest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      enable-auto-commit: false
    listener:
      ack-mode: MANUAL_IMMEDIATE
Java Config
@Bean
public CommonErrorHandler commonErrorHanlder(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0L, 0L));
    return handler;
}

@Bean
public DeadLetterPublishingRecoverer publisher(@Autowired KafkaTemplate<String, AppDebugEvents> template) {
    return new CustomDeadLetterPublishingRecoverer(template, (record, ex) -> new TopicPartition(topicName, -1));
}
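One way to reason about the restart behavior, offered as a sketch and not a confirmed diagnosis: in Kafka, auto-offset-reset only applies when the consumer group has no committed offset for a partition. With auto-offset-reset: earliest, enable-auto-commit: false and a manual ack mode, a record that is recovered to the DLQ but never acknowledged leaves nothing committed, so a restart would begin at the start of the log. The toy model below illustrates only that rule. The class OffsetResetSketch and its methods are hypothetical names for illustration and are not part of the Kafka or Spring Kafka APIs.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of how a consumer group picks its start position; not Kafka client code. */
class OffsetResetSketch {

    /** Hypothetical stand-in for the broker's committed-offset store, keyed by partition. */
    private final Map<Integer, Long> committed = new HashMap<>();

    /** Record that everything before nextOffset has been processed for this partition. */
    void commit(int partition, long nextOffset) {
        committed.put(partition, nextOffset);
    }

    /**
     * Position used after a restart: the committed offset if one exists,
     * otherwise the reset policy decides ("earliest" means log start, anything else log end).
     */
    long startPosition(int partition, String autoOffsetReset, long logStart, long logEnd) {
        Long c = committed.get(partition);
        if (c != null) {
            return c;
        }
        return "earliest".equals(autoOffsetReset) ? logStart : logEnd;
    }

    public static void main(String[] args) {
        OffsetResetSketch group = new OffsetResetSketch();

        // A poison pill at offset 3 went to the DLQ, but no offset was ever committed.
        System.out.println(group.startPosition(0, "earliest", 0L, 10L)); // prints 0

        // Once the offset after the recovered record is committed, a restart resumes there.
        group.commit(0, 4L);
        System.out.println(group.startPosition(0, "earliest", 0L, 10L)); // prints 4
    }
}
```

In Spring Kafka terms, one setting that may be relevant is setCommitRecovered(true) on the DefaultErrorHandler, which is documented for use with AckMode.MANUAL_IMMEDIATE. Whether it resolves this particular case is untested here.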
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
