'Kafka InvalidTopicException for java consumer but not for kafka-console-consumer
Obviously I have something wrong in my client code, looking for help determining what.
Create the topic:
kafka-topics --create --topic HELLO --bootstrap-server localhost:29092
Produce some messages:
kafka-console-producer --bootstrap-server localhost:29092 --topic HELLO --property "key.separator=-" -property "parse.key=true"
>1-hello
>2-goodbye
>3-other stuff
Consume them all:
kafka-console-consumer --bootstrap-server localhost:29092 --topic HELLO --from-beginning
hello
goodbye
other stuff
Try the same in Java:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Consumer<Long, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("HELLO"));
while(true) {
ConsumerRecords<Long, String> consumerRecords =
consumer.poll(Duration.ofMillis(100));
consumerRecords.forEach(record -> {
System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
record.key(), record.value(),
record.partition(), record.offset());
});
consumer.commitAsync();
}
But the first time I poll it dies with these logs:
...
2022-03-29 20:00:32.386 WARN 74696 --- [ restartedMain] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-KafkaExampleConsumer-1, groupId=KafkaExampleConsumer] Error while fetching metadata with correlation id 2 : {"HELLO"=INVALID_TOPIC_EXCEPTION}
2022-03-29 20:00:32.386 ERROR 74696 --- [ restartedMain] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-KafkaExampleConsumer-1, groupId=KafkaExampleConsumer] Metadata response reported invalid topics ["HELLO"]
2022-03-29 20:00:32.387 INFO 74696 --- [ restartedMain] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-KafkaExampleConsumer-1, groupId=KafkaExampleConsumer] Cluster ID: tyYW3M_ETRePtSOmM4ifJQ
org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: ["HELLO"]
I wonder if this is something to do with consumer groups/partition count?
Solution 1:[1]
Found the explanation; here's the error again:
org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: ["HELLO"]
I pulling the topic string from a config file, and used quotes in the config file. So my config reader read the quotes as part of the string value. I was trying to subscribe to the string "quote-HELLO-unquote" (with literal quotes inside the string), and the quotes are invalid characters.
The error was telling me, I just wasn't seeing it. Here's another invalid topic string with a carat in it but not with literal quotes. Note that the error string does not place quotes around it:
org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [hel^lo]
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | asyncify |
