Producer Kafka throws deserialization exception

I have one topic, one producer, and one consumer:

Dependencies (Spring Initializr)

  • Producer (Apache Kafka)
  • Consumer (Apache Kafka Streams, Cloud Stream)

Producer:

KafkaProducerConfig

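import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.example.producer.model.Person;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public KafkaTemplate<String, Person> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, Person> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Values are written as JSON by Spring Kafka's JsonSerializer
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configs);
    }

}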

Controller:

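import java.util.Arrays;
import java.util.List;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.example.producer.model.Person;

import reactor.core.publisher.Mono;

@RestController
public class KafkaProducerApplication {

    private final KafkaTemplate<String, Person> kafkaTemplate;

    public KafkaProducerApplication(KafkaTemplate<String, Person> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @GetMapping("/persons")
    public Mono<List<Person>> findAll() {
        var personList = Mono.just(Arrays.asList(new Person("Name1", 15),
                new Person("Name2", 10)));

        // Send only the first person to the topic
        personList.subscribe(dataList -> kafkaTemplate.send("topic_test_spring", dataList.get(0)));

        return personList;
    }

}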

Calling the endpoint works correctly and the producer does not log any exception in the IntelliJ console.

Consumer:

spring:
  cloud:
    stream:
      function:
        definition: personService
      bindings:
        personService-in-0:
          destination: topic_test_spring
      kafka:
        bindings:
          personService-in-0:
            consumer:
              configuration:
                value:
                  deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      binders:
        brokers:
          - localhost:9091
          - localhost:9092
  kafka:
    consumer:
      properties:
        spring:
          json:
            trusted:
              packages: "*"

PersonKafkaConsumer

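import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Person is the consumer project's own model class (its import is omitted here)
@Configuration
public class PersonKafkaConsumer {

    @Bean
    public Consumer<KStream<String, Person>> personService() {
        // Print the name of every Person read from the bound topic
        return kstream -> kstream.foreach((key, person) -> System.out.println(person.getName()));
    }

}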

Running the consumer project throws this exception:

org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
Caused by: java.lang.IllegalArgumentException: The class 'com.example.producer.model.Person' is not in the trusted packages: [java.util, java.lang, com.nttdata.bootcamp.yanki.model, com.nttdata.bootcamp.yanki.model.*]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
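The check behind this message can be pictured with a small stand-alone sketch. It is a simplified illustration and not Spring Kafka's actual code: the names TrustedPackageCheck and isTrusted are hypothetical, and only exact package names and the trust-all value "*" are handled.

```java
import java.util.Set;

class TrustedPackageCheck {

    // Simplified stand-in for the deserializer's trusted-package check
    // (hypothetical helper, not the library's implementation).
    static boolean isTrusted(String className, Set<String> trustedPackages) {
        if (trustedPackages.contains("*")) {
            return true; // "*" means every package is trusted
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trustedPackages.contains(packageName);
    }

    public static void main(String[] args) {
        // Package names taken from the exception message
        Set<String> trusted = Set.of("java.util", "java.lang", "com.nttdata.bootcamp.yanki.model");

        // Class name carried by the producer's message: rejected
        System.out.println(isTrusted("com.example.producer.model.Person", trusted));
        // Class in the package from the trusted list: accepted
        System.out.println(isTrusted("com.nttdata.bootcamp.yanki.model.Person", trusted));
        // Trust-all setting: accepted regardless of package
        System.out.println(isTrusted("com.example.producer.model.Person", Set.of("*")));
    }
}
```

In this sketch the first call returns false for the same reason the exception is raised: the class name refers to a package that is not in the trusted list.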

The class named in the exception, com.example.producer.model.Person, is the entity as it is packaged in the producer project, not in the consumer. The producer's properties file contains no configuration.
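One possible way around the mismatch is to stop the producer's JsonSerializer from writing type headers, so the consumer never sees the producer's class name. The sketch below sets the spring.json.add.type.headers property (the key behind JsonSerializer.ADD_TYPE_INFO_HEADERS) using plain string keys so it compiles without the Kafka libraries. It assumes the consumer can determine the target type by itself, for example from the Consumer<KStream<String, Person>> signature, which has not been verified against this project. ProducerProps and producerProps() are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class ProducerProps {

    // Producer settings from the question, written with string keys,
    // plus one extra property (an assumption, not part of the original setup).
    static Map<String, Object> producerProps() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "127.0.0.1:9092");
        configs.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer",
                "org.springframework.kafka.support.serializer.JsonSerializer");
        // Ask JsonSerializer not to add headers naming the producer's class
        configs.put("spring.json.add.type.headers", false);
        return configs;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().get("spring.json.add.type.headers"));
    }
}
```

The resulting map could be passed to new DefaultKafkaProducerFactory<>(configs) in place of the map built in producerFactory().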



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
