Spring Kafka The class is not in the trusted packages
In my Spring Boot/Kafka application, before a library update I used the class org.telegram.telegrambots.api.objects.Update to post messages to a Kafka topic. I now use org.telegram.telegrambots.meta.api.objects.Update. As you can see, the two classes are in different packages.
After restarting the application I ran into the following issue:
[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition telegram.fenix.bot.update-0 at offset 4223. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'org.telegram.telegrambots.api.objects.Update' is not in the trusted packages: [java.util, java.lang, org.telegram.telegrambots.meta.api.objects]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:221) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:967) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:93) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1144) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:993) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:527) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:488) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1155) ~[kafka-clients-1.1.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) ~[kafka-clients-1.1.0.jar!/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.8.RELEASE.jar!/:2.1.8.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_171]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_171]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]
This is my config:
@EnableAsync
@Configuration
public class ApplicationConfig {
@Bean
public StringJsonMessageConverter jsonConverter() {
return new StringJsonMessageConverter();
}
}
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);
return props;
}
@Bean
public ProducerFactory<String, Update> updateProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Update> updateKafkaTemplate() {
return new KafkaTemplate<>(updateProducerFactory());
}
}
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.consumer.max.poll.interval.ms}")
private String kafkaConsumerMaxPollIntervalMs;
@Value("${kafka.consumer.max.poll.records}")
private String kafkaConsumerMaxPollRecords;
@Value("${kafka.topic.telegram.fenix.bot.update.consumer.concurrency}")
private Integer updateConsumerConcurrency;
@Bean
public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(consumerFactory(kafkaProperties));
return factory;
}
@Bean
public ConsumerFactory<String, Update> updateConsumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Update.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Update> updateKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
ConcurrentKafkaListenerContainerFactory<String, Update> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(updateConsumerFactory(kafkaProperties));
factory.setConcurrency(updateConsumerConcurrency);
return factory;
}
}
application.properties
spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
How can I solve this issue and let Kafka deserialize the old messages into the new class?
UPDATED
This is my listener
@Component
public class UpdateConsumer {
@KafkaListener(topics = "${kafka.topic.update}", containerFactory = "updateKafkaListenerContainerFactory")
public void onUpdateReceived(ConsumerRecord<String, Update> consumerRecord, Acknowledgment ack) {
//do some logic here
ack.acknowledge();
}
}
Solution 1:[1]
Two key points should be mentioned:
- The Producer and the Consumer are two separate projects.
- The message value being sent is an object type rather than a primitive type.
The problem is that the class of the produced message object is not available on the consumer side, because the two are separate projects.
To overcome this issue, follow the steps below in the Spring Boot Producer and Consumer applications.
----Producer App -------------
** Producer Configuration Class **
import com.kafka.producer.models.Container;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Container> producerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, Container> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
Note: Container is the custom object to be posted to a Kafka topic.
** Producer Class **
import com.kafka.producer.models.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class Producer {
private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
private static final String TOPIC = "final-topic";
@Autowired
private KafkaTemplate<String, Container> kafkaTemplate;
public void sendUserMessage(Container msg) {
// Use an SLF4J placeholder; String.format on a concatenated message breaks if msg contains '%'.
LOGGER.info("\n ===== Producing message in JSON ===== \n{}", msg);
Message<Container> message = MessageBuilder
.withPayload(msg)
.setHeader(KafkaHeaders.TOPIC, TOPIC)
.build();
this.kafkaTemplate.send(message);
}
}
** Producer Controller **
import com.kafka.producer.models.Container;
import com.kafka.producer.services.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/message")
public class MessageController {
@Autowired
private Producer producer;
@PostMapping(value = "/publish")
public String sendMessageToKafkaTopic(@RequestBody Container containerMsg) {
this.producer.sendUserMessage(containerMsg);
return "Successfully Published !!";
}
}
Note: A message of type Container will be published to the Kafka topic final-topic as a JSON message.
===============================================================================
-- Consumer App --
** Configuration Class **
import com.kafka.consumer.models.Container;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerOneConfig {
@Bean
public ConsumerFactory<String, Container> consumerFactory(){
JsonDeserializer<Container> deserializer = new JsonDeserializer<>(Container.class);
deserializer.setRemoveTypeHeaders(false);
deserializer.addTrustedPackages("*");
deserializer.setUseTypeMapperForKey(true);
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_one");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Container> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, Container> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
Note: As you can see, instead of relying on a default JsonDeserializer we configure our own JsonDeserializer instance to consume JSON messages of type Container from final-topic (the topic name).
** Consumer Service **
import com.kafka.consumer.models.Container;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
public class ConsumerOne {
private final Logger LOGGER = LoggerFactory.getLogger(ConsumerOne.class);
@KafkaListener(topics = "final-topic", groupId = "group_one", containerFactory = "kafkaListenerContainerFactory")
public void consumeUserMessage(@Payload Container msg, @Headers MessageHeaders headers) throws IOException {
System.out.println("received data in Consumer One ="+ msg.getMessageTypes());
}
}
Solution 2:[2]
There are two ways of doing this: in your deserializer or in your application.yml.
Either in your deserializer
This is the deserializer that you pass to your DefaultKafkaConsumerFactory (to create your consumer factory).
Let's say you want to make a ConsumerFactory<String, Foo>, with Foo being the model/POJO you want to use for your Kafka messages.
You need to call addTrustedPackages on the JsonDeserializer. I have an example in Kotlin, but it's almost the same syntax in Java:
val deserializer = JsonDeserializer<Foo>()
deserializer.addTrustedPackages("com.example.entity") // Trusting the package that contains Foo
val consumerFactory = DefaultKafkaConsumerFactory(
    consumerConfigs(), // your consumer config
    StringDeserializer(),
    deserializer // Using our newly created deserializer
)
Or in your application.yml
In your application.yml file, following the spring-kafka instructions, we add the package of the Foo class (com.example.entity.Foo) to the trusted packages using:
spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: "com.example.entity"
spring.json.trusted.packages accepts a comma-delimited list of packages. You can specify the package of a class, or use * to trust all packages. In that case you don't need to pass a deserializer instance to DefaultKafkaConsumerFactory(); specifying it in the consumer config is enough.
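To see why the old records in the question are rejected, here is a minimal Java sketch of a trusted-packages check. It is a simplified model under stated assumptions, not spring-kafka's actual code: TrustedPackagesSketch and its methods are hypothetical names, and the sketch assumes that each entry is compared against the package of the incoming class name, with * trusting everything.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

/** Simplified model of a trusted-packages check (hypothetical class, not spring-kafka code). */
final class TrustedPackagesSketch {

    // Assumption: these two packages are trusted by default, as in the error message of the question.
    private final Set<String> trusted = new LinkedHashSet<>(Arrays.asList("java.util", "java.lang"));
    private boolean trustAll;

    /** Adds package names to the trusted set; "*" switches to trust-all. */
    void addTrustedPackages(String... packages) {
        for (String pkg : packages) {
            if ("*".equals(pkg)) {
                trustAll = true;
            } else {
                trusted.add(pkg);
            }
        }
    }

    /** Returns true if the package of the fully qualified class name is trusted. */
    boolean isTrusted(String className) {
        if (trustAll) {
            return true;
        }
        int lastDot = className.lastIndexOf('.');
        String pkg = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trusted.contains(pkg);
    }

    public static void main(String[] args) {
        TrustedPackagesSketch check = new TrustedPackagesSketch();
        check.addTrustedPackages("org.telegram.telegrambots.meta.api.objects");

        String oldType = "org.telegram.telegrambots.api.objects.Update";
        String newType = "org.telegram.telegrambots.meta.api.objects.Update";

        System.out.println(newType + " trusted: " + check.isTrusted(newType));
        System.out.println(oldType + " trusted: " + check.isTrusted(oldType));

        check.addTrustedPackages("*");
        System.out.println(oldType + " trusted after *: " + check.isTrusted(oldType));
    }
}
```

In this model the old class name fails the check only because its package differs from the trusted one, which matches the situation in the question, where records written before the library update still name the old class.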
Solution 3:[3]
I have also faced this issue; however, the solutions above did not work for me. What did the trick was configuring the Kafka consumer factory as follows:
props.put(JsonDeserializer.TRUSTED_PACKAGES, "your.package.name");
Solution 4:[4]
jsonDeserializer.addTrustedPackages("*");
solved my issue for spring-kafka-2.2.8.
To add it in application.properties:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
IMPORTANT NOTE:
These properties have no effect if you have provided Serializer and Deserializer instances for KafkaProducer and KafkaConsumer, respectively.
References:
[1] https://docs.spring.io/spring-kafka/reference/html/#json-serde
[2] https://github.com/spring-projects/spring-kafka/issues/535
Solution 5:[5]
For those facing this when working with streams: you have to specify trusted packages for both the consumer and streams, as Kafka initializes two different deserializers.
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      ...
      properties:
        spring.json.trusted.packages: "YOUR ENTITIES PACKAGE"
    streams:
      ...
      properties:
        spring.json.trusted.packages: "YOUR ENTITIES PACKAGE"
Solution 6:[6]
I had a similar problem when I wanted to consume messages in a consumer app. I was getting two errors:
1-The class 'someClass' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*)
2-org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition partion at offset 9902. If needed, please seek past the record to continue consumption.
I solved it by adding the following properties to the method that generates the Kafka consumer config (makeConfig) in my KafkaConfig class:
private Map<String, Object> makeConfig(ServiceMessagePriority input)
{
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// The consumer uses ErrorHandlingDeserializer, which delegates to the real deserializers configured below.
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
configProps.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
configProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
configProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.core.model.ServiceMsgDTO");
configProps.put(JsonDeserializer.USE_TYPE_INFO_HEADERS,false);
configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return configProps;
}
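The ErrorHandlingDeserializer referenced in the config above wraps a delegate deserializer, so that a record which cannot be deserialized is reported as a failure instead of throwing out of the consumer's poll loop. The following is a rough Java model of that idea only. ErrorHandlingSketch, Result and wrap are hypothetical names, and the real class hands the failure to the listener container's error handling rather than returning a wrapper object.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Rough model of a delegating, failure-capturing deserializer (hypothetical names). */
final class ErrorHandlingSketch {

    /** Holds either a deserialized value or the exception that prevented it. */
    static final class Result<T> {
        final T value;
        final RuntimeException error;

        Result(T value, RuntimeException error) {
            this.value = value;
            this.error = error;
        }

        boolean failed() {
            return error != null;
        }
    }

    /** Wraps a delegate so that its exceptions are captured instead of propagated. */
    static <T> Function<byte[], Result<T>> wrap(Function<byte[], T> delegate) {
        return data -> {
            try {
                return new Result<>(delegate.apply(data), null);
            } catch (RuntimeException e) {
                return new Result<>(null, e);
            }
        };
    }

    public static void main(String[] args) {
        // Stand-in delegate: parses the payload as an integer and throws on bad input.
        Function<byte[], Result<Integer>> deserializer =
                wrap(bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)));

        // The bad record in the middle no longer stops the records after it.
        for (String payload : new String[] {"42", "not-a-number", "7"}) {
            Result<Integer> result = deserializer.apply(payload.getBytes(StandardCharsets.UTF_8));
            System.out.println(payload + " -> "
                    + (result.failed() ? "failed: " + result.error.getMessage() : result.value));
        }
    }
}
```

The point of the design is that one unreadable record, such as an old message with an untrusted type, does not block consumption of the records that follow it.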
Solution 7:[7]
My version of spring-kafka is 2.2.11 and I also had this error.
I got it because I had configured two consumers on the same Kafka topic with different configurations. One of them had a ConsumerFactory<String, OrderDTO> and the other had a ConsumerFactory<String, String>.
I solved the error by changing the configuration of one consumer, because it was wrong.
Simply check the consumers of the topic.
Solution 8:[8]
There are two ways to solve the problem:
- Disable type headers in the producer
- Add a list of trusted producers in the consumer
Disable type headers in the producer
Your producer properties file should look like the following:
spring:
  profiles: dev
  kafka:
    producer:
      bootstrap-servers: localhost:9092, localhost:9093
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        acks: 1
        spring:
          json:
            add:
              type:
                headers: false
And your consumer properties file should be as follows:
spring:
  profiles: dev
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: consumer-group-1
      properties:
        spring:
          json:
            value:
              default:
                type: 'com.kafka.consumer.Message'
Add a list of trusted producers in the consumer
If you don't want to change anything in the producer properties file, then update your consumer properties file as follows:
spring:
  profiles: dev
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: consumer-group-1
      properties:
        spring:
          json:
            value:
              default:
                type: 'com.kafka.consumer.Message'
            type:
              mapping: 'com.kafka.producer.Message:com.kafka.consumer.Message'
            trusted:
              packages: 'com.kafka.producer'
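The type mapping entry in the consumer properties above pairs the type id written by the producer with the class the consumer should instantiate. As a rough illustration of that lookup, the Java sketch below parses a token:class list and resolves a target type. TypeMappingSketch and its methods are hypothetical names, and the fallback order (mapping, then the header value itself, then the default type when no header is present) is an assumption of this simplified model rather than a description of spring-kafka's internals.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified model of type-id mapping on the consumer side (hypothetical class). */
final class TypeMappingSketch {

    /** Parses "token:className,token2:className2" into a lookup map. */
    static Map<String, String> parseMappings(String property) {
        Map<String, String> mappings = new LinkedHashMap<>();
        for (String entry : property.split(",")) {
            String[] parts = entry.trim().split(":", 2);
            if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
                throw new IllegalArgumentException("Expected token:className but got '" + entry + "'");
            }
            mappings.put(parts[0].trim(), parts[1].trim());
        }
        return mappings;
    }

    /**
     * Picks the class name to deserialize into.
     * Assumption: no header means "use the default type"; an unmapped header is used as-is
     * (a real consumer would still have to trust that class's package).
     */
    static String resolveTargetType(String typeIdHeader, Map<String, String> mappings, String defaultType) {
        if (typeIdHeader == null) {
            return defaultType;
        }
        return mappings.getOrDefault(typeIdHeader, typeIdHeader);
    }

    public static void main(String[] args) {
        Map<String, String> mappings =
                parseMappings("com.kafka.producer.Message:com.kafka.consumer.Message");
        String defaultType = "com.kafka.consumer.Message";

        // Header written by the producer project is translated to the consumer's class.
        System.out.println(resolveTargetType("com.kafka.producer.Message", mappings, defaultType));
        // Producer that disabled type headers: the default type is used.
        System.out.println(resolveTargetType(null, mappings, defaultType));
    }
}
```

Both approaches end at the same class name here: either the header is translated through the mapping, or the header is absent and the default type applies.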
In the Message class, make sure you have setters and getters along with a no-args constructor (here via @NoArgsConstructor); otherwise it won't work.
@AllArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class Message {
Integer id;
String name;
String address;
String phone;
boolean isActive;
}
Consumer Listener:
@Component
@Slf4j
public class KafkaMessageListener {
@KafkaListener(topics = {"test_json"})
public void OnMessage(Message rc){
log.info(rc.getName());
}
}
Note: My producer and consumer are two different projects, and their class names are:
Producer: com.kafka.producer.Message
Consumer: com.kafka.consumer.Message
Solution 9:[9]
Use the same package name for both the consumer and the producer. It solved my error.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Iroshan |
| Solution 2 | |
| Solution 3 | double-beep |
| Solution 4 | |
| Solution 5 | alban maillere |
| Solution 6 | |
| Solution 7 | davidleongz |
| Solution 8 | |
| Solution 9 | HelloChamith |
