'How to set custom Partitioner for kafka in Flink 1.14.3?

    val mySink = KafkaSink.builder()
  .setBootstrapServers(bootstrapserver)
  .setKafkaProducerConfig(myproperties)
  .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
      .setRecordSerializer(
        new mycustomSerializer(mytopic,
                schemaregistryurl)

      ).build()

class mycustomSerializer(topic: String, schemaRegistryUrl: String) extends 
KafkaRecordSerializationSchema[MyRecord]

Although the above code is working fine but i am unable to set the custom partitioner in above code

I have gone through the documentation if we make mycustomSerializer using KafkaRecorderSerializationSchema.builder() then we can set easily but how to do that in above code ?



Solution 1:[1]

The serializer shouldn't set the partitioner.

Inside of myproperties map, you should be able to set ProducerConfig.PARTITIONER_CLASS_CONFIG key with the value of your custom class.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 OneCricketeer