How to pause a Kafka consumer?
I am using a Kafka producer-consumer model in my framework. Each record consumed by the consumer is later indexed into Elasticsearch. I have a use case where, if ES is down, I have to pause the Kafka consumer until ES is back up. Once it is up, I need to resume the consumer and continue consuming from the record where I left off. I don't think this can be achieved with @KafkaListener. Can anyone suggest a solution? I figured out that I need to write my own KafkaListenerContainer for this, but I am not able to implement it correctly. Any help would be much appreciated.
Solution 1:[1]
There are several possible solutions. One simple way is to use the KafkaConsumer API directly. KafkaConsumer keeps track of its position in each partition, i.e. the offset of the next record that poll(...) will return. Your problem arises after you get a record from Kafka but cannot insert it into Elasticsearch. In that case, write a routine that rewinds the consumer to the failed record with consumer.seek(partition, record.offset()), where partition is the record's TopicPartition. (consumer.seek(partition, consumer.position(partition) - 1) gives the same result only when the failed record was the last one returned for that partition.) At this point a good approach is to pause the partition: you keep calling poll(), so the consumer is not removed from its group for exceeding max.poll.interval.ms, but no records are fetched for the paused partition. Meanwhile, check whether ES is available (by whatever mechanism you prefer). Once ES is available, call resume on the consumer and continue with your usual poll-insert cycle.
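A minimal sketch of that poll-insert cycle with the plain kafka-clients API. It assumes enable.auto.commit=false and a hypothetical EsClient interface (index() returns false when the document could not be indexed, isAvailable() is a health check); EsClient is not part of any real Elasticsearch client. It rewinds to the failed record with seek(), pauses that partition, and resumes once the health check passes:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

class PausingIndexer {
    /** Hypothetical Elasticsearch wrapper; not a real client API. */
    interface EsClient {
        boolean index(String document);   // false if the document could not be indexed
        boolean isAvailable();            // cheap health check
    }

    private final Consumer<String, String> consumer;
    private final EsClient es;

    PausingIndexer(Consumer<String, String> consumer, EsClient es) {
        this.consumer = consumer;
        this.es = es;
    }

    /** One poll-insert cycle; call it repeatedly from the thread that owns the consumer. */
    void pollOnce() {
        Set<TopicPartition> paused = new HashSet<>(consumer.paused());
        if (!paused.isEmpty() && es.isAvailable()) {
            consumer.resume(paused);  // ES is back: fetching restarts at the rewound offsets
        }
        // Keep polling even while paused: paused partitions return no records,
        // and regular polling keeps the consumer within max.poll.interval.ms.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (TopicPartition tp : records.partitions()) {
            for (ConsumerRecord<String, String> record : records.records(tp)) {
                if (!es.index(record.value())) {
                    consumer.seek(tp, record.offset());         // next fetch starts at the failed record
                    consumer.pause(Collections.singleton(tp));  // stop fetching until ES is up
                    break;                                      // skip the rest of this partition's batch
                }
            }
        }
        if (!records.isEmpty()) {
            consumer.commitSync();  // commits current positions, i.e. up to (not including) a failed record
        }
    }
}
```

The whole cycle runs on one thread because KafkaConsumer is not thread-safe; the health check is only consulted while something is paused, so a healthy ES costs nothing extra.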
EDITED AFTER DISCUSSION
Create a Spring bean with init and destroy lifecycle methods. In the bean's initialization method, instantiate your KafkaConsumer (the consumer configuration can be loaded from any source). From that method, start a thread that interacts with the consumer and updates ES; the rest of the design is as described above. This is a single-threaded model. For higher throughput, consider keeping the data retrieved from Kafka in a small in-memory queue, with a dispatcher thread that takes each message and hands it to a pooled thread for updating ES.
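The higher-throughput variant can be sketched with only java.util.concurrent. The class name EsDispatcher and the indexer callback are hypothetical; start() and stop() are written so they could be registered as the bean's init and destroy methods (for example with @Bean(initMethod = "start", destroyMethod = "stop")). The polling thread calls enqueue(); because the queue is bounded, enqueue() blocks when ES falls behind, which slows down polling. Once records are processed on a pool, committing offsets in order needs extra bookkeeping, which this sketch leaves out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class EsDispatcher {
    private final BlockingQueue<String> queue;
    private final ExecutorService workers;
    private final Consumer<String> indexer;   // hypothetical "write one document to ES" callback
    private Thread dispatcher;

    EsDispatcher(int poolSize, int queueCapacity, Consumer<String> indexer) {
        this.queue = new ArrayBlockingQueue<>(queueCapacity);
        this.workers = Executors.newFixedThreadPool(poolSize);
        this.indexer = indexer;
    }

    /** Init method: start the dispatcher thread. */
    void start() {
        dispatcher = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String message = queue.take();                  // wait for the next record
                    workers.submit(() -> indexer.accept(message));  // index it on a pooled thread
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();                 // stop() was called
            }
        }, "es-dispatcher");
        dispatcher.start();
    }

    /** Called by the polling thread; blocks while the queue is full (back-pressure). */
    void enqueue(String message) throws InterruptedException {
        queue.put(message);
    }

    /** Destroy method: stop dispatching and let in-flight ES writes finish. Still-queued messages are not indexed. */
    void stop() throws InterruptedException {
        dispatcher.interrupt();
        dispatcher.join();
        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```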
Solution 2:[2]
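Rather than pausing the consumer, I would suggest retrying the same message until it succeeds, and committing the offset only once the message has been processed successfully.
For example:
Annotate your listener method with @Retryable (from Spring Retry; this also requires @EnableRetry on a configuration class).
Wrap the method body in a try/catch block and throw a new exception from the catch block, so that the failure triggers a retry.
In the listener container factory configuration, add these properties: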
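```java
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); // commit only when the listener calls ack.acknowledge()
factory.getContainerProperties().setAckOnError(false); // don't commit a failed record (deprecated in later Spring Kafka versions)
```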
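A sketch of such a listener, assuming Spring Kafka and Spring Retry on the classpath, @EnableRetry on a configuration class, and a container factory configured with the properties above. The topic and group names and the EsIndexer interface are hypothetical. The offset is acknowledged only after the document is indexed, so a record that keeps failing is never committed. Keep in mind that the retries run on the consumer thread: if they block for longer than max.poll.interval.ms, the consumer is removed from the group and its partitions are rebalanced.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;

@Component
class EsIndexingListener {
    /** Hypothetical ES write; throws when Elasticsearch is unreachable. */
    interface EsIndexer {
        void index(String document) throws Exception;
    }

    private final EsIndexer indexer;

    EsIndexingListener(EsIndexer indexer) {
        this.indexer = indexer;
    }

    // Retry every 5 s, effectively forever; the retry proxy only applies when Spring invokes the bean
    @Retryable(maxAttempts = Integer.MAX_VALUE, backoff = @Backoff(delay = 5000))
    @KafkaListener(topics = "es-documents", groupId = "es-indexer")
    public void listen(String message, Acknowledgment ack) {
        try {
            indexer.index(message);
        } catch (Exception e) {
            throw new IllegalStateException("Indexing failed; Spring Retry will call listen() again", e);
        }
        ack.acknowledge();  // with MANUAL_IMMEDIATE, the offset is committed right away
    }
}
```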
Solution 3:[3]
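The listener below is created with autoStartup = "false", and the endpoint registry is used to start, pause and resume its container by id:
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class DltController {
    private static final Logger logger = LoggerFactory.getLogger(DltController.class);
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // autoStartup = "false": the container is created but only starts when startKafka() runs
    @KafkaListener(id = "dltGroup", topics = "actualTopicNAme.DLT", autoStartup = "false")
    public void dltListen(String in) {
        logger.info("Received from DLT: {}", in);
    }

    private MessageListenerContainer container() {
        return registry.getListenerContainer("dltGroup");
    }

    public void startKafka() {
        if (!container().isRunning()) {
            container().start();
        }
    }

    public void resumeKafka() {
        // pause() is asynchronous, so also resume when a pause has only been requested
        if (container().isContainerPaused() || container().isPauseRequested()) {
            container().resume();
        }
    }

    public void pauseKafka() {
        if (container().isRunning()) {
            container().pause();
        }
    }
}
```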
Solution 4:[4]
There are a couple of ways you can achieve this.
Method #1
Create your KafkaConsumer object inside a Thread and run an infinite while loop to consume events.
Once this is set up, you can interrupt the thread. In the while loop, check Thread.currentThread().isInterrupted(); if it returns true, break out of the loop and close the consumer. (If the thread is interrupted while blocked in poll(), Kafka throws its own InterruptException, so handle that as well.)
Once you are done with your recovery activity, recreate the consumer with the same group ID. Note that this may trigger a rebalance of the consumer group.
If you are using Python, the same can be achieved with a threading.Event (e.g. stop_event) that the loop checks.
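A sketch of Method #1 in Java. Instead of Thread.interrupt(), it uses consumer.wakeup(), following the shutdown pattern described in the KafkaConsumer Javadoc: wakeup() is the only KafkaConsumer method that is safe to call from another thread, and it makes a blocked poll() throw WakeupException. The class name and the handler callback are illustrative, and the consumer is assumed to be already subscribed or assigned.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.WakeupException;

class StoppableConsumerLoop implements Runnable {
    private final Consumer<String, String> consumer;
    private final java.util.function.Consumer<String> handler;  // e.g. "index into ES" (illustrative)
    private final AtomicBoolean closed = new AtomicBoolean(false);

    StoppableConsumerLoop(Consumer<String, String> consumer,
                          java.util.function.Consumer<String> handler) {
        this.consumer = consumer;
        this.handler = handler;
    }

    @Override
    public void run() {
        try {
            while (!closed.get()) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    handler.accept(record.value());
                }
            }
        } catch (WakeupException e) {
            if (!closed.get()) {
                throw e;  // unexpected wakeup, not triggered by shutdown()
            }
        } finally {
            consumer.close();  // leaves the consumer group, which may trigger a rebalance
        }
    }

    /** Call from any thread; afterwards recreate the consumer with the same group.id to continue. */
    void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}
```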
Method #2
Use the KafkaConsumer pause(partitions_list) function. It accepts Kafka partitions as input, so get all the partitions assigned to the consumer and pass them to pause(partitions_list). The consumer will then stop fetching data from these partitions.
Later, use the resume(partitions_list) function to resume the consumer. Keep calling poll() while the partitions are paused: it returns no records for them, and regular polling keeps the consumer from exceeding max.poll.interval.ms and being removed from the group. Unlike Method #1, this does not trigger a rebalance.
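Method #2 as a small Java helper, assuming the caller owns the consumer thread and supplies a readiness check (the BooleanSupplier is a placeholder, for example an Elasticsearch health check). It keeps polling while paused, and because partitions assigned during a rebalance may not stay paused, it rewinds and pauses any partition that still returns records:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.function.BooleanSupplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

final class PauseHelper {
    private PauseHelper() {}

    /** Pauses every assigned partition, polls until ready() is true, then resumes. */
    static void pauseUntil(Consumer<?, ?> consumer, BooleanSupplier ready, Duration pollTimeout) {
        consumer.pause(new HashSet<>(consumer.assignment()));
        while (!ready.getAsBoolean()) {
            // Paused partitions return nothing; polling keeps the consumer in the group.
            ConsumerRecords<?, ?> records = consumer.poll(pollTimeout);
            for (TopicPartition tp : records.partitions()) {
                // A partition assigned by a rebalance may be unpaused: rewind it and pause it.
                consumer.seek(tp, records.records(tp).get(0).offset());
                consumer.pause(Collections.singleton(tp));
            }
        }
        consumer.resume(new HashSet<>(consumer.paused()));
    }
}
```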
Note: if you are using the Spring Kafka client, this becomes a lot easier: you can start/stop the message listener container.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | Tom Aranda |
| Solution 3 | Chetan Gowda |
| Solution 4 | Bikas Katwal |
