How to deserialize an Avro response from a Kafka DataStream in Scala + Apache Flink

I am getting Avro responses from a Kafka topic in Confluent, and I am facing issues when I try to deserialize them. I don't understand the syntax for defining the Avro deserializer and using it in my Kafka source while reading.

Here is the approach I am currently using.

I have a topic in Confluent named employee that produces a message every 10 seconds. Each message is serialized with Avro using the Confluent Schema Registry.

I am trying to read those messages in my Scala program. I was able to print the serialized messages, but I am not able to deserialize them.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.formats.avro.AvroDeserializationSchema
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord

import java.time.Duration


case class emp(
                   name: String,
                   age: Int,
              )

object Main {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val schemaRegistryUrl = "http://localhost:8081"
    val source = KafkaSource.builder[String]
      .setBootstrapServers("localhost:9092")
      .setTopics("employee")
      .setGroupId("my-group")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build


    val streamEnv : DataStream[String] =
      env.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)), "Kafka Source")
    
    

    streamEnv.print()
    env.execute("Example")


  }

}

I also tried defining the Avro deserializer in the Kafka source while reading:

.setValueOnlyDeserializer(new AvroDeserializationSchema[emp](classOf[emp]))

I had no luck with that approach either.



Solution 1:[1]

Rather than an AvroDeserializationSchema, you need to use a ConfluentRegistryAvroDeserializationSchema. The standard Avro deserializer doesn't know what to do with the magic byte that the Confluent serializer prepends to each message.
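
As a rough sketch of what that could look like for the program in the question: the version below reads the topic as GenericRecord using ConfluentRegistryAvroDeserializationSchema.forGeneric. The reader schema here (a record called employee with a string name and an int age) is only an assumption modelled on the emp case class, so replace it with the schema actually registered for the topic. It also assumes the flink-avro-confluent-registry dependency is on the classpath, and that the broker and registry addresses from the question are correct.

```scala
import java.time.Duration

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo
import org.apache.flink.streaming.api.scala._

object Main {

  // ASSUMPTION: this reader schema is a guess based on the emp case class.
  // Use the schema that is registered for the "employee" topic instead.
  val readerSchemaJson: String =
    """{
      |  "type": "record",
      |  "name": "employee",
      |  "fields": [
      |    {"name": "name", "type": "string"},
      |    {"name": "age",  "type": "int"}
      |  ]
      |}""".stripMargin

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val schemaRegistryUrl = "http://localhost:8081"
    val readerSchema = new Schema.Parser().parse(readerSchemaJson)

    // The Confluent-aware deserializer strips the header written by the
    // Confluent serializer and fetches the writer schema from the registry.
    val source = KafkaSource.builder[GenericRecord]()
      .setBootstrapServers("localhost:9092")
      .setTopics("employee")
      .setGroupId("my-group")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setValueOnlyDeserializer(
        ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, schemaRegistryUrl))
      .build()

    // Pass Avro type information explicitly so Flink does not fall back to
    // generic (Kryo) serialization for GenericRecord.
    val recordTypeInfo = new GenericRecordAvroTypeInfo(readerSchema)

    val records: DataStream[GenericRecord] =
      env.fromSource(
        source,
        WatermarkStrategy.forBoundedOutOfOrderness[GenericRecord](Duration.ofSeconds(20)),
        "Kafka Source")(recordTypeInfo)

    // Fields are read by name; Avro strings come back as CharSequence (Utf8).
    records
      .map((r: GenericRecord) => s"name=${r.get("name")}, age=${r.get("age")}")
      .print()

    env.execute("Example")
  }
}
```

Note that the emp case class cannot be passed to forSpecific, because that variant expects a class generated by the Avro compiler (one that extends SpecificRecord). If you need case classes, one option is to read GenericRecord as above and map each record to emp in a later step.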

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 David Anderson