@RetryableTopic shows weird behaviour when used with topicPartitions to reset the offset - spring kafka

I am trying to use @RetryableTopic for non-blocking retries, together with topicPartitions in order to read messages from the beginning.

Below is my listener (I have only one partition):

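import java.time.LocalDateTime;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.retrytopic.FixedDelayStrategy;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Slf4j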
@Component
public class SingleTopicRetryConsumer {

    @RetryableTopic(
            attempts = "4",
            backoff = @Backoff(delay = 1000),
            fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
    @KafkaListener(topicPartitions = {@TopicPartition(topic = "products",
            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
    public void listen(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        log.info("message consumed - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
                message.key(),
                message.value(),
                message.topic(),
                LocalDateTime.now());
    }

    @DltHandler
    public void dltListener(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("message consumed at DLT - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
                message.key(),
                message.value(),
                message.topic(),
                LocalDateTime.now());
    }
}

Config properties:

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: group_id
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

The above code behaves strangely: the same message is read twice by the main listener method (once on the main container and once on the retry container) and once by the DLT handler, and all three reads come from the main topic only.

logs:

15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#8-0-C-1] INFO  c.m.s.c.n.SingleTopicRetryConsumer 
- message consumed - 
key: product1 , 
value: This is Product1, 
topic: products, 
at: 2022-04-07T15:10:50.950810          

15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#9-retry-0-C-1] INFO  c.m.s.c.n.SingleTopicRetryConsumer - 
message consumed - 
key: product1 , 
value: This is Product1, 
topic: products, 
at: 2022-04-07T15:10:50.950810      

15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#10-dlt-0-C-1] INFO  c.m.s.c.n.SingleTopicRetryConsumer - 
message consumed at DLT - 
key: product1 , 
value: This is Product1, 
topic: products, 
at: 2022-04-07T15:10:50.950810  

If I use the above code without topicPartitions, by removing the line below, the listener works as expected.

            partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}

Any clues as to why this might be happening?



Solution 1:[1]

UPDATE: This bug has been fixed in Spring for Apache Kafka 2.8.5.

That's a bug. The problem is that we set the retry topic name on the topics property of the endpoint, instead of setting it on the topicPartitions. So we end up with two listeners for the main endpoint and none for the retry topic.
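To make the description above concrete, here is a minimal, framework-free sketch of the mechanism. It is not Spring Kafka's actual code: the `Endpoint` and `Assignment` classes, the `deriveBuggy` and `deriveRemapped` methods, and the `products-retry` topic name are all hypothetical stand-ins. It also assumes that explicit partition assignments take precedence over topic names when a container is created. `deriveRemapped` shows only one plausible shape of a correction, not necessarily the change made in 2.8.5.

```java
import java.util.List;
import java.util.stream.Collectors;

public class RetryEndpointModel {

    /** A topic plus one explicitly assigned partition (hypothetical stand-in type). */
    static final class Assignment {
        final String topic;
        final int partition;

        Assignment(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Simplified listener endpoint: subscribes by topic name or by explicit assignment. */
    static final class Endpoint {
        final List<String> topics;
        final List<Assignment> assignments;

        Endpoint(List<String> topics, List<Assignment> assignments) {
            this.topics = topics;
            this.assignments = assignments;
        }

        /** Assumption: explicit assignments, when present, win over topic names. */
        List<String> consumedFrom() {
            if (!assignments.isEmpty()) {
                return assignments.stream()
                        .map(a -> a.topic)
                        .distinct()
                        .collect(Collectors.toList());
            }
            return topics;
        }
    }

    /** Models the bug: only the topic names are replaced; the main topic's assignments are kept. */
    static Endpoint deriveBuggy(Endpoint main, String derivedTopic) {
        return new Endpoint(List.of(derivedTopic), main.assignments);
    }

    /** One plausible correction: re-point the assignments at the derived topic as well. */
    static Endpoint deriveRemapped(Endpoint main, String derivedTopic) {
        List<Assignment> remapped = main.assignments.stream()
                .map(a -> new Assignment(derivedTopic, a.partition))
                .collect(Collectors.toList());
        return new Endpoint(List.of(derivedTopic), remapped);
    }

    public static void main(String[] args) {
        // Mirrors the question: the main listener is assigned partition 0 of "products".
        Endpoint main = new Endpoint(List.of(), List.of(new Assignment("products", 0)));
        System.out.println("buggy retry reads:    "
                + deriveBuggy(main, "products-retry").consumedFrom());
        System.out.println("remapped retry reads: "
                + deriveRemapped(main, "products-retry").consumedFrom());
    }
}
```

In this model the endpoint derived the buggy way still consumes from products, which matches the logs in the question, where the retry and DLT containers both report topic products.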

If you can, please open an issue: https://github.com/spring-projects/spring-kafka/issues

Not sure there's a workaround for this using topic partitions - it should be fixed in the 2.8.5 release in a couple of weeks.

Thanks for reporting.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
