Apache Kafka Avro Deserialization: Unable to deserialize or decode a Specific-type message.
I am trying to use Avro serialization with Apache Kafka to serialize/deserialize messages. I created a producer that serializes a specific-type message and sends it to the queue. The message is sent successfully, and our consumer picks it up and tries to process it, but we hit an exception while casting the decoded bytes to our specific object. The exception is below:
[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.avroserializer.Customer
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.lambda$infiniteConsumer$0(AvroSpecificDeserializer.java:51)
at java.lang.Iterable.forEach(Iterable.java:75)
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:46)
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:63)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
According to the exception, we are reading the data in an incompatible way. Our code is below:
Kafka Producer Code:
static {
    kafkaProps.put("bootstrap.servers", "localhost:9092");
    kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    kafkaProps.put("schema.registry.url", "http://localhost:8081");

    kafkaProducer = new KafkaProducer<>(kafkaProps);
}

public static void main(String[] args) throws InterruptedException, IOException {
    Customer customer1 = new Customer(1002, "Jimmy");
    Parser parser = new Parser();

    Schema schema = parser.parse(AvroSpecificProducer.class
            .getClassLoader().getResourceAsStream("avro/customer.avsc"));

    SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
    try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
        writer.write(customer1, encoder);
        encoder.flush();

        byte[] avroBytes = os.toByteArray();

        ProducerRecord<String, byte[]> record1 = new ProducerRecord<>("CustomerSpecificCountry",
                "Customer One 11 ", avroBytes);
        asyncSend(record1);
    }

    Thread.sleep(10000);
}
Kafka Consumer Code:
static {
    kafkaProps.put("bootstrap.servers", "localhost:9092");
    kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
    kafkaProps.put("schema.registry.url", "http://localhost:8081");
}

public static void infiniteConsumer() throws IOException {
    try (KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
        kafkaConsumer.subscribe(Arrays.asList("CustomerSpecificCountry"));

        while (true) {
            ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(100);
            System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + records.count());

            Schema.Parser parser = new Schema.Parser();
            Schema schema = parser.parse(AvroSpecificDeserializer.class
                    .getClassLoader().getResourceAsStream("avro/customer.avsc"));

            records.forEach(record -> {
                DatumReader<Customer> customerDatumReader = new SpecificDatumReader<>(schema);
                BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                try {
                    System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
                    Customer customer = customerDatumReader.read(null, binaryDecoder);
                    System.out.println(customer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
Using the console consumer, we are able to receive the message successfully. So what is the way to decode the message into our POJO classes?
Solution 1:[1]
The solution to this problem is to use

DatumReader<GenericRecord> customerDatumReader = new SpecificDatumReader<>(schema);

instead of

DatumReader<Customer> customerDatumReader = new SpecificDatumReader<>(schema);

The exact reason for this has still not been found. It may be because Kafka knows nothing about the structure of the message: we explicitly define the schema for the message, and GenericRecord is useful for converting any message into a readable JSON-like form according to that schema. After that, we can easily convert it into our POJO class.
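To illustrate that last conversion step, here is a minimal sketch of copying fields out of a GenericRecord into a POJO. The inner Customer class and the inline schema are hypothetical stand-ins for the generated Avro class and the avro/customer.avsc file:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class GenericToPojo {

    // Hypothetical stand-in for the generated Customer class
    static class Customer {
        final int id;
        final String name;

        Customer(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Hypothetical stand-in for avro/customer.avsc
    static final String SCHEMA_JSON =
            "{\"type\":\"record\",\"name\":\"Customer\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"int\"},"
          + "{\"name\":\"name\",\"type\":\"string\"}]}";

    static Customer fromGeneric(GenericRecord rec) {
        // Avro may decode strings as org.apache.avro.util.Utf8 rather than
        // java.lang.String, so toString() covers both cases.
        return new Customer((Integer) rec.get("id"), rec.get("name").toString());
    }

    public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        GenericRecord rec = new GenericData.Record(schema);
        rec.put("id", 1002);
        rec.put("name", "Jimmy");

        Customer customer = fromGeneric(rec);
        System.out.println(customer.id + " " + customer.name);
    }
}
```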
But we still need to find a solution to convert directly into our POJO class.
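On that open question: Confluent's KafkaAvroDeserializer also accepts a specific.avro.reader property that tells it to return instances of the generated SpecificRecord class (here, Customer) instead of GenericRecord. Below is a sketch of the consumer configuration, written with plain string keys and java.util.Properties so it stands alone; whether it applies depends on the Confluent serializer version in use:

```java
import java.util.Properties;

public class SpecificReaderConfig {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "CustomerCountryGroup1");
        props.put("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://localhost:8081");
        // Ask the deserializer for generated SpecificRecord classes
        // (e.g. Customer) instead of GenericRecord.
        props.put("specific.avro.reader", "true");
        return props;
    }

    public static void main(String[] args) {
        // With these properties the consumer can be typed as
        // KafkaConsumer<String, Customer>, with no manual decoding step.
        System.out.println(consumerProps().getProperty("specific.avro.reader"));
    }
}
```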
Solution 2:[2]
You don't need to do the Avro serialization explicitly before passing the value to ProducerRecord; the serializer will do it for you. Your code would look like:
Customer customer1 = new Customer(1002, "Jimmy");
ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry", customer1);
asyncSend(record1);
See an example from Confluent for a simple producer using Avro.
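Put together, the simplified producer only needs the Avro serializer configured; the ProducerRecord then carries the Customer object itself. A sketch of the configuration, with serializer class names written out as plain strings so the snippet is self-contained:

```java
import java.util.Properties;

public class AvroProducerConfig {

    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // The serializer looks up/registers the schema here and encodes the
        // Customer value itself; no manual SpecificDatumWriter is needed.
        props.put("schema.registry.url", "http://localhost:8081");
        return props;
    }

    public static void main(String[] args) {
        // With these properties the producer is typed on the POJO:
        //   KafkaProducer<String, Customer> producer = new KafkaProducer<>(props);
        //   producer.send(new ProducerRecord<>("CustomerSpecificCountry", customer1));
        System.out.println(producerProps().getProperty("value.serializer"));
    }
}
```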
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Harmeet Singh Taara |
| Solution 2 | Javier Holguera |
