How to check if a Kafka instance is up
I'm working on a Spring Boot project where I consume messages from Kafka topics using the spring-kafka dependency.
This is my listener and it works fine:
@KafkaListener(topics = "${mytopic.name}", containerFactory = "kafkaEventListenerObjectContainerFactory")
public void listenTopic(ConsumerRecord<String, KafkaEvent> cr,
@Payload KafkaEvent kafkaEvent) throws JsonProcessingException {
// my code here...
}
My issue is that I want to check whether the Kafka instance is up. My idea is to build a REST API that connects to my Kafka instance; if the connection is not OK, I want to send an alert via email.
Is this approach right, or is there a cleaner way to check whether Kafka is up?
Do you have any good ideas for achieving this? I plan to run the check every 15 minutes.
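Before the answers, here is a minimal sketch of the scheduled-check idea. It only verifies that the broker's port accepts TCP connections, which proves the port is open but not that Kafka itself is healthy, so treat it as a baseline rather than a full health check. The class and method names are hypothetical; in a Spring app you could call `isPortOpen` from a `@Scheduled(fixedRate = 900_000)` task and trigger your email alert when it returns false.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumes a local broker on the default port; adjust to your bootstrap server.
        boolean up = isPortOpen("localhost", 9092, 2000);
        System.out.println("Kafka port reachable: " + up);
    }
}
```

Note that a successful TCP connect can still hide a broken broker (e.g. a hung JVM), which is why the answers below use the client logs and an actuator health check instead.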
Solution 1:[1]
One way is to check from the command line. Create a topic with a single partition and a single replica:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_name
(On newer Kafka versions, replace --zookeeper localhost:2181 with --bootstrap-server localhost:9092.)
We can now see that topic if we run the list command:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
Alternatively, instead of manually creating topics, you can configure your brokers to auto-create topics when a non-existent topic is published to.
(These commands come from the Kafka quickstart; see that reference for details.)
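The CLI check above can also be done programmatically, which is closer to what the question asks for. This is a sketch assuming kafka-clients is on the classpath and that "kafka:9092" matches your bootstrap.servers; if the broker is down, the get() call throws and you can send your alert.

```java
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class TopicLister {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        // Fail fast instead of hanging when the broker is unreachable
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");

        try (AdminClient admin = AdminClient.create(props)) {
            // Equivalent of kafka-topics.sh --list; throws if the broker is down
            Set<String> topics = admin.listTopics().names().get();
            System.out.println("Topics: " + topics);
        }
    }
}
```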
Alternatively, from the application side, you can inspect the startup logs to verify that the consumer (re-)joined its group and that the consumer config is consistent with the producer config. For example:
Producer config:
2021-07-26 15:09:46.124 INFO 14802 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [kafka:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id =
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
And this is the consumer config:
2021-07-26 15:09:48.140 INFO 14802 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-1, groupId=store-services] Subscribed to topic(s): updateRate //here topic name
2021-07-26 15:09:48.146 INFO 14802 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [kafka:9092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = groupId
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
When the consumer has successfully joined the group, you should see log lines like these:
2021-07-26 15:15:53.292 INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=store-services] Revoking previously assigned partitions []
2021-07-26 15:15:53.292 INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=store-services] (Re-)joining group
2021-07-26 15:15:53.294 INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=groupId] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2021-07-26 15:15:53.295 INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=groupId] Revoking previously assigned partitions []
2021-07-26 15:15:53.295 INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=groupId] (Re-)joining group
2021-07-26 15:15:53.407 INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=store-services] (Re-)joining group
2021-07-26 15:15:53.408 INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=groupId] (Re-)joining group
2021-07-26 15:15:53.412 INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=groupId] (Re-)joining group
2021-07-26 15:15:53.412 INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=store-services] (Re-)joining group
2021-07-26 15:15:56.462 INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=store-services] Successfully joined group with generation 11
2021-07-26 15:15:56.462 INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=groupId] Successfully joined group with generation 11
2021-07-26 15:15:56.464 INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=groupId] Setting newly assigned partitions: addingOrderForBranch-0
2021-07-26 15:15:56.464 INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=store-services] Setting newly assigned partitions: updateRate-0
2021-07-26 15:15:56.474 INFO 14802 --- [ntainer#1-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=groupId] Setting offset for partition addingOrderForBranch-0 to the committed offset FetchPosition{offset=328, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
2021-07-26 15:15:56.474 INFO 14802 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=store-services] Setting offset for partition updateRate-0 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
Solution 2:[2]
There is an out-of-the-box solution from Spring.
Add the spring-boot-starter-actuator dependency and some additional configuration to your application.properties or YAML file, e.g. management.server.port=8443 (or omit it, in which case the actuator uses your web server port). You may also want to add further management.endpoint.xxx settings (see "Enabling Production-ready Features" in the Spring Boot docs).
Add a StreamsHealthIndicator class to your project. You can then call the actuator from outside, e.g. http://localhost:8443/actuator/health, and the response will include the result of your StreamsHealthIndicator. With the setting management.endpoint.health.show-details=ALWAYS, it will also include all the details from the HealthIndicator, such as threads, tasks, etc.
An example of a StreamsHealthIndicator can be found in the linked example.
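As a sketch of the idea, here is a simpler broker-connectivity variant of such a health indicator (not a Kafka Streams one). It assumes spring-boot-starter-actuator and kafka-clients are on the classpath and that an AdminClient bean is already configured elsewhere in the application; all names are illustrative.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

@Component
public class KafkaHealthIndicator implements HealthIndicator {

    private final AdminClient adminClient;

    public KafkaHealthIndicator(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    @Override
    public Health health() {
        try {
            // Round-trip to the cluster; throws if no broker is reachable
            String clusterId = adminClient.describeCluster().clusterId().get();
            return Health.up().withDetail("clusterId", clusterId).build();
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }
}
```

With this in place, /actuator/health reports DOWN whenever the cluster is unreachable, so your 15-minute job (or any external monitor) only needs to poll that endpoint and send the email alert on a non-UP status.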
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | Thomas |
