'Exponential backoff with message order guarantee using spring-kafka

I'm trying to implement a Spring Boot-based Kafka consumer that has some very strong message delivery guarentees, even in a case of an error.

  • messages from a partition must be processed in order,
  • if message processing fails, the consumption of the particular partition should be suspended,
  • the processing should be retried with a backoff, until it succeeds.

Our current implementation fulfills these requirements:

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setRetryTemplate(retryTemplate());

  final ContainerProperties containerProperties = factory.getContainerProperties();
  containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
  containerProperties.setErrorHandler(errorHandler());

  return factory;
}

@Bean
public RetryTemplate retryTemplate() {

  final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
  backOffPolicy.setInitialInterval(1000);
  backOffPolicy.setMultiplier(1.5);

  final RetryTemplate template = new RetryTemplate();
  template.setRetryPolicy(new AlwaysRetryPolicy());    
  template.setBackOffPolicy(backOffPolicy);

  return template;
}

@Bean
public ErrorHandler errorHandler() {
  return new SeekToCurrentErrorHandler();
}

However, here, the record is locked by the consumer forever. At some point, the processing time will exceed max.poll.interval.ms and the server will reassign the partition to some other consumer, thus creating a duplicate.

Assuming max.poll.interval.ms equal to 5 mins (default) and the failure lasting 30 mins, this will cause the message to be processed ca. 6 times.

Another possiblity is to return the messages to the queue after N retries (e.g. 3 attempts), by using SimpleRetryPolicy. Then, the message will be replayed (thanks to SeekToCurrentErrorHandler) and the processing will start from scratch, again up to 5 attempts. This results in delays forming a series e.g.

10 secs -> 30 secs -> 90 secs -> 10 secs -> 30 secs -> 90 secs -> ...

which is less desired than an constantly rising one :)

Is there any third scenario which could keep the delays forming an ascending series and, at the same time, not creating duplicates in the aforementioned example?



Solution 1:[1]

It can be done with stateful retry - in which case the exception is thrown after each retry, but state is maintained in the retry state object, so the next delivery of that message will use the next delay etc.

This requires something in the message (e.g. a header) to uniquely identify each message. Fortunately, with Kafka, the topic, partition and offset provide that unique key for the state.

However, currently, the RetryingMessageListenerAdapter does not support stateful retry.

You could disable retry in the listener container factory and use a stateful RetryTemplate in your listener, using one of the execute methods that taks a RetryState argument.

Feel free to add a GitHub issue for the framework to support stateful retry; contributions are welcome! - pull request issued.

EDIT

I just wrote a test case to demonstrate using stateful recovery with a @KafkaListener...

/*
 * Copyright 2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.annotation;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.RetryState;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.DefaultRetryState;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Gary Russell
 * @since 5.0
 *
 */
@RunWith(SpringRunner.class)
@DirtiesContext
public class StatefulRetryTests {

    private static final String DEFAULT_TEST_GROUP_ID = "statefulRetry";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, "sr1");

    @Autowired
    private Config config;

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @Test
    public void testStatefulRetry() throws Exception {
        this.template.send("sr1", "foo");
        assertThat(this.config.listener1().latch1.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.config.listener1().latch2.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.config.listener1().result).isTrue();
    }

    @Configuration
    @EnableKafka
    public static class Config {

        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
            return factory;
        }

        @Bean
        public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> consumerProps =
                    KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka);
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return consumerProps;
        }

        @Bean
        public KafkaTemplate<Integer, String> template() {
            KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
            return kafkaTemplate;
        }

        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public Map<String, Object> producerConfigs() {
            return KafkaTestUtils.producerProps(embeddedKafka);
        }

        @Bean
        public Listener listener1() {
            return new Listener();
        }

    }

    public static class Listener {

        private static final RetryTemplate retryTemplate = new RetryTemplate();

        private static final ConcurrentMap<String, RetryState> states = new ConcurrentHashMap<>();

        static {
            ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
            retryTemplate.setBackOffPolicy(backOff);
        }

        private final CountDownLatch latch1 = new CountDownLatch(3);

        private final CountDownLatch latch2 = new CountDownLatch(1);

        private volatile boolean result;

        @KafkaListener(topics = "sr1", groupId = "sr1")
        public void listen1(final String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                @Header(KafkaHeaders.OFFSET) long offset) {
            String recordKey = topic + partition + offset;
            RetryState retryState = states.get(recordKey);
            if (retryState == null) {
                retryState = new DefaultRetryState(recordKey);
                states.put(recordKey, retryState);
            }
            this.result = retryTemplate.execute(c -> {

                // do your work here

                this.latch1.countDown();
                throw new RuntimeException("retry");
            }, c -> {
                latch2.countDown();
                return true;
            }, retryState);
            states.remove(recordKey);
        }

    }

}

and

Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void org.springframework.kafka.annotation.StatefulRetryTests$Listener.listen1(java.lang.String,java.lang.String,int,long)' threw exception; nested exception is java.lang.RuntimeException: retry

after each delivery attempt.

In this case, I added a recoverer to handle the message after retries are exhausted. You could do something else, like stop the container (but do that on a separate thread, like we do in the ContainerStoppingErrorHandler).

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1