KafkaJsonSerializer: Default constructor for case class

I am consuming data from a Kafka topic using the following consumer setup:

val consumer = {
    val properties = new Properties()
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "some_consumer_group")
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, streamingConfig.getString("kafka.brokers"))
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaJsonDeserializer[Sample]])
    properties.put("json.value.type", classOf[Sample])
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, streamingConfig.getInt("kafka.maxPollRecords"): java.lang.Integer)
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, streamingConfig.getInt("kafka.sessionTimeoutMs"): java.lang.Integer)
    properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, streamingConfig.getInt("kafka.heartbeatIntervalMs"): java.lang.Integer)
    val consumer = new KafkaConsumer[String, Sample](properties)
    consumer.subscribe(util.Arrays.asList(topic))
    consumer
}

The payload is represented by the following case class:

case class Sample(properties: Map[String, String], eventType: String)

The problem is that deserialization fails with the following errors:

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.service.Sample` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: (byte[])"{}"; line: 1, column: 2]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.service.Sample` (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
 at [Source: (byte[])"{}"; line: 1, column: 2]

If I were using Java, the solution would be straightforward: just add a default constructor. What is the equivalent for Scala case classes, though? I know that a Jackson ObjectMapper can be configured with jackson-module-scala, which takes care of case classes, but because of the way the KafkaJsonSerializer is configured I cannot touch its object mapper. This is its configure method:

protected void configure(KafkaJsonSerializerConfig config) {
    boolean prettyPrint = config.getBoolean("json.indent.output");
    this.objectMapper = new ObjectMapper();
    this.objectMapper.configure(SerializationFeature.INDENT_OUTPUT, prettyPrint);
}

So how can I work around this error?
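
One direction that might work, sketched here under assumptions rather than as a confirmed fix: bypass the built-in class and supply a small custom deserializer that owns its ObjectMapper and registers DefaultScalaModule from jackson-module-scala. The class name SampleDeserializer is hypothetical (it is not part of any library), and the sketch assumes jackson-module-scala and kafka-clients are on the classpath. The Sample case class is repeated only to keep the block self-contained.

```scala
import java.util

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.Deserializer

// Same payload type as in the question, repeated for self-containment.
case class Sample(properties: Map[String, String], eventType: String)

// Hypothetical helper, not part of Kafka or Confluent: a value deserializer
// whose ObjectMapper knows how to build Scala case classes and collections.
class SampleDeserializer extends Deserializer[Sample] {

  // DefaultScalaModule teaches Jackson to call the case class constructor,
  // so no default (no-arg) constructor is needed.
  private val mapper: ObjectMapper =
    new ObjectMapper().registerModule(DefaultScalaModule)

  // Nothing to configure in this sketch.
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  // Kafka passes null for tombstone records, so pass null through.
  override def deserialize(topic: String, data: Array[Byte]): Sample =
    if (data == null) null else mapper.readValue(data, classOf[Sample])

  override def close(): Unit = ()
}
```

With something like this in place, the consumer would set VALUE_DESERIALIZER_CLASS_CONFIG to classOf[SampleDeserializer], and the "json.value.type" property would no longer be needed, since the target type is fixed inside the deserializer.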



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow