'Combining blocking and non-blocking retries in Spring Kafka

I am trying to implement non blocking rerties with single topic fixed back-off.

I am able to do so, thanks to documentation https://docs.spring.io/spring-kafka/reference/html/#single-topic-fixed-delay-retries.

Now I also need to perform a few blocked/local retries on main topic. I have been trying to implement this using DefaultErrorHandler as below:

@Bean
public DefaultErrorHandler retryErrorHandler() {
        return new DefaultErrorHandler(new FixedBackOff(2000, 3));
}

This does not seem to work with RetryableTopic.

I have also tried the following approach retry-topic-combine-blocking https://docs.spring.io/spring-kafka/reference/html/#retry-topic-combine-blocking using ListenerContainerFactoryConfigurer but issue I am facing here is creating beans KafkaConsumerBackoffManager, DeadLetterPublishingRecovererFactory and especially KafkaConsumerBackoffManager.

I need to know if this another way to achieve this using spring kafka framework or is there a way to construct above beans ?



Solution 1:[1]

We're currently working on improving configuration for the non-blocking retries components.

For now, as documented here, you should inject these beans such as:

@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
                                               DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
                                               @Qualifier(RetryTopicInternalBeanNames
                                                       .INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
    lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
    lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
    return lcfc;
}}

Also, there's a known issue where if you try to inject the beans before the first @KafkaListener bean with retryable topic is processed, the feature's component's beans won't be present in the context yet and will throw an error.

Does that happen to you?

We're currently working on a fix for this, but we should be able to work around that if that's your problem.

EDIT: Since the problem is that components are not instantiated yet, the most guaranteed workaround is to provide the components yourself.

Here's a sample on how to do that. Of course, adjust it accordingly if you need any further customization.

    @Configuration
    public static class SO71705876Configuration {

        @Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
        public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
                DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory) {
            ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, Clock.systemUTC());
            lcfc.setBlockingRetryableExceptions(IllegalArgumentException.class, IllegalStateException.class);
            lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
            return lcfc;
        }

        @Bean(name = RetryTopicInternalBeanNames.KAFKA_CONSUMER_BACKOFF_MANAGER)
        public KafkaConsumerBackoffManager backOffManager(ApplicationContext context) {
            PartitionPausingBackOffManagerFactory managerFactory =
                    new PartitionPausingBackOffManagerFactory();
            managerFactory.setApplicationContext(context);
            return managerFactory.create();
        }

        @Bean(name = RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
        public DeadLetterPublishingRecovererFactory dlprFactory(DestinationTopicResolver resolver) {
            return new DeadLetterPublishingRecovererFactory(resolver);
        }

        @Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
        public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
            return new DefaultDestinationTopicResolver(Clock.systemUTC(), context);
        }

In the next release this should not be a problem anymore. Please let me know if that works for you, or if any further adjustment to this workaround is necessary.

Thanks.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1