When my Flink SQL job uses Kafka with Kerberos authentication and is submitted to YARN, why does Kafka authentication always fail?

I am using a Kafka source with Kerberos authentication in Flink SQL. The job passes a local Flink test, but when I push it to YARN, the error says the Kafka JAAS file cannot be found!

The relevant Flink SQL is as follows:

create table source_sensor(
        id VARCHAR,
        ts bigint,
        vc double)
    WITH (
        'connector' = 'kafka',
        'topic' = 'huangfu_0110',
        'scan.startup.mode' = 'latest-offset',
        'properties.group.id' = '1',
        'properties.bootstrap.servers' = '10.0.120.23:9092',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.mechanism' = 'GSSAPI',
        'properties.sasl.kerberos.service.name' = 'kafka',
        'properties.sasl.kerberos.principal.to.local.rules' = 'kafka/[email protected]',
        'properties.sasl.jaas.config' = 'com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/ddmp/kafka/kafka.keytab" principal="kafka/[email protected]";',
        'format' = 'json',
        'json.ignore-parse-errors' = 'true',
        'json.fail-on-missing-field' = 'false'
    );

create table sink_sensor(
        id VARCHAR,
        ts bigint,
        vc double)
    WITH (
        'connector'='jdbc',
        'url'='jdbc:mysql://10.0.10.118:3306/yf1?useSSL=true',
        'table-name'='sink_table',
        'username'='root',
        'password'='123456',
        'sink.buffer-flush.interval'='10s',
        'sink.buffer-flush.max-rows'='10000'
    );
insert into sink_sensor select * from source_sensor;

After submitting, Flink reports the following error:

org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:55) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:551) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [byit-flink-sql-engine.jar:4.1.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_211]
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /yarn/nm/usercache/ddmp/appcache/application_1642585778653_0040/jaas-2809721433806366634.conf
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:124) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.kafka.shaded.org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99) ~[byit-flink-sql-engine.jar:4.1.1]
    at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:741) ~[byit-flink-sql-engine.jar:4.1.1]
    ... 15 more

All of the file paths referenced in the Flink SQL configuration exist in the local directory.

What I observe is that when connecting to Kafka, the client always looks for a JAAS file at /yarn/nm/usercache/ddmp/appcache/application_1642585778653_0040/jaas-2809721433806366634.conf, which is very strange.
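
If I understand correctly, the Kafka client reads the file named by the java.security.auth.login.config system property and expects a KafkaClient section inside it. As a sketch, using the keytab and principal from the DDL above, the expected entry would look roughly like:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/ddmp/kafka/kafka.keytab"
    principal="kafka/[email protected]";
};

Judging from the "Could not find a 'KafkaClient' entry" message, the jaas-*.conf file generated under the YARN usercache does not contain such a section.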

I would appreciate it if you could help me solve it!



Solution 1:[1]

This is caused by the Flink cluster's Kerberos configuration. You only need to set a few options in flink-conf.yaml to make it work.

The settings are below:

security.kerberos.login.use-ticket-cache: false 
security.kerberos.login.keytab: /etc/kafka/kafka.keytab
security.kerberos.login.principal: [email protected]
security.kerberos.login.contexts: Client,KafkaClient
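
With these options set, Flink performs the Kerberos login itself, generates a JAAS entry for each listed context (Client, KafkaClient) on every YARN container, and ships the keytab from the client machine to the containers. Under that assumption, the connector should no longer need an explicit sasl.jaas.config line; a minimal sketch of the simplified source table, reusing the values from the question:

create table source_sensor(
        id VARCHAR,
        ts bigint,
        vc double)
    WITH (
        'connector' = 'kafka',
        'topic' = 'huangfu_0110',
        'scan.startup.mode' = 'latest-offset',
        'properties.group.id' = '1',
        'properties.bootstrap.servers' = '10.0.120.23:9092',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.sasl.mechanism' = 'GSSAPI',
        'properties.sasl.kerberos.service.name' = 'kafka',
        'format' = 'json',
        'json.ignore-parse-errors' = 'true',
        'json.fail-on-missing-field' = 'false'
    );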

Similar question: Flink SQL Client connect to secured kafka cluster

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution source:
Solution 1: ChangLi