'SpringBoot Embedded Kafka to produce Event using Avro Schema

I have created the below test class to produce an event using AvroSerializer.

@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
@TestPropertySource(locations = ("classpath:application-test.properties"))
@ContextConfiguration(classes = { TestAppConfig.class })
@DirtiesContext
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EntitlementEventsConsumerServiceImplTest {


    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Bean
    MockSchemaRegistryClient mockSchemaRegistryClient() {
        return new MockSchemaRegistryClient();
    }

    @Bean
    KafkaAvroSerializer kafkaAvroSerializer() {
        return new KafkaAvroSerializer(mockSchemaRegistryClient());
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        return new DefaultKafkaProducerFactory(props, new StringSerializer(), kafkaAvroSerializer());
    }

    @Bean
    public KafkaTemplate<String, ApplicationEvent> kafkaTemplate() {
        KafkaTemplate<String, ApplicationEvent> kafkaTemplate = new KafkaTemplate(producerFactory());
        return kafkaTemplate;
    }
}

But when I send an event using kafkaTemplate().send(appEventsTopic, applicationEvent);I am getting the below exception.

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema Not Found; error code: 404001
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getIdFromRegistry(MockSchemaRegistryClient.java:79)
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getId(MockSchemaRegistryClient.java:273)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:781)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:562)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)

When I use MockSchemaRegistryClient why it is trying to lookup the schema?



Solution 1:[1]

   schema.registry.url= mock://localhost.something

Basically anything with mock as prefix will do the job. Refer to this https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerDeConfig.java

Also set auto.register.schemas=true

Solution 2:[2]

You are setting the producer not to try and auto register new schema on producing the message , so it just trying to fetch from the SR and did not find its schema on the SR.


also did not see you setup schema registry URL guess its taking default values


To your question the mock is imitating the work of real schema registry, but has its clear disadvantages

/**

  • Mock implementation of SchemaRegistryClient that can be used for tests. This version is NOT
  • thread safe. Schema data is stored in memory and is not persistent or shared across instances. */

You may look on the document for more information

https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClient.java#L47

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 shashi ranjan
Solution 2