SpringBoot Embedded Kafka to produce Event using Avro Schema
I have created the below test class to produce an event using AvroSerializer.
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
@TestPropertySource(locations = ("classpath:application-test.properties"))
@ContextConfiguration(classes = { TestAppConfig.class })
@DirtiesContext
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EntitlementEventsConsumerServiceImplTest {

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Bean
    MockSchemaRegistryClient mockSchemaRegistryClient() {
        return new MockSchemaRegistryClient();
    }

    @Bean
    KafkaAvroSerializer kafkaAvroSerializer() {
        return new KafkaAvroSerializer(mockSchemaRegistryClient());
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        return new DefaultKafkaProducerFactory(props, new StringSerializer(), kafkaAvroSerializer());
    }

    @Bean
    public KafkaTemplate<String, ApplicationEvent> kafkaTemplate() {
        KafkaTemplate<String, ApplicationEvent> kafkaTemplate = new KafkaTemplate(producerFactory());
        return kafkaTemplate;
    }
}
But when I send an event using kafkaTemplate().send(appEventsTopic, applicationEvent); I get the exception below.
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema Not Found; error code: 404001
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getIdFromRegistry(MockSchemaRegistryClient.java:79)
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getId(MockSchemaRegistryClient.java:273)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:781)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:562)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)
If I am using MockSchemaRegistryClient, why is it trying to look up the schema?
Solution 1:[1]
Set the schema registry URL to a mock URL, for example:
schema.registry.url=mock://localhost.something
Basically, any URL with the mock:// prefix will do the job. Refer to https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerDeConfig.java
Also set auto.register.schemas=true, so that the serializer registers the schema with the mock registry instead of only looking it up.
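As a rough sketch of the two settings above, the extra producer properties could be assembled like this. The keys schema.registry.url and auto.register.schemas are the ones named in this answer; the class name MockRegistryProps, the scope name test-scope, and the use of plain string keys (so the snippet compiles without Kafka or Confluent jars on the classpath) are assumptions for illustration only. Whether a serializer instance passed directly to the producer factory picks these values up depends on how that instance is configured, so this shows only the property values themselves.

```java
import java.util.HashMap;
import java.util.Map;

public class MockRegistryProps {

    // Builds the extra producer settings suggested above, using plain string keys.
    // "scope" is a hypothetical name; any value after the mock:// prefix should work.
    static Map<String, Object> mockRegistryProps(String scope) {
        Map<String, Object> props = new HashMap<>();
        // A URL with the mock:// prefix asks for the in-memory mock registry.
        props.put("schema.registry.url", "mock://" + scope);
        // Allow the serializer to register the schema rather than only look it up.
        props.put("auto.register.schemas", true);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = mockRegistryProps("test-scope");
        System.out.println(props.get("schema.registry.url"));
        System.out.println(props.get("auto.register.schemas"));
    }
}
```

In the test from the question, these entries would presumably be merged into the map returned by KafkaTestUtils.producerProps(embeddedKafkaBroker) in place of the AUTO_REGISTER_SCHEMAS=false line.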
Solution 2:[2]
You have configured the producer not to auto-register new schemas when producing a message, so the serializer only tries to fetch the schema from the Schema Registry, and the schema is not there.
I also do not see a Schema Registry URL in your setup, so I guess it is taking default values.
To answer your question: the mock imitates the work of a real Schema Registry, but it has clear disadvantages:
/**
 * Mock implementation of SchemaRegistryClient that can be used for tests. This version is NOT
 * thread safe. Schema data is stored in memory and is not persistent or shared across instances.
 */
See the documentation for more information.
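The lookup-versus-register behaviour described in this answer can be illustrated with a toy model. This is not the Confluent client or serializer code: ToyRegistry, resolveSchemaId, the subject name app-events-value, and the schema label are all hypothetical names made up for the sketch. It only shows why a lookup against an empty in-memory registry fails, while registering first makes the same lookup succeed.

```java
import java.util.HashMap;
import java.util.Map;

public class ToyRegistryDemo {

    // Toy in-memory registry keyed by "subject|schema". Not the Confluent client.
    static class ToyRegistry {
        private final Map<String, Integer> ids = new HashMap<>();
        private int nextId = 1;

        // Registers the schema under the subject if absent and returns its id.
        int register(String subject, String schema) {
            return ids.computeIfAbsent(subject + "|" + schema, k -> nextId++);
        }

        // Lookup only: fails when the schema was never registered.
        int getId(String subject, String schema) {
            Integer id = ids.get(subject + "|" + schema);
            if (id == null) {
                throw new IllegalStateException("Schema Not Found");
            }
            return id;
        }
    }

    // Register when auto-registration is allowed, otherwise only look the schema up.
    static int resolveSchemaId(ToyRegistry registry, boolean autoRegister,
                               String subject, String schema) {
        return autoRegister
                ? registry.register(subject, schema)
                : registry.getId(subject, schema);
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        String subject = "app-events-value"; // hypothetical subject name
        String schema = "ApplicationEvent";  // stands in for the Avro schema

        try {
            // Auto-registration off and nothing registered yet: the lookup fails.
            resolveSchemaId(registry, false, subject, schema);
        } catch (IllegalStateException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }

        // Auto-registration on: the schema is stored and an id is returned.
        int id = resolveSchemaId(registry, true, subject, schema);
        System.out.println("registered with id " + id);

        // The same lookup now succeeds because the registry holds the schema.
        System.out.println("lookup returns " + resolveSchemaId(registry, false, subject, schema));
    }
}
```

The same reasoning suggests two ways out for the test in the question: allow auto-registration, or register the schema with the mock client before sending.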
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | shashi ranjan |
| Solution 2 | |
