How to send a byte array in Kafka with Avro and Schema Registry
I'm trying to send bytes inside a JSON object, using a Java POJO and an Avro schema with Schema Registry. So my question is: how should my code send "bytes"-type data in that object?
My Producer Kafka Config:
@Bean
public ProducerFactory<String, ?> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TEST_KAFKA_ENDPOINT);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, TEST_KAFKA_SCHEMA_REGISTRY_URL);
return new DefaultKafkaProducerFactory<>(props);
}
And my Consumer Config:
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, TEST_KAFKA_ENDPOINT);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
props.put(SCHEMA_REGISTRY_URL_CONFIG, TEST_KAFKA_SCHEMA_REGISTRY_URL);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true"); // Deserialize to SpecificRecord instead of GenericRecord
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(props);
}
- Producer
First, to send bytes with the Kafka producer, I generated a POJO from an Avro schema file using a Gradle plugin.
gradle task :generateAvroJava
- Avro Schema
My Avro schema file is:
{
"type": "record",
"name": "SomeEvent",
"namespace": "com.mypacakge",
"doc": "This event records the save of a NoSQL Data",
"fields": [
{
"name": "someBytesData",
"type": "bytes"
},
{
"name": "resourceStr",
"type": {
"type": "string",
"avro.java.string": "String"
}
},
{
"name": "cTime",
"type": "long"
}
]
}
So the generated EventDto (SomeEvent) automatically declares the bytes field as a ByteBuffer. SomeEvent is a DTO. I initialized it like this:
- Initialize the data and send it to Kafka (with Schema Registry)
byte[] bytesData = getSomeByte();
ByteBuffer wrap = ByteBuffer.wrap(bytesData);
SomeEvent pojoEvent = new SomeEvent(wrap, resourceStr, cTime);
saveTemplate.send(MY_TOPIC, "testKey", pojoEvent);
I printed the buffer's capacity and limit, and they were:
capacity = 72000
limit = 72000
Then I checked the topic in the AKHQ UI, and the message looked like this:
- Schema registry
Key : "testKey"
Value :
{
"resourceStr": "testResKey",
"cTime": 1234,
"someBytesData": "AAAAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQ... "
}
The bytes are automatically converted to a String, and I can't receive the full byte data this way.
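As a side check, the value shown above looks like Base64 text: its first 16 characters decode to the bytes 0, 0, 0, 1, 0, 1, ... That AKHQ renders Avro bytes fields as Base64 is only an inference from the string's appearance, not something the source confirms. A minimal stdlib-only sketch of the round trip follows; the class name Base64RoundTrip and its methods are hypothetical helpers, not part of Kafka, Avro, or AKHQ.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical helper for illustration only; not part of Kafka, Avro, or AKHQ.
class Base64RoundTrip {

    // Encodes the readable bytes of the buffer as Base64 text.
    static String encode(ByteBuffer buf) {
        ByteBuffer view = buf.duplicate();      // leave the caller's position untouched
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return Base64.getEncoder().encodeToString(bytes);
    }

    // Decodes Base64 text back into the original bytes.
    static byte[] decode(String text) {
        return Base64.getDecoder().decode(text);
    }

    public static void main(String[] args) {
        // Assumed sample: the byte pattern that the first 16 displayed characters decode to.
        byte[] original = {0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1};

        String text = encode(ByteBuffer.wrap(original));
        System.out.println(text);                                   // AAAAAQABAAEAAQAB
        System.out.println(Arrays.equals(original, decode(text)));  // true
    }
}
```

If the UI is indeed only displaying an encoded view, the stored record may still contain the raw bytes; comparing the decoded length with the producer-side length would be one way to check that.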
- Consumer
@KafkaListener(topics = MY_TOPIC)
public void myConsumer(SomeEvent someEvent) {
String resourceStr = someEvent.getResourceStr();
long cTime = someEvent.getCTime();
ByteBuffer buf = someEvent.getSomeBytesData();
int limit = buf.limit();
int capacity = buf.capacity();
System.out.println("after capacity = " + capacity);
System.out.println("after limit = " + limit);
usingBytesDataMethod(buf.array());
}
I think the data are changed, or the way I convert them from a string to a byte buffer is wrong.
Or a deserialization problem may have occurred somewhere else.
The buffer's capacity and limit changed to:
after capacity = 170668
after limit = 170668
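One detail in the consumer that can be checked independently of Kafka: ByteBuffer.array() returns the whole backing array, not just the bytes between position and limit, and it throws for read-only or direct buffers. The sketch below copies only the readable bytes instead. It is a minimal illustration with a hypothetical helper class (ByteBufferBytes); it is not claimed to be the cause of the size difference above, since capacity and limit are equal there.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration only.
class ByteBufferBytes {

    // Copies the bytes between position and limit without moving the caller's position.
    static byte[] toByteArray(ByteBuffer buf) {
        ByteBuffer view = buf.duplicate();   // independent position/limit, shared content
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        byte[] backing = {10, 20, 30, 40, 50};
        // A buffer that exposes only the middle three bytes of the backing array.
        ByteBuffer window = ByteBuffer.wrap(backing, 1, 3);

        System.out.println(window.array().length);                 // 5 (entire backing array)
        System.out.println(Arrays.toString(toByteArray(window)));  // [20, 30, 40]
    }
}
```

In the listener this would mean passing toByteArray(buf) to usingBytesDataMethod rather than buf.array(), which at least rules out extra backing-array bytes as a source of differences.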
So my question is:
Am I doing it right? Is there any way to send bytes to Kafka using an Avro schema and Schema Registry?
What is the right way to serialize and deserialize bytes with an Avro schema and Schema Registry in Java?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow