Automatic assignment of Kafka partitions to Kafka consumers
I am creating a list of Kafka consumers as below:
public List<Consumer<String, String>> consumerPool(KafkaProperties kafkaProperties) {
    int consumerInstance = 10; // number of consumers to create in the group
    List<Consumer<String, String>> consumerList = new ArrayList<>();
    for (int c = 0; c < consumerInstance; c++) {
        Consumer<String, String> consumer = consumerFactory(kafkaProperties).createConsumer();
        // subscribe() only registers interest in the topic; partitions are assigned during poll()
        consumer.subscribe(Arrays.asList("topic1"));
        consumer.poll(Duration.ofMillis(10));
        consumerList.add(consumer);
    }
    return consumerList;
}

public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
I get the KafkaProperties from the application config. I have set the Kafka group ID to "group1" in application.yml, so all the consumers belong to the same consumer group.
The issue I am facing is that no partitions are assigned to these consumers. I tested this using:
Set<TopicPartition> partitionSet = consumer.assignment();
partitionSet.forEach(topicPartition -> log.debug( "consumer:{}, Partition:{}", consumer.toString(), topicPartition.partition()));
The partition set is empty for all the consumers in the list.
I am able to assign a partition manually to each consumer using:
TopicPartition tp = new TopicPartition("topic1", c); // arguments are (topic name, partition number)
consumer.assign(Collections.singleton(tp));
However, I want this to be done dynamically instead of manually, mainly so that partitions are reassigned automatically if a rebalance occurs.
Solution 1:[1]
As long as all the consumers belong to the same consumer group, Kafka automatically distributes the topic's partitions among them, so that each partition is assigned to exactly one consumer in the group. More information: https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html
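To make the idea of group-based distribution concrete, here is a minimal sketch in plain Java that needs no Kafka dependency. It is a simplified, range-style split of one topic's partitions across the members of a group, not Kafka's actual assignor code. The class name `RangeAssignmentSketch`, the method `assign`, and the consumer IDs are hypothetical names chosen for illustration. In the real client, the assignment is computed during the group rebalance that runs inside `poll()`, so `assignment()` can still return an empty set right after `subscribe()` and a single short `poll()`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: a simplified range-style split of one
// topic's partitions across the consumers of a single group.
class RangeAssignmentSketch {

    // Gives each consumer a contiguous range of partitions 0..partitionCount-1.
    // The first (partitionCount % consumers) consumers get one extra partition,
    // so every partition ends up with exactly one owner.
    static Map<String, List<Integer>> assign(List<String> consumerIds, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (consumerIds.isEmpty()) {
            return result; // no group members, nothing to assign
        }
        int base = partitionCount / consumerIds.size();
        int extra = partitionCount % consumerIds.size();
        int next = 0;
        for (int i = 0; i < consumerIds.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                owned.add(next++);
            }
            result.put(consumerIds.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> group = new ArrayList<>();
        for (int c = 0; c < 10; c++) {
            group.add("consumer-" + c);
        }
        // Ten consumers, ten partitions: one partition each.
        System.out.println(assign(group, 10));

        // Simulated rebalance: one member leaves and the partitions are redistributed.
        group.remove("consumer-3");
        System.out.println(assign(group, 10));
    }
}
```

With ten consumers and ten partitions, each consumer owns one partition. After one consumer leaves, the same ten partitions are spread over the remaining nine, which is the behaviour that manual `assign()` calls do not provide. To observe the real assignment in a Kafka client, one option is to pass a `ConsumerRebalanceListener` to `subscribe()` and keep polling rather than checking `assignment()` after a single 10 ms poll.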
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Dheeraj |
