SerializationException is not handled when the @RetryableTopic annotation is used
I have configured the @RetryableTopic annotation and it works as expected.
I have also used setCommonErrorHandler to configure a handler that deals with SerializationException by seeking past the failing offset, but that handler is not working.
If I remove the @RetryableTopic annotation, the SerializationException is handled; with the annotation present, it is not.
Below is the code for reference:
@RetryableTopic(attempts = "3", backoff = @Backoff(delay = 120000, multiplier = 2.0),
        autoCreateTopics = "false",
        topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "#{kafkaProperties.getTopic()}", groupId = "#{kafkaProperties.getGroupId()}")
public void handleMessage(Message message) {
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory(
        ConsumerFactory<String, Message> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(kafkaProperties.getConcurrency());
    factory.setCommonErrorHandler(new KafkaConsumptionErrHandler());
    return factory;
}
public class KafkaConsumptionErrHandler extends CommonLoggingErrorHandler {

    // Compiled once; matches "... partition <topic>-<partition> at offset <offset> ..."
    private static final Pattern PARTITION_OFFSET = Pattern.compile(".*partition (.*) at offset ([0-9]*).*");

    private void seekSerializeException(Exception e, Consumer<?, ?> consumer) {
        if (e.getMessage() == null) {
            return; // nothing to parse
        }
        Matcher m = PARTITION_OFFSET.matcher(e.getMessage());
        if (m.find()) {
            // Topic names may contain '-', so split on the last one.
            int idx = m.group(1).lastIndexOf("-");
            String topic = m.group(1).substring(0, idx);
            int partition = Integer.parseInt(m.group(1).substring(idx + 1));
            // Kafka offsets are longs; parsing as int would overflow on large offsets.
            long offset = Long.parseLong(m.group(2));
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            consumer.seek(topicPartition, offset + 1);
            log.info("Skipped message, with offset {} from partition {}", offset, partition);
        }
    }

    @Override
    public void handleOtherException(Exception e, Consumer<?, ?> consumer, MessageListenerContainer container,
            boolean batchListener) {
        log.error(">>> Error in process with Exception {}", e.getMessage());
        if (e instanceof SerializationException) {
            seekSerializeException(e, consumer);
        }
    }
}
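The message parsing in seekSerializeException can be exercised on its own, without a broker or Spring context. The following is a minimal sketch, not part of the original code: the class name SerializationErrorParser, the FailedRecord holder and the sample message text are all hypothetical, and the message format is assumed to follow the "partition <topic>-<partition> at offset <offset>" shape that the regex in the handler expects.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts topic, partition and offset from an error message.
final class SerializationErrorParser {

    // Hypothetical holder for the coordinates of the record that failed.
    static final class FailedRecord {
        final String topic;
        final int partition;
        final long offset;

        FailedRecord(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        // The offset to seek to in order to skip the failed record.
        long nextOffset() {
            return offset + 1;
        }
    }

    // Assumed message shape: "... partition <topic>-<partition> at offset <offset> ...".
    // The greedy (.+) keeps any '-' inside the topic name; the last '-' separates the partition.
    private static final Pattern COORDINATES =
            Pattern.compile("partition (.+)-(\\d+) at offset (\\d+)");

    static Optional<FailedRecord> parse(String message) {
        if (message == null) {
            return Optional.empty();
        }
        Matcher m = COORDINATES.matcher(message);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new FailedRecord(
                m.group(1),
                Integer.parseInt(m.group(2)),
                Long.parseLong(m.group(3)))); // offsets are longs in Kafka
    }

    public static void main(String[] args) {
        // Sample text only; the real message depends on the Kafka client version.
        String message = "Error deserializing key/value for partition my-topic-0 at offset 42. "
                + "If needed, please seek past the record to continue consumption.";
        parse(message).ifPresent(r -> System.out.println(
                "Would seek " + r.topic + "-" + r.partition + " to offset " + r.nextOffset()));
    }
}
```

Returning an Optional makes the "message did not match" case explicit, so a caller can decide whether to log, rethrow, or fall back instead of silently doing nothing.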
Solution 1:[1]
The RetryableTopic feature creates and configures its own DefaultErrorHandler so that it can properly configure the DeadLetterPublishingRecoverer that forwards records to the retry topics.
It's not compatible with CommonLoggingErrorHandler, but you can provide your own DefaultErrorHandler subclass by overriding the createDefaultErrorHandlerInstance method in a ListenerContainerFactoryConfigurer bean, such as:
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
        DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory) {
    return new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, Clock.systemUTC()) {
        @Override
        protected DefaultErrorHandler createDefaultErrorHandlerInstance(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
            return new MyDefaultErrorHandlerSubClass(deadLetterPublishingRecoverer);
        }
    };
}
If you get a NoSuchBeanDefinitionException, refer to this other answer for a workaround. We're currently working on a fix for that - not sure when it'll be available though.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
