'Kafka producer never entering the failure callback

We have a Kafka producer set to be idempotent and we would like to catch if the broker is actually up and running before sending the message.

The abstractSender that is implemented by all the various producers has the following logic on the success/failure

    @Override
    public void send(T message) {
        ListenableFuture<SendResult<String, T>> future = this.kafkaTemplate.send(eventType().getTopic(), message);
        future.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable ex) {
                logger.error("Unable to send message=[ {} ] due to : {}", message, ex.getMessage());
            }
            
            @Override
            public void onSuccess(SendResult<String, T> result) {
                logger.error("Sent message=[ {} ] with offset=[ {} ]", message, result.getRecordMetadata().offset());
            }

        });
    }

We also set the properties of the producer as follows:

producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000);
producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 3000);

It seems however that, even if the broker is offline, the sender keeps retrying without entering in the failure callback at all.

[kafka-producer-network-thread | producer-"kafka-tx-"-meetingMessage0] WARN  org.apache.kafka.clients.NetworkClient processDisconnection - [Producer clientId=producer-"kafka-tx-"-meetingMessage0, transactionalId="kafka-tx-"-meetingMessage0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[kafka-producer-network-thread | producer-"kafka-tx-"-meetingMessage0] WARN  org.apache.kafka.clients.NetworkClient handleServerDisconnect - [Producer clientId=producer-"kafka-tx-"-meetingMessage0, transactionalId="kafka-tx-"-meetingMessage0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
[kafka-producer-network-thread | producer-"kafka-tx-"-meetingMessage0] WARN  org.apache.kafka.clients.NetworkClient processDisconnection - [Producer clientId=producer-"kafka-tx-"-meetingMessage0, transactionalId="kafka-tx-"-meetingMessage0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[kafka-producer-network-thread | producer-"kafka-tx-"-meetingMessage0] WARN  org.apache.kafka.clients.NetworkClient handleServerDisconnect - [Producer clientId=producer-"kafka-tx-"-meetingMessage0, transactionalId="kafka-tx-"-meetingMessage0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
...
...
[kafka-producer-network-thread | producer-"kafka-tx-"-meetingMessage0] WARN  org.apache.kafka.clients.NetworkClient processDisconnection - [Producer clientId=producer-"kafka-tx-"-meetingMessage0, transactionalId="kafka-tx-"-meetingMessage0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[kafka-producer-network-thread | producer-"kafka-tx-"-meetingMessage0] WARN  org.apache.kafka.clients.NetworkClient handleServerDisconnect - [Producer clientId=producer-"kafka-tx-"-meetingMessage0, transactionalId="kafka-tx-"-meetingMessage0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

Do you have any idea on how we can manage to catch the fact that the broker is down from the consumer perspective, while respecting the idempotence?

Thank you in advance!



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source