Flink Kafka: Expecting type to be a PojoTypeInfo
My Customer class was generated with the avro-maven-plugin. When I run this program I get the following error:
Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class com.example.Customer does not contain a setter for field first_name
[main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class com.example.Customer cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
I am using Java 8, and the Customer class generated by the avro-maven-plugin is a specific-record type. Please help; I have spent the last five days on this and it is still not resolved. I tried three different approaches, marked Method 1, Method 2, and Method 3 in the code below.
package com.example

import org.apache.avro.reflect.ReflectData
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.formats.avro.AvroDeserializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.util.Properties
object flink_kafka_avro extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val properties = new Properties
  properties.setProperty("bootstrap.servers", "127.0.0.1:9092")
  properties.put("group.id", "customer-consumer-group-v1")
  properties.put("auto.commit.enable", "false")
  properties.put("auto.offset.reset", "earliest")

  // Schema derived via reflection, for reference only; not used by the consumers below
  val schema = ReflectData.get.getSchema(classOf[Customer])
  // Method 1: not working
  //val ss = new FlinkKafkaConsumer[Customer]("customer-avro", AvroDeserializationSchema.forSpecific(classOf[Customer]), properties)

  val schemaRegistryUrl = "http://localhost:8081"

  // Method 2
  val userKafkaReaderResult = env.addSource(
    new FlinkKafkaConsumer[Customer](
      "customer-avro",
      ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[Customer], schemaRegistryUrl),
      properties
    ).setStartFromEarliest())
  userKafkaReaderResult.print()

  // Method 3: not working either
  //val strenew = new FlinkKafkaConsumer[Customer]("test_topic", AvroDeserializationSchema.forSpecific(classOf[Customer]), properties).setStartFromEarliest()
  //env.addSource(strenew).print()

  env.execute()
}
My POM file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafkaavrov1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <avro.version>1.8.2</avro.version>
        <kafka.version>0.11.0.1</kafka.version>
        <confluent.version>3.3.1</confluent.version>
    </properties>

    <!-- necessary to resolve Confluent dependencies -->
    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro-confluent-registry -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro-confluent-registry</artifactId>
            <version>1.13.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>1.13.2</version>
        </dependency>
        <!-- only dependency needed for the Avro part -->
        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
        <!-- dependencies needed for the Kafka part -->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.2.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-runner</artifactId>
            <version>1.2.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-params</artifactId>
            <version>5.2.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <stringType>String</stringType>
                            <createSetters>false</createSetters>
                            <enableDecimalLogicalType>true</enableDecimalLogicalType>
                            <fieldVisibility>private</fieldVisibility>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>target/generated-sources/avro</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
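For reference, the avro-maven-plugin execution above reads its schemas from src/main/resources/avro. A hypothetical Customer.avsc matching the field named in the log output could look like the following (the actual schema file is not shown in the question, so these fields are an assumption):

{
  "type": "record",
  "name": "Customer",
  "namespace": "com.example",
  "fields": [
    {"name": "first_name", "type": "string"}
  ]
}

Note that <createSetters>false</createSetters> in the plugin configuration is why the TypeExtractor log says Customer "does not contain a setter for field first_name": with private fields and no setters, the generated class can never pass Flink's POJO analysis.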
Solution 1:
Unless Customer extends org.apache.avro.specific.SpecificRecordBase, Flink won't see it as an Avro type and will try to serialize it with its POJO serializer. If that fails (as it does here), it falls back to treating it as a generic type and uses Kryo.
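As a quick sanity check of this diagnosis, you can test the generated class at runtime. This is a minimal sketch, assuming the generated Customer class is on the classpath:

package com.example

import org.apache.avro.specific.SpecificRecordBase

object CheckCustomer extends App {
  // true  -> Flink's Avro (specific record) serializer can handle Customer
  // false -> the class is not a specific record; Flink falls back to POJO
  //          analysis and then to Kryo, producing the errors quoted above
  println(classOf[SpecificRecordBase].isAssignableFrom(classOf[Customer]))
}

If this prints true, ConfluentRegistryAvroDeserializationSchema.forSpecific (whose type parameter must extend SpecificRecord) is an appropriate deserializer; if it prints false, regenerate Customer from an .avsc schema so that it extends SpecificRecordBase.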
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | David Anderson |
