'KafkaException: Could not instantiate class JsonDeserializer
I'm trying to start Spring Boot application with Kafka client following this reference guide and I'm getting the error below.
Could you please advise how to fix?
@Bean
public Map<String, Object> consumerConfig() {
final HashMap<String, Object> result = new HashMap<>();
result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap.servers"));
result.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return result;
}
@Bean
public ConsumerFactory<Long, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
@Bean
ConcurrentKafkaListenerContainerFactory<Long, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Long, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();
containerFactory.setConsumerFactory(consumerFactory());
containerFactory.setConcurrency(3);
containerFactory.getContainerProperties().setPollTimeout(3000);
return containerFactory;
}
--
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:176) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:51) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:346) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:149) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:112) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:545) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:737) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:370) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1162) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1151) [spring-boot-1.5.1.RELEASE.jar:1.5.1.RELEASE]
at com.ubs.wma.bmss.BmssConsumerApp.main(BmssConsumerApp.java:12) [classes/:na]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:703) ~[kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:553) ~[kafka-clients-0.10.1.1.jar:na]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:73) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:69) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:305) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:230) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:180) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:124) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:202) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:287) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:236) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:173) ~[spring-context-4.3.6.RELEASE.jar:4.3.6.RELEASE]
... 12 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Could not instantiate class org.springframework.kafka.support.serializer.JsonDeserializer
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:316) ~[kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:203) ~[kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632) ~[kafka-clients-0.10.1.1.jar:na]
... 24 common frames omitted
Caused by: java.lang.IllegalAccessException: Class org.apache.kafka.common.utils.Utils can not access a member of class org.springframework.kafka.support.serializer.JsonDeserializer with modifiers "protected"
at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) ~[na:1.8.0_121]
at java.lang.Class.newInstance(Class.java:436) ~[na:1.8.0_121]
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:314) ~[kafka-clients-0.10.1.1.jar:na]
... 26 common frames omitted
Solution 1:[1]
According to that documentation we have:
for more complex or particular cases, the
KafkaConsumer, and thereforeKafkaProducer, provides overloaded constructors to accept(De)Serializerinstances for keys and/or values, respectively.To meet this API, the
DefaultKafkaProducerFactoryandDefaultKafkaConsumerFactoryalso provide properties to allow to inject a custom(De)Serializerto targetProducer/Consumer.
And further Apache Kafka JavaDocs:
/**
* A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value {@link Serializer}.
* Valid configuration strings are documented <a href="http://kafka.apache.org/documentation.html#producerconfigs">here</a>.
* Values can be either strings or Objects of the appropriate type (for example a numeric configuration would accept
* either the string "42" or the integer 42).
* @param configs The producer configs
* @param keySerializer The serializer for key that implements {@link Serializer}. The configure() method won't be
* called in the producer when the serializer is passed in directly.
* @param valueSerializer The serializer for value that implements {@link Serializer}. The configure() method won't
* be called in the producer when the serializer is passed in directly.
*/
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
So, what you need is like this:
@Bean
public ConsumerFactory<Long, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig(), null, new JsonDeserializer(Foo.class));
}
The problem that JsonDeserializer can't be instantiated by the reflection because it needs particular type to deserialize to.
Solution 2:[2]
firts delete this configuration in your properties
@Bean
public Map<String, Object> consumerConfig() {
final HashMap<String, Object> result = new HashMap<>();
result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap.servers"));
//result.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
//result.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return result;
}
now modifie this configuration to
@Bean
public ConsumerFactory<Long, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig(), new LongDeserializer(), new JsonDeserializer(YourClass.class));
}
JsonDesrializer And LongDeserializer are respectivly imported from
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
Solution 3:[3]
You can try add to faster.xml dependency to the pom.xml. It solves in my case.
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Community |
| Solution 2 | Mukendi Emmanuel |
| Solution 3 | Okan |
