'org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to ObjectNode
When I used the following code:
KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
.setProperties(kafkaProps)
.setProperty("ssl.truststore.type",trustStoreType)
.setProperty("ssl.truststore.password",trustStorePassword)
.setProperty("ssl.truststore.location",trustStoreLocation)
.setProperty("security.protocol",securityProtocol)
.setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
.setGroupId(groupId)
.setTopics(kafkaInputTopic)
.setDeserializer(new JSONKeyValueDeserializationSchema(false))
.setStartingOffsets
(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();
I get the following error during build
incompatible types: org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
.setDeserializer(new JSONKeyValueDeserializationSchema(false))
Someone a clue as to what is wrong?
Solution 1:[1]
The solution is:
KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
.setProperties(kafkaProps)
.setProperty("ssl.truststore.type",trustStoreType)
.setProperty("ssl.truststore.password",trustStorePassword)
.setProperty("ssl.truststore.location",trustStoreLocation)
.setProperty("security.protocol",securityProtocol)
.setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
.setGroupId(groupId)
.setTopics(kafkaInputTopic)
.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | user1393650 |
