REST WSO2 Kafka with AVRO schema

api.xml:

<?xml version="1.0" encoding="UTF-8"?>
<api context="/publishweatherdata" name="WeatherDataPublishAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <property expression="json-eval($.test)" name="valueSchema" scope="default" type="STRING"/>
            <property expression="json-eval($.value)" name="value" scope="default" type="STRING"/>
            <property expression="json-eval($.key)" name="key" scope="default" type="STRING"/>
            <property expression="json-eval($.topic)" name="topic" scope="default" type="STRING"/>
            <kafkaTransport.init>
                <bootstrapServers>localhost:9092</bootstrapServers>
                <keySerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</keySerializerClass>
                <valueSerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</valueSerializerClass>
                <schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
                <acks>all</acks>
                <requestTimeout>10000</requestTimeout>
                <timeout>8000</timeout>
                <metadataFetchTimeout>5000</metadataFetchTimeout>
                <maxPoolSize>50</maxPoolSize>
            </kafkaTransport.init>
            <kafkaTransport.publishMessages>
                <topic>{$ctx:topic}</topic>
            </kafkaTransport.publishMessages>
            <payloadFactory media-type="json">
                <format>
                    {"topic":"$1", "partition":"$2", "offset":"$3", "schema":"$4"}
                </format>
                <args>
                    <arg evaluator="xml" expression="$ctx:topic"/>
                    <arg evaluator="xml" expression="$ctx:partition"/>
                    <arg evaluator="xml" expression="$ctx:offset"/>
                    <arg evaluator="xml" expression="$ctx:valueSchema"/>
                </args>
            </payloadFactory>
            <property name="messageType" scope="axis2" type="STRING" value="application/json"/>
            <respond/>
        </inSequence>
        <outSequence/>
        <faultSequence/>
    </resource>
</api>
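
For reference, the four json-eval expressions in the API read the top-level fields test, value, key and topic from the request body. A minimal sketch of building such a body follows. The WeatherData record schema, the sample values and the helper name build_request_body are illustrative assumptions and are not part of the actual setup.

```python
import json

# Hypothetical Avro record schema, used only for illustration.
WEATHER_SCHEMA = {
    "type": "record",
    "name": "WeatherData",
    "fields": [
        {"name": "city", "type": "string"},
        {"name": "temperature", "type": "double"},
    ],
}


def build_request_body(topic, key, value, schema=WEATHER_SCHEMA):
    """Return JSON text whose top-level fields match the json-eval paths."""
    return json.dumps({
        "test": schema,   # read into the valueSchema property via $.test
        "value": value,   # read via $.value
        "key": key,       # read via $.key
        "topic": topic,   # read via $.topic
    })


if __name__ == "__main__":
    # Sample values are assumptions; print the body that would be POSTed.
    print(build_request_body(
        "weatherdatatopic",
        "sample_key",
        {"city": "Colombo", "temperature": 31.5},
    ))
```

The printed JSON would be sent as the POST body to the /publishweatherdata context with Content-Type application/json.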

If I use org.apache.kafka.common.serialization.StringSerializer, it works fine.

If I use io.confluent.kafka.serializers.KafkaAvroSerializer, I get:

[2022-02-22 12:01:37,547] INFO {KafkaConnectionPool} - Connection Pool is NOT full. Proceeding with adding new connections

I also noticed that the line <schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl> disappears.

I use WSO2 Integration Studio 8.0.2.

I also added the following JARs to the IntegrationStudio/runtime/microesb/lib directory:

kafka-schema-registry-client-5.3.0.jar
metrics-core-2.2.0.jar
scala-library-2.12.3.jar
zookeeper-3.4.10.jar
zkclient-0.10.jar
common-config-5.4.0.jar
kafka_2.12-1.0.0.jar
avro-1.8.1.jar
common-utils-5.4.0.jar
kafka-avro-serializer-5.3.0.jar
kafka-clients-1.0.0.jar

Why doesn't it work with AVRO?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
