Cannot connect to Kafka using ACL in Docker (Docker Compose)

I am trying to set up Kafka with ACL in Docker (docker-compose), but I am not able to connect with my Python example script. I am using the script of dicrectcsd.

docker-compose.yml

  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.1
    container_name: zookeeper
    ports:
      - '31000:31000'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_OPTS: "-Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider -Djava.security.auth.login.config=/opt/security/zookeeper-server.jaas"
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31000
    volumes:
      - ./kafka/data/zookeeper/data:/data
      - ./kafka/data/zookeeper/datalog:/datalog
      - ./kafka/data/kafka1/security:/opt/security
    networks:
      - proxy

  kafka:
    image: confluentinc/cp-server:5.4.1
    container_name: kafka
    ports:
      - '9092:9092'
      - '9093:9093'
      - '31001:31001'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: EXTERNAL
      KAFKA_LISTENERS: "EXTERNAL://:9092,INTERNAL://kafka:9093"
      KAFKA_ADVERTISED_LISTENERS: "EXTERNAL://:9092,INTERNAL://kafka:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "EXTERNAL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
#      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
      KAFKA_LISTENER_NAME_EXTERNAL_SASL_ENABLED_MECHANISMS: PLAIN
#      KAFKA_LISTENER_NAME_INTERNAL_SASL_ENABLED_MECHANISMS: PLAIN
#      KAFKA_LISTENER_NAME_EXTERNAL_PLAIN_SASL_JAAS_CONFIG: "KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret' user_admin='admin-secret';} KafkaClient {org.>
#      KAFKA_LISTENER_NAME_INTERNAL_PLAIN_SASL_JAAS_CONFIG: "KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret' user_admin='admin-secret';} KafkaClient {org.>
#      KAFKA_ZOOKEEPER_SASL_CLIENTCONFIG: "org.apache.zookeeper.server.auth.DigestLoginModule required username='admin' password='admin-secret';};"

      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
#      KAFKA_AUTHORIZER_CLASS_NAME: io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
#      KAFKA_CONFLUENT_AUTHORIZER_ACCESS_RULE_PROVIDERS: "ZK_ACL,CONFLUENT"
      KAFKA_SUPER_USERS: "User:admin"
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
      KAFKA_ZOOKEEPER_SET_ACL: "true"
      KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/security/kafka-server.jaas"
      KAFKA_JMX_HOSTNAME: "localhost"
      KAFKA_JMX_PORT: 31001
      #KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      #CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: localhost:9093
      #CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      #CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      #CONFLUENT_METRICS_REPORTER_SECURITY_PROTOCOL: SASL_PLAINTEXT
      #CONFLUENT_METRICS_REPORTER_SASL_MECHANISM: PLAIN
      #CONFLUENT_METRICS_REPORTER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';"

      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./kafka/data/kafka1/data:/var/lib/kafka/data
      - ./kafka/data/kafka1/security:/opt/security
    networks:
      - proxy

kafka-server.jaas

// Server config - used to authorise
KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="admin-secret"
   user_producer="producer-secret"
   user_consumer="consumer-secret";
};
// Client config used to connect to Kafka
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret";
};

// Client config used to connect to Zookeeper
Client {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  username="admin"
  password="admin-secret";
};

zookeeper-server.jaas

Server {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="admin"
       password="admin-secret"
       user_admin="admin-secret";
};

Zookeeper log

[2021-12-26 14:14:15,347] INFO adding SASL authorization for authorizationID: admin (org.apache.zookeeper.server.ZooKeeperServer)
[2021-12-26 14:14:18,418] INFO Successfully authenticated client: authenticationID=admin;  authorizationID=admin. (org.apache.zookeeper.server.auth.SaslServerCallbackHandler)
[2021-12-26 14:14:18,418] INFO Setting authorizedID: admin (org.apache.zookeeper.server.auth.SaslServerCallbackHandler)
[2021-12-26 14:14:18,418] INFO adding SASL authorization for authorizationID: admin (org.apache.zookeeper.server.ZooKeeperServer)
[2021-12-26 14:14:20,775] INFO Successfully authenticated client: authenticationID=admin;  authorizationID=admin. (org.apache.zookeeper.server.auth.SaslServerCallbackHandler)
[2021-12-26 14:14:20,775] INFO Setting authorizedID: admin (org.apache.zookeeper.server.auth.SaslServerCallbackHandler)
[2021-12-26 14:14:20,775] INFO adding SASL authorization for authorizationID: admin (org.apache.zookeeper.server.ZooKeeperServer)

Kafka Log

[2021-12-26 14:16:18,822] INFO [KafkaApi-1] Auto creation of topic first_topic with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2021-12-26 14:16:18,823] INFO [Controller id=1] New topics: [Set(first_topic)], deleted topics: [Set()], new partition replica assignment [Set(TopicIdReplicaAssignment(first_topic,None,Map(first_topic-0 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=, observers=, targetObservers=None))))] (kafka.controller.KafkaController)
[2021-12-26 14:16:18,823] INFO [Controller id=1] New partition creation callback for first_topic-0 (kafka.controller.KafkaController)
[2021-12-26 14:16:18,824] TRACE [Controller id=1 epoch=1] Changed partition first_topic-0 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
[2021-12-26 14:16:18,824] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition first_topic-0 from NonExistentReplica to NewReplica (state.change.logger)
[2021-12-26 14:16:18,835] TRACE [Controller id=1 epoch=1] Changed partition first_topic-0 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isr=List(1), zkVersion=0) (state.change.logger)
[2021-12-26 14:16:18,835] TRACE [Controller id=1 epoch=1] Sending become-leader LeaderAndIsr request LeaderAndIsrPartitionState(topicName='first_topic', topicId=00000000-0000-0000-0000-000000000000, partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], observers=[], addingReplicas=[], removingReplicas=[], isNew=true) to broker 1 for partition first_topic-0 (state.change.logger)
[2021-12-26 14:16:18,836] TRACE [Controller id=1 epoch=1] Sending UpdateMetadata request UpdateMetadataPartitionState(topicName='first_topic', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], observers=[], offlineReplicas=[]) to brokers Set(1) for partition first_topic-0 (state.change.logger)
[2021-12-26 14:16:18,836] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition first_topic-0 from NewReplica to OnlineReplica (state.change.logger)
[2021-12-26 14:16:18,837] TRACE [Broker id=1] Received LeaderAndIsr request LeaderAndIsrPartitionState(topicName='first_topic', topicId=00000000-0000-0000-0000-000000000000, partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], observers=[], addingReplicas=[], removingReplicas=[], isNew=true) correlation id 5 from controller 1 epoch 1 (state.change.logger)
[2021-12-26 14:16:18,838] TRACE [Broker id=1] Handling LeaderAndIsr request correlationId 5 from controller 1 epoch 1 starting the become-leader transition for partition first_topic-0 (state.change.logger)
[2021-12-26 14:16:18,838] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(first_topic-0) (kafka.server.ReplicaFetcherManager)
[2021-12-26 14:16:18,842] INFO [Log partition=first_topic-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-12-26 14:16:18,842] INFO [Log partition=first_topic-0, dir=/var/lib/kafka/data] Completed load of log with 1 segments, log start offset (merged: 0, local: 0) and log end offset 0 in 2 ms (kafka.log.Log)
[2021-12-26 14:16:18,842] INFO [Log partition=first_topic-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-12-26 14:16:18,843] INFO Completed load of log with 1 segments containing 1 local segments and 0 tiered segments, tier start offset 0, first untiered offset 0, local start offset 0, log end offset 0 (kafka.log.MergedLog)
[2021-12-26 14:16:18,844] INFO Created log for partition first_topic-0 in /var/lib/kafka/data/first_topic-0 with properties {compression.type -> producer, message.downconversion.enable -> true, confluent.missing.id.cache.ttl.sec -> 60, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, confluent.tier.local.hotset.ms -> 86400000, confluent.tier.local.hotset.bytes -> -1, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 9223372036854775807, confluent.append.record.interceptor.classes -> [], confluent.tier.enable -> false, message.format.version -> 2.4-IV1, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, confluent.schema.registry.max.cache.size -> 10000, unclean.leader.election.enable -> false, confluent.missing.id.query.range -> 200, retention.bytes -> -1, delete.retention.ms -> 86400000, confluent.schema.registry.max.retries -> 1, segment.ms -> 604800000, confluent.schema.registry.retries.wait.ms -> 0, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-12-26 14:16:18,845] INFO [Partition first_topic-0 broker=1] No checkpointed highwatermark is found for partition first_topic-0 (kafka.cluster.Partition)
[2021-12-26 14:16:18,845] INFO [Partition first_topic-0 broker=1] Log loaded for partition first_topic-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-12-26 14:16:18,845] INFO [Partition first_topic-0 broker=1] first_topic-0 starts at leader epoch 0 from offset 0 with high watermark 0. Previous leader epoch was -1. (kafka.cluster.Partition)
[2021-12-26 14:16:18,851] TRACE [Broker id=1] Stopped fetchers as part of become-leader request from controller 1 epoch 1 with correlation id 5 for partition first_topic-0 (last update controller epoch 1) (state.change.logger)
[2021-12-26 14:16:18,851] TRACE [Broker id=1] Completed LeaderAndIsr request correlationId 5 from controller 1 epoch 1 for the become-leader transition for partition first_topic-0 (state.change.logger)
[2021-12-26 14:16:18,852] TRACE [Controller id=1 epoch=1] Received response {error_code=0,partition_errors=[{topic_name=first_topic,partition_index=0,error_code=0,_tagged_fields={}}],_tagged_fields={}} for request LEADER_AND_ISR with correlation id 5 sent to broker 6e48df004a23:9092 (id: 1 rack: null) (state.change.logger)
[2021-12-26 14:16:18,853] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='first_topic', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], observers=[], offlineReplicas=[]) for partition first_topic-0 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 6 (state.change.logger)
[2021-12-26 14:16:18,854] TRACE [Controller id=1 epoch=1] Received response {error_code=0,_tagged_fields={}} for request UPDATE_METADATA with correlation id 6 sent to broker 6e48df004a23:9092 (id: 1 rack: null) (state.change.logger)

Python producer example script

from kafka import KafkaProducer
topic = "first_topic"
sasl_mechanism = "PLAIN"
username = "admin"
password = "admin-secret"
security_protocol = "SASL_PLAINTEXT"
bootstrap_servers = ['kafka.***:9092']
producer = KafkaProducer(bootstrap_servers = bootstrap_servers,
                          #api_version=(0, 10),
                          security_protocol=security_protocol,
                          #ssl_context=context,
                          #ssl_check_hostname=True,
                          #ssl_cafile='../keys/CARoot.pem',
                          sasl_mechanism = sasl_mechanism,
                          sasl_plain_username = username,
                          sasl_plain_password = password)
                          #ssl_certfile='../keys/certificate.pem',
                          #ssl_keyfile='../keys/key.pem')#,api_version = (0, 10))
#producer = KafkaProducer()

future = producer.send('first_topic', b'another_message again')
result = future.get(timeout=60)

print(result)

Error message of the Python script:

Exception has occurred: KafkaTimeoutError
KafkaTimeoutError: Batch for TopicPartition(topic='first_topic', partition=0) containing 1 record(s) expired: 31 seconds have passed since batch creation plus linger time
  File "C:\git\kafka_test\producer_acl.py", line 22, in <module>
    result = future.get(timeout=60)

Could you please point me in the right direction to solve this problem?

Thank you so much

Simon



Solution 1:[1]

It turns out you can actually access Kafka, but only from inside Docker, using kafka:9092.

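The reason is that KAFKA_ADVERTISED_LISTENERS must be set to an address that clients outside Docker can reach, i.e. the external IP address or domain name. With EXTERNAL://:9092 no host is advertised explicitly, and the Kafka log shows the broker registered as 6e48df004a23:9092, presumably the container's host name, which a client outside Docker most likely cannot resolve. More about it here: https://rmoff.net/2018/08/02/kafka-listeners-explained/ and https://www.confluent.io/blog/kafka-listeners-explained/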
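One way to check this from the client side is to test whether the host name the broker advertises (6e48df004a23 in the Kafka log of the question) resolves on the machine running the script. This is only a minimal sketch; the helper name can_resolve is made up for illustration, and reading 6e48df004a23 as the container's host name is an assumption.

```python
import socket


def can_resolve(host: str, port: int = 9092) -> bool:
    """Return True if this machine can resolve `host` to an address."""
    try:
        socket.getaddrinfo(host, port)
        return True
    except socket.gaierror:
        # Name resolution failed: a client here could not reach a broker
        # that advertises itself under this host name.
        return False
```

If can_resolve("6e48df004a23") returns False on the host while can_resolve("kafka") returns True inside the Docker network, that would match the behaviour described above.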

LISTENERS are the interfaces Kafka binds to. ADVERTISED_LISTENERS are the addresses clients are told to connect to.
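To make the distinction concrete, here is a small sketch that splits a listener string into name, host and port, using the KAFKA_ADVERTISED_LISTENERS value from the question. The function parse_listeners is a hypothetical helper, not part of Kafka or kafka-python; it only shows which host each listener hands out to clients.

```python
def parse_listeners(value: str) -> dict:
    """Parse 'NAME://host:port,...' into {NAME: (host, port)}."""
    result = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        result[name] = (host, int(port))
    return result


# Value taken from the docker-compose.yml in the question.
advertised = parse_listeners("EXTERNAL://:9092,INTERNAL://kafka:9093")

for name, (host, port) in advertised.items():
    # An empty host means no explicit address is advertised for this listener.
    note = f"host {host!r}" if host else "no explicit host"
    print(f"{name}: port {port}, {note}")
```

The EXTERNAL listener has an empty host here, so clients are not given an address chosen by you; in the question's log the broker shows up as 6e48df004a23:9092 instead.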

Here's my docker-compose.yml

NOTE: replace your.ip.addr below with your IP address or domain name.

...
...
  kafka:
    image: confluentinc/cp-server:5.4.1
    ports:
      - '9092:9092'
      - '9093:9093'
      - '9094:9094'
      - '31001:31001'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: EXTERNAL
      KAFKA_LISTENERS: "EXTERNAL://kafka:9092,INTERNAL://localhost:9093,OUTSIDE://:9094"
      KAFKA_ADVERTISED_LISTENERS: "EXTERNAL://kafka:9092,INTERNAL://localhost:9093,OUTSIDE://your.ip.addr:9094"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "EXTERNAL:SASL_PLAINTEXT,INTERNAL:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
...
...
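With this configuration, a client outside Docker would connect to the OUTSIDE listener on port 9094 rather than to 9092. The following is a sketch of the producer from the question adapted accordingly; it assumes kafka-python is installed and reuses the admin credentials from kafka-server.jaas. The helper names outside_producer_config and send_test_message are made up for illustration, and your.ip.addr is the same placeholder as in the compose file.

```python
def outside_producer_config(host: str, port: int = 9094) -> dict:
    """Build KafkaProducer keyword arguments for the OUTSIDE listener."""
    return {
        "bootstrap_servers": [f"{host}:{port}"],
        "security_protocol": "SASL_PLAINTEXT",
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": "admin",
        "sasl_plain_password": "admin-secret",
    }


def send_test_message(host: str) -> None:
    # Imported here so the config helper works without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(**outside_producer_config(host))
    future = producer.send("first_topic", b"another_message again")
    # Blocks until the broker acknowledges the record or the timeout expires.
    print(future.get(timeout=60))
    producer.close()
```

Calling send_test_message("your.ip.addr") with the placeholder replaced should then reach the broker through the address it advertises for OUTSIDE.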

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 satyajit_ghana