'How to transform a log4j message to fit an avro schema and post to kafka

I am working on a system that sends all logs for all microservices to a single topic apache kafka. Most services are in python but we are now forwarding logs from a Streams app. All other services use the same Schema defined in avro and managed by confluent's Schema Registry. I can get data posting to kafka fine as a string but cannot figure out how to upload a valid avro object linked to a schema registry schema. I am currently attempting to do this via a custom log4j plugin. For testing purposes I am writing these logs to their own topic and reading them out using kcat -b localhost:9092 -s value=avro -r localhost:8081 -t new_logs -f 'key: %k value: %s Partition: %p\n\n', but I get

ERROR: Failed to format message in new_logs [0] at offset 0: Avro/Schema-registry message deserialization: Invalid CP1 magic byte 115, expected 0: message not produced with Schema-Registry Avro framing: terminating

when doing this (that kcat command does work for my actual service logs topic and all other topics that use valid avro). Originally I tried using the org.apache.avro.generic.GenericData.Record class but could not figure out how to make it work in the methods toSerializable and toByteArray required by the AbstractLayout Interface since that class does not implement the serializable class. Below is the Plugin, class definition, log4j config

ServiceLogLayout.java

@Plugin(name="ServiceLogLayout", category= Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class ServiceLogLayout extends AbstractLayout<byte[]> {
    Schema record;
    DatumWriter<GenericData.Record> serviceLogDatumWriter;

    public ServiceLogLayout() {
        // maybe set these for avro
        super(null, null, null);
        Schema timestampMilliType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));

        // CREATE SCHEMA
        Schema.Field service = new Schema.Field("service", SchemaBuilder.builder().stringType(), "Name of service sending this message");
        Schema.Field environment = new Schema.Field("environment", SchemaBuilder.builder().enumeration("environment_type").symbols("local", "dev", "staging", "prod", "shared_services", "testing", "security"));
        Schema.Field level = new Schema.Field("level", SchemaBuilder.builder().enumeration("level_type").symbols("debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"), "logging level");
        Schema.Field msg = new Schema.Field("msg", SchemaBuilder.builder().stringType(), "Required log message");

        List<Schema.Field> fields = new ArrayList<>();
        fields.add(service);
        fields.add(environment);
        fields.add(level);
        fields.add(msg);
        this.record = Schema.createRecord("service_logs", "", "com.test.avro", false, fields);
        this.serviceLogDatumWriter = new GenericDatumWriter<>(this.record);
    }

    @Override
    public byte[] toByteArray(LogEvent event) {
        LOGGER.warn("toByteArray");

        String env = System.getenv("ENVIRONMENT") != null ? System.getenv("ENVIRONMENT").toLowerCase() : "local";
        // FILL IN RECORD
        GenericRecordBuilder schemaBuilder = new GenericRecordBuilder(this.record);
        schemaBuilder.set("service", "testService");
        schemaBuilder.set("environment", new GenericData.EnumSymbol(this.record.getField("environment").schema(), env));
        schemaBuilder.set("level", new GenericData.EnumSymbol(this.record.getField("level").schema(), event.getLevel().name().toLowerCase()));
        schemaBuilder.set("msg", event.getMessage().getFormattedMessage());
        
        // SERIALIZE
        byte[] data = new byte[0];
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Encoder jsonEncoder = null;
        try {
            jsonEncoder = EncoderFactory.get().jsonEncoder(
                    this.record, stream);
            this.serviceLogDatumWriter.write(schemaBuilder.build(), jsonEncoder);
            jsonEncoder.flush();
            data = stream.toByteArray();
        } catch (IOException e) {
            LOGGER.error("Serialization error:" + e.getMessage());
        }
        return data;
    }

    @Override
    public byte[] toSerializable(LogEvent event) {
        return toByteArray(event);
    }

    @Override
    public String getContentType() {
        return null;
    }

    @PluginFactory
    public static Layout<?> createLayout() {
        return new ServiceLogLayout();
    }

    private static class PrivateObjectOutputStream extends ObjectOutputStream {

        public PrivateObjectOutputStream(final OutputStream os) throws IOException {
            super(os);
        }

        @Override
        protected void writeStreamHeader() {
            // do nothing
        }
    }

}

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" packages="logging.log4j.custom.plugins">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <ServiceLogLayout />

        </Console>
        <Kafka name="Kafka" topic="new_logs">
            <ServiceLogLayout />
            <Property name="bootstrap.servers">${env:BOOTSTRAP_SERVERS}</Property>
        </Kafka>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="Kafka"/>
        </Root>
        <Logger name="org.apache.kafka" level="WARN"/>
    </Loggers>
</Configuration>


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source