How to set a custom partitioner for Kafka in Flink 1.14.3?
    val mySink = KafkaSink.builder()
      .setBootstrapServers(bootstrapserver)
      .setKafkaProducerConfig(myproperties)
      .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .setRecordSerializer(new mycustomSerializer(mytopic, schemaregistryurl))
      .build()

    class mycustomSerializer(topic: String, schemaRegistryUrl: String)
      extends KafkaRecordSerializationSchema[MyRecord]
The code above works fine, but I am unable to set a custom partitioner with it.
I have gone through the documentation: if mycustomSerializer were built with KafkaRecordSerializationSchema.builder(), the partitioner could be set easily. How can I do the same with the code above?
Solution 1:[1]
The serializer shouldn't be responsible for setting the partitioner.
Inside the myproperties map, you should be able to set the ProducerConfig.PARTITIONER_CLASS_CONFIG key (the "partitioner.class" producer property) to the fully qualified name of your custom partitioner class.
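
A minimal sketch of that approach, assuming kafka-clients is on the classpath. The names MyPartitioner, PartitionerConfig and producerProperties are hypothetical and do not appear in the question, and the hashing is only illustrative (it is not Kafka's default partitioning logic):

```scala
import java.util
import java.util.Properties

import org.apache.kafka.clients.producer.{Partitioner, ProducerConfig}
import org.apache.kafka.common.Cluster

// Hypothetical partitioner: picks a partition from a hash of the key bytes.
class MyPartitioner extends Partitioner {

  override def partition(topic: String,
                         key: AnyRef,
                         keyBytes: Array[Byte],
                         value: AnyRef,
                         valueBytes: Array[Byte],
                         cluster: Cluster): Int = {
    val numPartitions = cluster.partitionsForTopic(topic).size
    if (keyBytes == null || numPartitions == 0) 0
    else (util.Arrays.hashCode(keyBytes) & Int.MaxValue) % numPartitions
  }

  // Nothing to release or configure in this sketch.
  override def close(): Unit = ()

  override def configure(configs: util.Map[String, _]): Unit = ()
}

object PartitionerConfig {
  // Builds the producer properties that are handed to setKafkaProducerConfig.
  def producerProperties(): Properties = {
    val props = new Properties()
    // Equivalent to setting "partitioner.class" to the fully qualified class name.
    props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,
                      classOf[MyPartitioner].getName)
    props
  }
}
```

The resulting Properties object would then be passed as myproperties to .setKafkaProducerConfig(...). Note that the Kafka producer only consults the configured partitioner when the ProducerRecord carries no explicit partition, so the custom serializer should build its records without a partition number.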
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | OneCricketeer |
