Automatic assignment of Kafka partitions to Kafka consumers

I am creating a list of Kafka consumers as below:

public List<Consumer<String, String>> consumerPool(KafkaProperties kafkaProperties) {
    int consumerInstance = 10;
    List<Consumer<String, String>> consumerList = new ArrayList<>();
    for (int c = 0; c < consumerInstance; c++) {
        // Every consumer is built from the same properties, so all share one group ID
        Consumer<String, String> consumer = consumerFactory(kafkaProperties).createConsumer();
        consumer.subscribe(Arrays.asList("topic1"));
        // A single short poll per consumer
        consumer.poll(Duration.ofMillis(10));
        consumerList.add(consumer);
    }

    return consumerList;
}

public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}

The KafkaProperties come from the application config. I have set the Kafka group ID to "group1" in application.yml, so all the consumers belong to the same consumer group.

The issue I am facing is that no partitions are assigned to these consumers. I tested this using:

 Set<TopicPartition> partitionSet = consumer.assignment();
 partitionSet.forEach(topicPartition -> log.debug( "consumer:{}, Partition:{}", consumer.toString(), topicPartition.partition()));

The partition set is empty for all the consumers in the list.

I am able to assign a partition manually to each consumer using:

TopicPartition tp = new TopicPartition("topic1", c); // first argument is the topic name, second the partition number
consumer.assign(Collections.singleton(tp));

However, I want this to be done dynamically instead of manually, mainly so that partitions are reassigned automatically if a rebalance occurs.



Solution 1:[1]

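As long as all the consumers belong to the same consumer group, Kafka by default distributes the partitions of the subscribed topic among them, so that each partition is assigned to exactly one consumer in the group. Note that the assignment is only carried out while a consumer is inside poll(), so assignment() can still return an empty set after a single short poll such as poll(Duration.ofMillis(10)). More information: https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html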
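
To make the distribution of partitions within one consumer group more concrete, here is a rough, self-contained model of a range-style assignment. It is only a sketch and not Kafka's own code: the class name RangeAssignmentSketch, the method assign, and the consumer IDs are hypothetical, consumers are taken in the order given, and nothing here talks to a broker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of range-style partition assignment within one consumer group.
// Illustration only: hypothetical names, no Kafka client, no broker.
class RangeAssignmentSketch {

    // Splits partitions 0..partitionCount-1 into contiguous ranges, one per consumer.
    // The first (partitionCount % consumerCount) consumers receive one extra partition.
    static Map<String, List<Integer>> assign(List<String> consumerIds, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int consumerCount = consumerIds.size();
        if (consumerCount == 0) {
            return result;
        }
        int base = partitionCount / consumerCount;
        int extra = partitionCount % consumerCount;
        int next = 0; // next partition number that has no owner yet
        for (int i = 0; i < consumerCount; i++) {
            int size = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = 0; p < size; p++) {
                owned.add(next++);
            }
            result.put(consumerIds.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Ten consumers as in the question; the partition count of 4 is an assumed value.
        List<String> consumers = new ArrayList<>();
        for (int c = 0; c < 10; c++) {
            consumers.add("consumer-" + c);
        }
        assign(consumers, 4).forEach((id, parts) -> System.out.println(id + " -> " + parts));
    }
}
```

In this model every partition has exactly one owner, and when there are more consumers than partitions the surplus consumers end up with an empty list. With ten consumers in one group, it is therefore worth checking how many partitions the topic actually has.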

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Dheeraj