org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to ObjectNode

When I used the following code:

    KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
            .setProperties(kafkaProps)
            .setProperty("ssl.truststore.type",trustStoreType)
            .setProperty("ssl.truststore.password",trustStorePassword)
            .setProperty("ssl.truststore.location",trustStoreLocation)
            .setProperty("security.protocol",securityProtocol)
            .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
            .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
            .setGroupId(groupId)
            .setTopics(kafkaInputTopic)
            .setDeserializer(new JSONKeyValueDeserializationSchema(false))
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .build();

The build fails with the following error:

    incompatible types: org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>

The compiler points at this line:

            .setDeserializer(new JSONKeyValueDeserializationSchema(false))

Does anyone have a clue as to what is wrong?



Solution 1:[1]

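The solution is to wrap the schema with KafkaRecordDeserializationSchema.of(...). JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode>, whereas setDeserializer expects a KafkaRecordDeserializationSchema<ObjectNode>, so it cannot be passed directly. The static of(...) method adapts one to the other. Here fetchMetadata is a boolean (false in the question):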

    KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
            .setProperties(kafkaProps)
            .setProperty("ssl.truststore.type",trustStoreType)
            .setProperty("ssl.truststore.password",trustStorePassword)
            .setProperty("ssl.truststore.location",trustStoreLocation)
            .setProperty("security.protocol",securityProtocol)
            .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
            .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
            .setGroupId(groupId)
            .setTopics(kafkaInputTopic)
            .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .build();
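To illustrate why the wrapper is needed, here is a minimal plain-Java sketch of the same adapter idea. It does not use Flink at all: LegacySchema, RecordSchema, Utf8Schema and readAll are hypothetical stand-ins invented for this example, and the real Flink interfaces take Kafka ConsumerRecord and Collector arguments rather than byte arrays and lists. The shape of the problem is the same, though: a method accepts only the newer interface, and a static of(...) factory turns an implementation of the older one into it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class AdapterSketch {

    // Hypothetical stand-in for the older interface (the role played by
    // KafkaDeserializationSchema in the question).
    interface LegacySchema<T> {
        T deserialize(byte[] value);
    }

    // Hypothetical stand-in for the interface the builder expects (the role
    // played by KafkaRecordDeserializationSchema in the question).
    interface RecordSchema<T> {
        void deserialize(byte[] value, List<T> out);

        // Wraps a LegacySchema so it can be passed where a RecordSchema is required.
        static <T> RecordSchema<T> of(LegacySchema<T> legacy) {
            return (value, out) -> out.add(legacy.deserialize(value));
        }
    }

    // Hypothetical legacy implementation: decodes the bytes as a UTF-8 string.
    static class Utf8Schema implements LegacySchema<String> {
        @Override
        public String deserialize(byte[] value) {
            return new String(value, StandardCharsets.UTF_8);
        }
    }

    // Accepts only the newer interface, like setDeserializer in the question.
    static <T> List<T> readAll(RecordSchema<T> schema, List<byte[]> records) {
        List<T> out = new ArrayList<>();
        for (byte[] record : records) {
            schema.deserialize(record, out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<byte[]> records = List.of("{\"a\":1}".getBytes(StandardCharsets.UTF_8));

        // readAll(new Utf8Schema(), records) would not compile: Utf8Schema is a
        // LegacySchema, not a RecordSchema, so the types are incompatible.
        List<String> result = readAll(RecordSchema.of(new Utf8Schema()), records);
        System.out.println(result);
    }
}
```

Passing new Utf8Schema() directly to readAll fails for the same reason the original setDeserializer call does: the two interfaces are unrelated types, so an explicit adapter is required even though both describe "bytes in, object out".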

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 user1393650