HttpMessageConverter - AVRO to JSON to AVRO

I'm looking for an easy-to-use solution that allows sending all sorts of AVRO objects I read from a Kafka stream to synchronous recipients via REST. This can be single objects as well as collections or arrays of the same object type. The same is needed for a binary format (between instances, e.g. for the interactive query feature, where the record may live on another node). And finally I need to support compression.

There are several sources discussing Spring HttpMessageConverter solutions that simplify the handling of domain objects in microservices using Kafka and AVRO.

The solutions proposed there work perfectly fine for scenarios where a single instance of an AVRO object needs to be sent or received. What is missing, however, is a solution for sending/receiving collections or arrays of the same AVRO objects.

For serializing a single AVRO object the code may look like this:

public byte[] serialize(T data) throws SerializationException {
    try {
      byte[] result = null;

      if (data != null) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().jsonEncoder(data.getSchema(), byteArrayOutputStream);
        DatumWriter<T> datumWriter = new SpecificDatumWriter<>(data.getSchema());
        datumWriter.write(data, encoder);
        encoder.flush();
        byteArrayOutputStream.close();
        result = byteArrayOutputStream.toByteArray();
      }
      return result;
    } catch (IOException e) {
      throw new SerializationException("Can't serialize data='" + data + "'", e);
    }
}

Similarly, the deserialization:

public T deserialize(Class<? extends T> clazz, byte[] data) throws SerializationException {
    try {
      T result = null;
      if (data != null) {
        Class<? extends SpecificRecordBase> specificRecordClass =
            (Class<? extends SpecificRecordBase>) clazz;
        Schema schema = specificRecordClass.newInstance().getSchema();
        DatumReader<T> datumReader =
            new SpecificDatumReader<>(schema);
        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, new ByteArrayInputStream(data));
        result = datumReader.read(null, decoder);
      }
      return result;
    } catch (InstantiationException | IllegalAccessException | IOException e) {
      throw new SerializationException("Can't deserialize data '" + Arrays.toString(data) + "'", e);
    }
}
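
To connect these two methods to Spring, the converter itself can then be little more than a thin wrapper. The following is only a rough sketch of how that wiring might look, not code from the sources above: the class name AvroJsonHttpMessageConverter, the media type application/avro+json and the AvroSerializer holder class are assumptions of mine standing in for whatever class contains the serialize/deserialize methods shown above.

import java.io.IOException;
import org.apache.avro.specific.SpecificRecordBase;
import org.springframework.http.HttpInputMessage;
import org.springframework.http.HttpOutputMessage;
import org.springframework.http.MediaType;
import org.springframework.http.converter.AbstractHttpMessageConverter;
import org.springframework.util.StreamUtils;

// Sketch only: class name, media type and AvroSerializer are illustrative assumptions.
public class AvroJsonHttpMessageConverter extends AbstractHttpMessageConverter<SpecificRecordBase> {

    private final AvroSerializer<SpecificRecordBase> serializer = new AvroSerializer<>();

    public AvroJsonHttpMessageConverter() {
        super(new MediaType("application", "avro+json"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return SpecificRecordBase.class.isAssignableFrom(clazz);
    }

    @Override
    protected SpecificRecordBase readInternal(Class<? extends SpecificRecordBase> clazz,
            HttpInputMessage inputMessage) throws IOException {
        // read the request body fully and delegate to the JSON deserializer shown above
        return serializer.deserialize(clazz, StreamUtils.copyToByteArray(inputMessage.getBody()));
    }

    @Override
    protected void writeInternal(SpecificRecordBase record, HttpOutputMessage outputMessage)
            throws IOException {
        // delegate to the JSON serializer shown above and write the bytes to the response
        outputMessage.getBody().write(serializer.serialize(record));
    }
}

For List<T> payloads such a converter would additionally need the generic element type (e.g. by implementing GenericHttpMessageConverter), which is where the collection methods below come in.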

The examples focus on JSON, but the same principle also applies to a binary format. What is still missing is a way to send/receive collections or arrays of the same AVRO objects, so I introduced two methods:

public byte[] serialize(final Iterator<T> iterator) throws SerializationException {
    Encoder encoder = null;
    DatumWriter<T> datumWriter = null;

    try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
        while (iterator.hasNext()) {
            T data = iterator.next();
            if (encoder == null) {
                // now that we have our first object we can get the schema 
                encoder = EncoderFactory.get().jsonEncoder(data.getSchema(), byteArrayOutputStream);
                datumWriter = new SpecificDatumWriter<>(data.getSchema());
                byteArrayOutputStream.write('[');
            }
            datumWriter.write(data, encoder);
            if (iterator.hasNext()) {
                encoder.flush();
                byteArrayOutputStream.write(',');
            }
        }
        if (encoder != null) {
            encoder.flush();
            byteArrayOutputStream.write(']');
            return byteArrayOutputStream.toByteArray();
        } else {
            return null;
        }
    } catch (IOException e) {
        throw new SerializationException("Can't serialize the data = '" + iterator + "'", e);
    }
}

Deserialization is even more of a hack (ParserStatus is a small helper enum with the values NA, ARRAY and RECORD that tracks where the parser currently is in the JSON array):

public Collection<T> deserializeCollection(final Class<? extends T> clazz, final byte[] data) throws SerializationException {
    try {
        if (data != null) {
            final Schema schema = clazz.getDeclaredConstructor().newInstance().getSchema();
            final SpecificDatumReader<T> datumReader = new SpecificDatumReader<>(schema);
            final ArrayList<T> resultList = new ArrayList<>();
            int i = 0;
            int startRecord = 0;
            int openCount = 0;
            ParserStatus parserStatus = ParserStatus.NA;
            while (i < data.length) {
                if (parserStatus == ParserStatus.NA) {
                    if (data[i] == '[') {
                        parserStatus = ParserStatus.ARRAY;
                    }
                } else if (parserStatus == ParserStatus.ARRAY) {
                    if (data[i] == '{') {
                        parserStatus = ParserStatus.RECORD;
                        openCount = 1;
                        startRecord = i;
                        // } else if (data[i] == ',') {
                        // ignore
                    } else if (data[i] == ']') {
                        parserStatus = ParserStatus.NA;
                    }
                } else {  // parserStatus == ParserStatus.RECORD
                    if (data[i] == '}') {
                        openCount--;
                        if (openCount == 0) {
                            // carve out the bytes startRecord..i (one complete record) and let the datumReader build the Avro object
                            try (final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data, startRecord, i - startRecord + 1)) {
                                final Decoder decoder = DecoderFactory.get().jsonDecoder(schema, byteArrayInputStream);
                                final SpecificRecordBase avroRecord = datumReader.read(null, decoder);
                                resultList.add((T) avroRecord);
                            }
                            parserStatus = ParserStatus.ARRAY;
                        }
                    } else if (data[i] == '{') {
                        openCount++;
                    }
                }
                i++;
            }
            if (parserStatus != ParserStatus.NA) {
                log.warn("Malformed json input '{}'", new String(data));
            }
            return resultList;
        }
        return null;
    } catch (InstantiationException | InvocationTargetException | IllegalAccessException | NoSuchMethodException | IOException e) {
        throw new SerializationException("Can't deserialize data '" + new String(data) + "'", e);
    }
}
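
For illustration, a round trip over these two methods could look like the snippet below; User stands in for any generated Avro class and serializer for the class holding the methods, both of which are placeholders of mine.

// Round-trip illustration; "User" and "serializer" are placeholders, not from the post.
final List<User> users = List.of(
        User.newBuilder().setName("alice").build(),
        User.newBuilder().setName("bob").build());

final byte[] json = serializer.serialize(users.iterator());            // JSON array of two records
final Collection<User> restored = serializer.deserializeCollection(User.class, json);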

Doing the same with the binary format is far more straightforward: for serialization, one record after another can be written with datumWriter.write(data, encoder), where encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null); no additional syntax is needed. Deserialization likewise reads one record after another with SpecificRecordBase avroRecord = datumReader.read(null, decoder) and adds each avroRecord to the collection.
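
As a sketch under the same assumptions as above (T extends SpecificRecordBase, same Avro classes as in the JSON variants), the two binary counterparts could look roughly like this; BinaryDecoder.isEnd() is used to detect the end of the byte array.

public byte[] serializeBinary(final Iterator<T> iterator) throws SerializationException {
    BinaryEncoder encoder = null;
    DatumWriter<T> datumWriter = null;

    try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
        while (iterator.hasNext()) {
            final T data = iterator.next();
            if (datumWriter == null) {
                // the first record determines the schema; no array delimiters are needed in binary
                datumWriter = new SpecificDatumWriter<>(data.getSchema());
                encoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
            }
            datumWriter.write(data, encoder);
        }
        if (encoder == null) {
            return null;
        }
        encoder.flush();
        return byteArrayOutputStream.toByteArray();
    } catch (IOException e) {
        throw new SerializationException("Can't serialize binary data", e);
    }
}

public Collection<T> deserializeBinaryCollection(final Class<? extends T> clazz, final byte[] data) throws SerializationException {
    try {
        if (data == null) {
            return null;
        }
        final Schema schema = clazz.getDeclaredConstructor().newInstance().getSchema();
        final SpecificDatumReader<T> datumReader = new SpecificDatumReader<>(schema);
        final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        final Collection<T> resultList = new ArrayList<>();
        // records are simply concatenated, so read until the decoder reports the end of the input
        while (!decoder.isEnd()) {
            resultList.add(datumReader.read(null, decoder));
        }
        return resultList;
    } catch (ReflectiveOperationException | IOException e) {
        throw new SerializationException("Can't deserialize binary data '" + Arrays.toString(data) + "'", e);
    }
}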

For JSON the additional syntax is needed because the recipient might use its own deserialization, e.g. to create plain POJOs, or, the other way round, might have produced the input by serializing a POJO.

My current solution looks quite hacky to me. One option I thought of would be to create an intermediate AVRO object which only contains an array of the enclosed objects; this intermediate object could then be used for serialization and deserialization, and the out-of-the-box encoder and decoder would take over the extra logic (a sketch of such a wrapper schema follows below). But introducing an extra AVRO object only for this purpose seems like unnecessary overhead. As an alternative I've started looking into org.apache.avro.io.JsonDecoder, but I didn't see an easy way to extend it so that it could replace the home-grown solution above.
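
For reference, the intermediate wrapper record would be nothing more than a record with a single array field. The schema below is only an illustration (UserList, com.example and User are arbitrary placeholder names); the class generated from it would then go through the stock single-object encoder/decoder unchanged.

{
  "type": "record",
  "name": "UserList",
  "namespace": "com.example",
  "fields": [
    { "name": "items",
      "type": { "type": "array", "items": "com.example.User" } }
  ]
}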

I'm currently extending the above to also support compression, using a decorator for compression and decompression (Deflate, GZIP and LZ4); a GZIP sketch follows below.
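
As a very rough sketch of that decorator idea, the GZIP case could look like the following (the class names are mine, AvroSerializer again stands for the class holding the methods above, and readAllBytes() requires Java 9+); Deflate via java.util.zip.Deflater/Inflater and LZ4 via its own library would plug in the same way.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import org.apache.avro.specific.SpecificRecordBase;

// Illustrative decorator; AvroSerializer is a placeholder for the serializer class above.
public class GzipAvroSerializer<T extends SpecificRecordBase> {

    private final AvroSerializer<T> delegate;

    public GzipAvroSerializer(final AvroSerializer<T> delegate) {
        this.delegate = delegate;
    }

    public byte[] serialize(final T data) throws SerializationException {
        final byte[] plain = delegate.serialize(data);
        if (plain == null) {
            return null;
        }
        try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
             final GZIPOutputStream gzip = new GZIPOutputStream(byteArrayOutputStream)) {
            gzip.write(plain);
            // finish() writes the GZIP trailer so the buffer is complete before toByteArray()
            gzip.finish();
            return byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("Can't compress data", e);
        }
    }

    public T deserialize(final Class<? extends T> clazz, final byte[] data) throws SerializationException {
        if (data == null) {
            return null;
        }
        try (final GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(data))) {
            // decompress, then hand the plain bytes to the wrapped deserializer
            return delegate.deserialize(clazz, gzip.readAllBytes());
        } catch (IOException e) {
            throw new SerializationException("Can't decompress data", e);
        }
    }
}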

Any help or better solution is appreciated.


