KafkaListener stops consuming with a new ListenerContainerFactory
I want to add a container customizer to configure retries for our KafkaListeners, and also add a RecoveryCallback for when retries are exhausted. Our Kafka library, which is added as a dependency, already defines a listener container factory:
    @Bean
    public CustomConcurrentContainerListenerFactory kafkaListenerContainerFactory(ConsumerFactory<String, Object> consumerFactory) {
        CustomConcurrentContainerListenerFactory factory = new CustomConcurrentContainerListenerFactory();
        factory.setConsumerFactory(consumerFactory);
        factory.setMessageConverter(this.kafkaAvroMessageConverter);
        factory.setReplyTemplate(this.kafkaTemplate);
        return factory;
    }
CustomConcurrentContainerListenerFactory is a custom container factory that extends ConcurrentKafkaListenerContainerFactory.
In the application I added a new container factory, which uses the ConsumerFactory defined in the library, and a ContainerCustomizer, as shown below:
    @Bean("sendEmailListenerFactory")
    public ConcurrentKafkaListenerContainerFactory sendEmailListenerFactory(ConsumerFactory<String, Object> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Component
    public class ContainerCustomizer {
        public ContainerCustomizer(ConcurrentKafkaListenerContainerFactory sendEmailListenerFactory) {
            sendEmailListenerFactory.setContainerCustomizer(container -> {
                if (container.getListenerId().equals("handleMediumPriorityEmails")) {
                    container.getContainerProperties().setCommitRetries(2);
                }
            });
        }
    }
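For reference, the retry-then-recover behaviour described at the top can be sketched in plain Java, independent of Spring Kafka. This is only an illustration of the intended semantics under stated assumptions: `RetrySketch`, `callWithRetry`, and `maxAttempts` are hypothetical names and not part of any library. It may also be worth noting that, as far as the `ContainerProperties` Javadoc describes it, `setCommitRetries` governs retries of failed synchronous offset commits rather than redelivery of records, so it is probably not the setting that produces this behaviour.

```java
import java.util.function.Function;
import java.util.function.Supplier;

public class RetrySketch {

    /**
     * Runs the action up to maxAttempts times. If every attempt throws,
     * the last exception is handed to the recovery callback and the
     * callback's result is returned instead.
     */
    public static <T> T callWithRetry(int maxAttempts,
                                      Supplier<T> action,
                                      Function<RuntimeException, T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get(); // success: stop retrying
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        // All attempts failed: delegate to the recovery callback.
        return recovery.apply(last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = RetrySketch.<String>callWithRetry(3,
                () -> { calls[0]++; throw new IllegalStateException("send failed"); },
                e -> "recovered after " + calls[0] + " attempts: " + e.getMessage());
        System.out.println(result); // prints "recovered after 3 attempts: send failed"
    }
}
```

The sketch retries immediately and only on unchecked exceptions; a real listener setup would typically add a back-off between attempts and decide which exceptions are worth retrying.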
The application already has several KafkaListeners, and when I set sendEmailListenerFactory as the containerFactory on one of them, that listener stops consuming.
    @KafkaListener(id = "handleMediumPriorityEmails",
            topics = ActorNotificationSendEmailCommand.TOPIC,
            containerFactory = "sendEmailListenerFactory")
    public void handleMedium(SendEmailCommand command) {
        consume(NotificationPriority.MEDIUM, command);
    }
I checked bootstrap.servers on the container and the value is fine. Where else should I look to find the issue?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
