'Failure to re-authenticate Kafka client SASL connection, should close connection

As per KIP-368 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-368), when 'connections.max.reauth.ms' is explicitly set to a positive number the server will disconnect any SASL connection that does not re-authenticate. If the re-authentication attempt fails then the connection will be closed by the broker, retries are not supported. However when my client fails to re-authenticate, it goes into infinite loops of retry.

INFO [kafka-producer-network-thread | producer-1] org.apache.kafka.common.network.Selector: [Producer clientId=producer-1][Producer clientId=producer-1] Failed authentication with 10.4.252.249/10.4.252.249 (Authentication failed during authentication due to invalid credentials with SASL mechanism)
ERROR [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1] Connection to node 0 (10.4.252.249/10.4.252.249:9096) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism
INFO [kafka-producer-network-thread | producer-1] org.apache.kafka.common.network.Selector: [Producer clientId=producer-1][Producer clientId=producer-1] Failed authentication with 10.4.252.249/10.4.252.249 (Authentication failed during authentication due to invalid credentials with SASL mechanism)
ERROR [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1] Connection to node 0 (10.4.252.249/10.4.252.249:9096) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism

I want the client to exit so I can bubble up the exception. Any ideas how I can address this?



Solution 1:[1]

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed: Invalid username or password

Kafka will throw SaslAuthenticationException when authenticate fail. You can surround client code with try catch and close the client in the catch block. Take admin client for example.

//... init other properties

// set request time out, the default timeout takes too long
properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);

// get client
Admin admin = Admin.create(properties);

// test connection, if error happens, close it, in case of infinite retrying to connect to kafka by admin client, because of the meta data update thread
try {
    admin.listTopics().names().get();
} catch (Exception e) {
    closeClient(admin);
    throw new RRException(e);
}

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 allen