How to send a byte array in Kafka with Avro Schema Registry

I'm trying to send bytes as part of a JSON object, using a Java POJO and an Avro schema with Schema Registry. So my question is: how should my code send `bytes`-type data inside that JSON?

My Producer Kafka Config:

    @Bean
    public ProducerFactory<String, ?> producerFactory() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TEST_KAFKA_ENDPOINT);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, TEST_KAFKA_SCHEMA_REGISTRY_URL);

        return new DefaultKafkaProducerFactory<>(props);
    }
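
For context, the `saveTemplate` used further down is just a `KafkaTemplate` built on this factory. A minimal sketch, assuming the bean lives in the same configuration class:

    @Bean
    public KafkaTemplate<String, Object> saveTemplate() {
        // Reuse the producer factory above; the cast only narrows the wildcard value type
        @SuppressWarnings("unchecked")
        ProducerFactory<String, Object> pf = (ProducerFactory<String, Object>) producerFactory();
        return new KafkaTemplate<>(pf);
    }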

And my Consumer Config:

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, TEST_KAFKA_ENDPOINT);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, TEST_KAFKA_SCHEMA_REGISTRY_URL);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");  // Deserialize to the generated SpecificRecord instead of GenericRecord
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        return new DefaultKafkaConsumerFactory<>(props);
    }
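
The `@KafkaListener` shown later picks this consumer factory up through a listener container factory. A minimal sketch, assuming the container defaults are fine:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }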
  1. Producer

First, to send bytes with the Kafka producer, I generated a POJO from an Avro schema file using a Gradle plugin.

Gradle task: `:generateAvroJava`
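
The task comes from the Avro Gradle plugin; my build file looks roughly like this (the plugin id and version reflect my setup, so treat them as assumptions):

    plugins {
        id "com.github.davidmc24.gradle.plugin.avro" version "1.9.1"
    }

    // With this plugin, .avsc files under src/main/avro are compiled
    // into Java classes by the :generateAvroJava task.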

  2. Avro Schema

My Avro schema file is:

    {
      "type": "record",
      "name": "SomeEvent",
      "namespace": "com.mypacakge",
      "doc": "This event records the save of a NoSQL Data",
      "fields": [
        {
          "name": "someBytesData",
          "type": "bytes"
        },
        {
          "name": "resourceStr",
          "type": {
            "type": "string",
            "avro.java.string": "String"
          }
        },
        {
          "name": "cTime",
          "type": "long"
        }
      ]
    }
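
As a sanity check, the schema file can be parsed and inspected with the Avro API before generating anything (a small sketch; the file path is just my project layout):

    import java.io.File;
    import org.apache.avro.Schema;

    public class SchemaCheck {
        public static void main(String[] args) throws Exception {
            Schema schema = new Schema.Parser().parse(new File("src/main/avro/SomeEvent.avsc"));
            // "bytes" should resolve to Schema.Type.BYTES here
            System.out.println(schema.getField("someBytesData").schema().getType());
        }
    }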

So the generated DTO (`SomeEvent`) automatically maps the `bytes` field to a `ByteBuffer`.

I initialized it like this:

  3. Initialize data and send it to Kafka with the Schema Registry
    byte[] bytesData = getSomeByte();
    ByteBuffer wrap = ByteBuffer.wrap(bytesData);
    SomeEvent pojoEvent = new SomeEvent(wrap, resourceStr, cTime);
    saveTemplate.send(MY_TOPIC, "testKey", pojoEvent);
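
`KafkaTemplate.send()` is asynchronous, so serialization problems can surface after the call returns. In tests I sometimes block on the returned future to catch them immediately (just a sketch of that idea):

    // Test-only: block so serialization errors surface right away
    saveTemplate.send(MY_TOPIC, "testKey", pojoEvent)
            .get(10, java.util.concurrent.TimeUnit.SECONDS);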

I printed out the buffer's capacity and limit before sending:

capacity = 72000
limit = 72000

Then I checked the topic in the AKHQ UI, and it showed something like this:

  4. Schema registry
Key : "testKey"

Value : 
{
    "resourceStr": "testResKey",
    "cTime": 1234,
    "someBytesData": "AAAAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQABAAEAAQ... "
}

The bytes are automatically converted to a string for display, and I can't get the full byte data back this way.
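
If the UI is Base64-encoding the bytes for display (the `AAAA…` prefix suggests it, though that's an assumption on my part about AKHQ), the stored data may still be intact. Decoding the full displayed string would confirm it:

    // Hypothetical check: shownValueFromUi holds the full, untruncated
    // string copied from the UI, not the "..."-elided preview above
    byte[] decoded = java.util.Base64.getDecoder().decode(shownValueFromUi);
    System.out.println(decoded.length); // expect 72000 if nothing was lost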

  5. Consumer
    @KafkaListener(topics = MY_TOPIC)
    public void myConsumer(SomeEvent someEvent) {

        String resourceStr = someEvent.getResourceStr();
        long cTime = someEvent.getCTime();

        ByteBuffer buf = someEvent.getSomeBytesData();
        int limit = buf.limit();
        int capacity = buf.capacity();

        System.out.println("after capacity = " + capacity);
        System.out.println("after limit = " + limit);

        usingBytesDataMethod(buf.array());
    }

I think the data is being changed somewhere, or the way I wrap the byte array into a ByteBuffer is wrong.

Or a deserialization problem may have occurred elsewhere.

On the consumer side, the buffer's capacity and limit have changed:

after capacity = 170668
after limit = 170668
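
One thing I noticed while debugging: `ByteBuffer.array()` returns the whole backing array and ignores the buffer's position and limit, so it can hand back more bytes than the payload actually contains. A copy bounded by `remaining()` might be safer (a sketch of that idea):

    ByteBuffer buf = someEvent.getSomeBytesData().duplicate(); // don't disturb the original buffer
    byte[] bytes = new byte[buf.remaining()];
    buf.get(bytes);  // copies exactly limit - position bytes
    usingBytesDataMethod(bytes);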

So my questions are:

Am I doing this right? Is there a way to send bytes through Kafka with an Avro schema and Schema Registry?

What is the right way to serialize and deserialize bytes with an Avro schema in a Kafka Schema Registry environment, using Java?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
