'Kafka : ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class
I have this exception in the consumer when trying to cast the record.value() into java object :
ClassCastException: class org.apache.avro.generic.GenericData$Record cannot be cast to class [...].PublicActivityRecord (org.apache.avro.generic.GenericData$Record and [...].PublicActivityRecord are in unnamed module of loader 'app')
The producer sends the java object, which is a user defined type named PublicActivityRecord, like this :
KafkaProducer<String, PublicActivityRecord> producer = new KafkaProducer<>(createKafkaProperties());
[...]
this.producer.send(new ProducerRecord<String, PublicActivityRecord>(myTopic, activityRecord));
this.producer.flush();
At this point I can see in debug mode that the value of the ProducerRecord is indeed of type PublicActivityRecord.
On the registry server I can see in the log the POST request of the producer sending the schema :
Registering new schema: subject DEV-INF_9325_activityRecord_01-value, version null, id null, type null, schema size 7294 (io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource:262)
[2022-01-28 07:01:35,575] INFO 192.168.36.30 - - [28/janv./2022:06:01:34 +0000] "POST /subjects/DEV-INF_9325_activityRecord_01-value/versions HTTP/1.1" 200 8 "-" "Java/11.0.2" POSTsT (io.confluent.rest-utils.requests:62)
On the consumer side :
protected KafkaConsumer<String, PublicActivityRecord> consumer;
[...]
consumer = new KafkaConsumer<>(consumerProperties);
consumer.subscribe(Stream.of(kafkaConfig.getTopicActivityRecord()).collect(Collectors.toList()));
final ConsumerRecords<String, PublicActivityRecord> records = consumer.poll(duration);
records.forEach(record -> {
[...]
PublicActivityRecord activityRecord = record.value();
Here the ClassCastException occurs.
In debug mode, I can see that the record.value is indeed of type GenericData$Record. And it can not be cast to PublicActivityRecord.
The serializer/deserilizer keys and values are the same :
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
And in the schema-registry log, I can see the GET request of the consumer :
"GET /schemas/ids/3?fetchMaxId=false HTTP/1.1" 200 8447 "-" "Java/11.0.7" GETsT (io.confluent.rest-utils.requests:62)
So I have checked that :
- the producer sends a message with my own type
PublicActivityRecord - the message is received in the kafka broker
- the producer posts the schema to the schema registry
- the message is taken by the consumer
- the schema is GET by the consumer from the schema registry
- the value of the message is of the unexpected
GenericData$Record
This leads me to the result that what is wrong is in my consumer.
So the question is : Why do the consumer get a GenericData record instead of the expected PublicActivityRecord ?
Any clue would be much appreciated !
Solution 1:[1]
I forgot that cli commands are going through bin/console so I had to change namespace for SymfonyKernel there because it's separate environment that creates kernel.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | rafineria888wp |
