Spring Boot Kafka consumer reads from the beginning offset after a deserialization exception

My requirement is to catch a poison-pill message, publish an event to a custom Kafka DLQ topic (String key, Avro value), and then proceed with the next good message. What I observe is that, after the app restarts, the consumer starts consuming from the beginning. We are using a CustomDeadLetterPublishingRecoverer to write an Avro event as the value. The FixedBackOff is set to 0 (no delay, no retries). Please let me know where the mistake is. I tried removing acks: all and retries; the behavior is the same.

Config

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      topic: dlq-deserializer-event
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        transactional.id: tx-xxx-${random.uuid}
        enable.idempotence: true
        retries: 3
        acks: all
    properties:
      schema.registry.url: http://localhost:8081
      # Setting ssl.endpoint.identification.algorithm to an empty string disables server host name verification; "https" keeps it enabled
      ssl.endpoint.identification.algorithm: https
      sasl.jaas.enabled: false
      sasl.jaas.config: ""
      sasl.mechanism: PLAIN
      security.protocol: PLAINTEXT
      # Delegate deserializers
      spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
      spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
      specific.avro.reader: true
      isolation.level: read_committed
    ssl:
      protocol: SSL
    consumer:
      group-id: rp-xx
      auto-offset-reset: earliest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      enable-auto-commit: false
    listener:
      ack-mode: MANUAL_IMMEDIATE
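
The config above combines auto-offset-reset: earliest with manual acknowledgments, so what happens on restart depends on whether any offset was ever committed for the group. A minimal, plain-Java sketch of that interaction follows. It is a simplified model and not the Kafka client API: the class OffsetRestartSketch, its Partition type, and the commitRecovered flag are hypothetical names used only for illustration, and the model assumes a poison record never reaches the listener and is therefore never acknowledged by it.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of one partition and one consumer group; not the Kafka client API.
class OffsetRestartSketch {

    static final class Partition {
        final List<String> records;
        Long committedOffset; // null means the group has never committed an offset

        Partition(List<String> records) {
            this.records = records;
        }
    }

    // Where a (re)started consumer begins reading.
    static long startPosition(Partition p, String autoOffsetReset) {
        if (p.committedOffset != null) {
            return p.committedOffset;
        }
        return "earliest".equals(autoOffsetReset) ? 0L : p.records.size();
    }

    // One application run: read to the end of the log and return the offsets that were read.
    static List<Long> runOnce(Partition p, boolean commitRecovered) {
        List<Long> read = new ArrayList<>();
        for (long offset = startPosition(p, "earliest"); offset < p.records.size(); offset++) {
            read.add(offset);
            boolean poison = p.records.get((int) offset).startsWith("POISON");
            // Good records are acknowledged by the listener; a poison record never reaches
            // the listener, so its offset is committed only if the error handler does it.
            if (!poison || commitRecovered) {
                p.committedOffset = offset + 1;
            }
        }
        return read;
    }

    public static void main(String[] args) {
        Partition withoutCommit = new Partition(List.of("POISON-1", "POISON-2"));
        runOnce(withoutCommit, false);
        System.out.println("restart without commit reads " + runOnce(withoutCommit, false));

        Partition withCommit = new Partition(List.of("POISON-1", "POISON-2"));
        runOnce(withCommit, true);
        System.out.println("restart with commit reads " + runOnce(withCommit, true));
    }
}
```

In this model, a group that has only ever seen poison records has no committed offset, so every restart falls back to earliest and re-reads the partition from offset 0.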

Java Config

@Bean
public CommonErrorHandler commonErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
    // FixedBackOff(0L, 0L): no delay and no retries, so a failed record goes straight to the recoverer
    DefaultErrorHandler handler = new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0L, 0L));
    return handler;
}

@Bean
public DeadLetterPublishingRecoverer publisher(@Autowired KafkaTemplate<String, AppDebugEvents> template) {
    // In DeadLetterPublishingRecoverer, a negative partition leaves the choice of DLQ partition to the producer
    return new CustomDeadLetterPublishingRecoverer(template, (record, ex) -> new TopicPartition(topicName, -1));
}
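
One possible cause, offered as a guess rather than a confirmed diagnosis: with ack-mode MANUAL_IMMEDIATE the container commits only when the listener acknowledges, and a record that fails deserialization never reaches the listener. Spring for Apache Kafka exposes setCommitRecovered(true) on DefaultErrorHandler to commit the offset of a recovered record in this ack mode. The sketch below assumes that setting applies here; the class name ErrorHandlerConfig is hypothetical, and the recoverer is the bean defined in the question.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class ErrorHandlerConfig { // hypothetical class name

    @Bean
    public CommonErrorHandler commonErrorHandler(DeadLetterPublishingRecoverer recoverer) {
        // No delay and no retries: a failed record is handed to the recoverer immediately.
        DefaultErrorHandler handler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 0L));
        // Assumption: committing the recovered record's offset is what is missing here.
        // Per the Spring Kafka docs this requires AckMode.MANUAL_IMMEDIATE on the container.
        handler.setCommitRecovered(true);
        return handler;
    }
}
```

If the offsets still do not advance after this change, it may be worth checking that the DLQ publish itself succeeds, since a record whose recovery fails is not treated as recovered.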


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source