Spring Boot Kafka Configure DefaultErrorHandler?

I created a batch consumer following the Spring Kafka docs:

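import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.JsonMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;

@SpringBootApplication
public class ApplicationConsumer {
  private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationConsumer.class);
  private static final String TOPIC = "foo";

  public static void main(String[] args) {
    SpringApplication.run(ApplicationConsumer.class, args);
  }

  // Converts each record's JSON payload to the listener's parameter type.
  @Bean
  public RecordMessageConverter converter() {
    return new JsonMessageConverter();
  }

  // Applies the record converter to every record in a batch.
  @Bean
  public BatchMessagingMessageConverter batchConverter() {
    return new BatchMessagingMessageConverter(converter());
  }

  // Name is my payload class (not shown here).
  @KafkaListener(topics = TOPIC)
  public void listen(List<Name> ps) {
    LOGGER.info("received name beans: {}", ps);
  }
}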

I got the consumer running by setting the following environment variables, which Spring Boot picks up automatically:

export SPRING_KAFKA_BOOTSTRAPSERVERS=...
export SPRING_KAFKA_CONSUMER_GROUPID=...

So the above code works. Now I want to customize the default error handler to use exponential backoff. Following the reference docs, I tried adding the following to the ApplicationConsumer class:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setCommonErrorHandler(new DefaultErrorHandler(new ExponentialBackOffWithMaxRetries(10)));
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    return props;
}

But now I get errors saying that some of the configuration can't be found. It looks like I have to redefine in consumerConfigs() every property that was previously configured automatically, from the bootstrap server URIs to the JSON deserialization config.

Is there a good way to change my first version of the code so that it only overrides the default error handler?
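
One approach that should avoid redefining the consumer properties, assuming Spring Boot 2.6 or later with Spring for Apache Kafka 2.8 or later: remove the custom kafkaListenerContainerFactory(), consumerFactory(), and consumerConfigs() beans, and declare only the error handler as a bean. In those versions, Boot's auto-configured container factory picks up a single CommonErrorHandler bean (DefaultErrorHandler implements that interface), so the environment-variable configuration keeps working. This is a minimal sketch, not a verified fix for this exact setup. The class name KafkaErrorHandlingConfig and the backoff values are illustrative assumptions.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;

// Hypothetical configuration class; the bean could equally live in ApplicationConsumer.
@Configuration
public class KafkaErrorHandlingConfig {

  // With no custom container factory defined, Boot's auto-configuration is
  // expected to inject this handler into the factory it creates itself.
  @Bean
  public DefaultErrorHandler errorHandler() {
    // Up to 10 retries, as in the question.
    ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(10);
    // Illustrative values (assumptions): start at 1 s, double each time, cap at 30 s.
    backOff.setInitialInterval(1_000L);
    backOff.setMultiplier(2.0);
    backOff.setMaxInterval(30_000L);
    return new DefaultErrorHandler(backOff);
  }
}
```

The converter beans from the first version can stay as they are. If a custom container factory is needed for other reasons, this shortcut no longer applies, because Boot backs off from creating its own factory once one is defined.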



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
