Spring Apache Kafka onFailure Callback of KafkaTemplate not fired on connection error
I'm experimenting a lot with Apache Kafka in a Spring Boot App at the moment.
My current goal is to write a REST endpoint that takes a message payload and uses a KafkaTemplate to send it to my local Kafka broker running on port 9092.
This is my producer config:
@Bean
public Map<String, Object> producerConfig() {
    // config settings for creating producers
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
    configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 4000);
    configProps.put(ProducerConfig.RETRIES_CONFIG, 0);
    return configProps;
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    // creates a kafka producer
    return new DefaultKafkaProducerFactory<>(producerConfig());
}

@Bean("kafkaTemplate")
public KafkaTemplate<String, String> kafkaTemplate() {
    // template which abstracts sending data to kafka
    return new KafkaTemplate<>(producerFactory());
}
My REST endpoint forwards to a service, which looks like this:
@Service
public class KafkaSenderService {

    // SLF4J logger used by the callbacks below
    private static final Logger logger = LoggerFactory.getLogger(KafkaSenderService.class);

    private final KafkaTemplate<String, String> kafkaTemplate;

    // with constructor injection, the qualifier belongs on the parameter
    @Autowired
    public KafkaSenderService(@Qualifier("kafkaTemplate") KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessageWithCallback(String message, String topicName) {
        // possibility to add callbacks to define what shall happen in success/error case
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
        future.addCallback(new KafkaSendCallback<String, String>() {
            @Override
            public void onFailure(KafkaProducerException ex) {
                logger.warn("Message could not be delivered. " + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.info("Your message was delivered with following offset: " + result.getRecordMetadata().offset());
            }
        });
    }
}
The problem is this: I expect the onFailure() method to be called when the message cannot be sent, but that does not seem to happen. When I change the bootstrapServers variable in the producer config to localhost:9091 (the wrong port, so no connection should be possible), the producer tries to connect to the broker. It makes several connection attempts, and after 5 seconds a TimeoutException occurs. But the onFailure() method is not called. Is there a way to have onFailure() called even if the connection cannot be established?
And by the way, I set the retries count to zero, but the producer still makes a second connection attempt after the first one. This is the log output:
EDIT: It seems like the Kafka producer/KafkaTemplate goes into an infinite loop when the broker is not available. Is that really the intended behaviour?
Solution 1:[1]
The question was answered in the discussion at https://github.com/spring-projects/spring-kafka/discussions/2250# for anyone else stumbling across this thread. In short, calling kafkaTemplate.getProducerFactory().reset(); does the trick.
Solution 2:[2]
The KafkaTemplate does nothing fancy around connecting and publishing; everything is delegated to the KafkaProducer. What you describe would happen in exactly the same way if you used the plain Kafka client.
See KafkaProducer.send() JavaDocs:
* @throws TimeoutException If the record could not be appended to the send buffer due to memory unavailable
* or missing metadata within {@code max.block.ms}.
This comes from the blocking logic in the producer:
/**
* Wait for cluster metadata including partitions for the given topic to be available.
* @param topic The topic we want metadata for
* @param partition A specific partition expected to exist in metadata, or null if there's no preference
* @param nowMs The current time in ms
* @param maxWaitMs The maximum time in ms for waiting on the metadata
* @return The cluster containing topic metadata and the amount of time we waited in ms
* @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
* @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
*/
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
Unfortunately, this is not explained in the send() JavaDocs, which claim the method is fully asynchronous; apparently it is not, at least for the metadata that has to be available before the record is enqueued for publishing.
We cannot control this part, and it is not reflected in the returned Future:
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
For more information on how to tune the KafkaProducer for this case, see the Apache Kafka docs: https://kafka.apache.org/documentation/#theproducer
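As a rough illustration of the point above, here is a self-contained sketch that uses only the JDK, not the Kafka or Spring APIs. The sender function is a hypothetical stand-in for a send call that can either throw before it returns a future (as described for the metadata timeout) or return a future that fails later. The names SendFailureDemo and sendWithCallback are invented for this example. It shows one way a caller might route both failure paths to the same handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class SendFailureDemo {

    // Calls a (hypothetical) sender and records the outcome, whether the
    // failure is thrown synchronously or reported through the future.
    static void sendWithCallback(Function<String, CompletableFuture<Long>> sender,
                                 String message,
                                 List<String> log) {
        CompletableFuture<Long> future;
        try {
            future = sender.apply(message);
        } catch (RuntimeException ex) {
            // The call failed before a future existed, so no callback
            // registered on a future could ever see this error.
            log.add("sync failure: " + ex.getMessage());
            return;
        }
        // Normal asynchronous path: success or failure arrives via the future.
        future.whenComplete((offset, ex) -> {
            if (ex != null) {
                log.add("async failure: " + ex.getMessage());
            } else {
                log.add("delivered at offset " + offset);
            }
        });
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Case 1: simulated failure thrown while the call is still blocking.
        sendWithCallback(m -> { throw new IllegalStateException("metadata timeout"); }, "a", log);
        // Case 2: simulated failure delivered through the returned future.
        sendWithCallback(m -> CompletableFuture.failedFuture(
                new IllegalStateException("broker rejected")), "b", log);
        // Case 3: simulated successful send.
        sendWithCallback(m -> CompletableFuture.completedFuture(42L), "c", log);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In the service from the question, the analogous step would be to wrap the kafkaTemplate.send(...) call in a try/catch in addition to registering the callback. Whether a given failure arrives as an exception or through the future may depend on the library version, so handling both is the conservative choice.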
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Walnussbär |
| Solution 2 | Artem Bilan |

