How to check if a Kafka instance is up

I'm working on a Spring Boot project where I consume messages from Kafka topics using the spring-kafka dependency.

This is my listener and it works fine:

@KafkaListener(topics = "${mytopic.name}", containerFactory = "kafkaEventListenerObjectContainerFactory")
public void listenTopic(ConsumerRecord<String, KafkaEvent> cr,
                        @Payload KafkaEvent kafkaEvent) throws JsonProcessingException {

    // my code here...

}

My issue is that I want to check whether the Kafka instance is up. My idea is to develop a REST API that connects to my Kafka instance; if the connection is not OK, I want to send an alert via email.

Is my idea right, or is there a cleaner way to check whether Kafka is up?

Do you have any good ideas on how to achieve this? I plan to run the check every 15 minutes to get the status.



Solution 1:[1]

One way is to check from the command line.

Create a topic with a single partition and only one replica:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_name

We can now see that topic if we run the list-topics command:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
topic_name

Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.

For reference, see:

https://kafka.apache.org/081/documentation.html
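
If you prefer code over the shell, the same topic creation and listing can be done programmatically with Kafka's AdminClient. A minimal sketch, assuming the kafka-clients library is on the classpath and a broker reachable at localhost:9092 (both assumptions, adjust to your environment):

import java.util.Collections;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicAdminExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // broker address is an assumption; adjust it to your environment
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // programmatic equivalent of kafka-topics.sh --create
            admin.createTopics(Collections.singleton(new NewTopic("topic_name", 1, (short) 1)))
                 .all()
                 .get();

            // programmatic equivalent of kafka-topics.sh --list
            Set<String> topics = admin.listTopics().names().get();
            System.out.println(topics);
        }
    }
}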

Alternatively, from the application side, you can check the debug trace when running your application to verify that the consumer has (re-)joined its group and that the consumer configuration is consistent with the producer configuration, like this:

Producer config:

2021-07-26 15:09:46.124  INFO 14802 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
acks = 1
batch.size = 16384
bootstrap.servers = [kafka:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = 
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT

And this is the consumer config:

 2021-07-26 15:09:48.140  INFO 14802 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-1, groupId=store-services] Subscribed to topic(s): updateRate //here topic name 
2021-07-26 15:09:48.146  INFO 14802 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [kafka:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = groupId
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8

And you should see something like this when the consumer has successfully joined the group:

    2021-07-26 15:15:53.292  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] Revoking previously assigned partitions []
2021-07-26 15:15:53.292  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] (Re-)joining group
2021-07-26 15:15:53.294  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2021-07-26 15:15:53.295  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Revoking previously assigned partitions []
2021-07-26 15:15:53.295  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] (Re-)joining group
2021-07-26 15:15:53.407  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] (Re-)joining group
2021-07-26 15:15:53.408  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] (Re-)joining group
2021-07-26 15:15:53.412  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] (Re-)joining group
2021-07-26 15:15:53.412  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] (Re-)joining group
2021-07-26 15:15:56.462  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] Successfully joined group with generation 11
2021-07-26 15:15:56.462  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Successfully joined group with generation 11
2021-07-26 15:15:56.464  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Setting newly assigned partitions: addingOrderForBranch-0
2021-07-26 15:15:56.464  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] Setting newly assigned partitions: updateRate-0
2021-07-26 15:15:56.474  INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-2, groupId=groupId] Setting offset for partition addingOrderForBranch-0 to the committed offset FetchPosition{offset=328, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
2021-07-26 15:15:56.474  INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=store-services] Setting offset for partition updateRate-0 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
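
If you would rather probe the broker actively than rely on the startup logs, the 15-minute check described in the question could be sketched with Spring's @Scheduled and Kafka's AdminClient. This is only a minimal sketch: it assumes @EnableScheduling is configured and the spring.kafka.bootstrap-servers property is set, and the class name and the e-mail hook are illustrative, not part of the original question or answer:

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class KafkaAvailabilityChecker {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // runs every 15 minutes
    @Scheduled(fixedRate = 15 * 60 * 1000)
    public void checkKafka() {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");

        try (AdminClient admin = AdminClient.create(props)) {
            // describeCluster() forces a real round trip to the broker
            admin.describeCluster().nodes().get(5, TimeUnit.SECONDS);
            // broker answered: Kafka is up
        } catch (Exception e) {
            // broker unreachable or too slow: trigger the e-mail alert here
            sendAlertEmail(e);
        }
    }

    private void sendAlertEmail(Exception cause) {
        // hypothetical alerting hook, e.g. via JavaMailSender
    }
}

Creating a new AdminClient on every run keeps the sketch self-contained; in a real application a single shared client (or the KafkaAdmin bean from spring-kafka) would be the more economical choice.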

Solution 2:[2]

There is an out-of-the-box solution from Spring.

Add the spring-boot-starter-actuator dependency and some additional configuration to your application.properties or YAML file, e.g. management.server.port=8443 (or omit it, in which case the actuator uses your web server's port). In addition, you might want to add further management.endpoint.xxx settings (see Enabling Production-ready Features). Then add a StreamsHealthIndicator class to your project; you can call the actuator from the outside, e.g. http://localhost:8443/actuator/health, and the response will include the result from your StreamsHealthIndicator. With management.endpoint.health.show-details=ALWAYS it will also include all the details from the HealthIndicator, such as threads, tasks, etc.

An example of a StreamsHealthIndicator can be found here: example
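
The indicator does not have to be Streams-specific. Below is a minimal sketch of a plain Kafka HealthIndicator, assuming an AdminClient bean is available in the context (Spring Boot does not create one automatically); the class name and timeout are illustrative:

import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

@Component
public class KafkaHealthIndicator implements HealthIndicator {

    private final AdminClient adminClient;

    public KafkaHealthIndicator(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    @Override
    public Health health() {
        try {
            // cheap metadata call; fails fast when no broker is reachable
            String clusterId = adminClient.describeCluster()
                                          .clusterId()
                                          .get(5, TimeUnit.SECONDS);
            return Health.up().withDetail("clusterId", clusterId).build();
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }
}

With such an indicator in place, GET /actuator/health reports DOWN as soon as the broker becomes unreachable, and an external monitoring job (or a scheduled check like the one sketched above) can turn that into the e-mail alert the question asks for.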

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution | Source
Solution 1 | 
Solution 2 | Thomas