MongoSinkConnector expects to find a schema subject that doesn't exist

I'm trying to use the MongoDB Kafka sink connector (mongodb-kafka-connect-mongodb-1.7.0) to write Avro events from Kafka to MongoDB.

I have a schema registry set up that works with both the Kafka consumer example and my custom consumer; both are able to deserialize the events and print them.

On the other hand, when I run the connector I get the following exception:

Subject '<my-avro-schema-name>-value' not found.; error code: 40401

The higher-level stack trace messages are:

  • Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema version for id 11

  • Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic client-order-request to Avro:

And indeed, neither this subject nor this ID exists in the schema registry; the highest ID I have is 10, and I do have a subject named <my-avro-schema-name>-key.
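To double-check what the registry actually contains, a minimal diagnostic sketch along these lines can list the subjects and try to resolve the ID from the error. It assumes Confluent's kafka-schema-registry-client (5.5+, for getSchemaById) is on the classpath and that <address> stands in for the registry host:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

public class RegistryCheck {
    public static void main(String[] args) throws Exception {
        // <address> is a placeholder; 100 is the client's schema cache capacity.
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://<address>:8081", 100);

        // Lists every registered subject, e.g. "<my-avro-schema-name>-key".
        System.out.println("Subjects: " + client.getAllSubjects());

        try {
            // The ID from the connector's error message.
            System.out.println("Schema 11: " + client.getSchemaById(11));
        } catch (RestClientException e) {
            // Error code 40403 ("Schema not found") confirms ID 11 was never registered.
            System.out.println("ID 11 not registered: " + e.getMessage());
        }
    }
}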

Why is the MongoSinkConnector trying to find a subject that doesn't exist?

Connect worker properties:

bootstrap.servers=<value>
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/git/1.libraries/kafka_2.12-2.2.0/plugins

MongoSink properties:

name=<my-avro-schema-name>-sink
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
connection.uri=mongodb://<value>
database=Test
collection=test
topics=test
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://<address>

schema-registry.properties:

listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=<address>
kafkastore.topic=_schemas
debug=false
auto.register.schemas=false
use.latest.version=true
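Note: auto.register.schemas and use.latest.version are client-side settings for Confluent's Avro serializer/converter, not schema registry server settings, so in schema-registry.properties they have no effect. If the goal is to control registration, they would go on the producer or on the connector's converter instead, roughly like this (a sketch; the value.converter. prefix is how Connect passes configs through to the value converter):

# producer / serializer side
auto.register.schemas=false
use.latest.version=true

# or on the sink connector's value converter
value.converter.auto.register.schemas=false
value.converter.use.latest.version=true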

Kafka producer configuration:

import java.util.Calendar;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-address>");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro Producer");
props.put("schema.registry.url", "<schema-registry>");

KafkaProducer<String, AllEventsUnion> producerRequest = new KafkaProducer<>(props);

Calendar calendar = Calendar.getInstance();
AllEventsUnion clientOrderRequest = createClientOrderRequest();

// String key, Avro value: only the value embeds a schema-registry ID.
final ProducerRecord<String, AllEventsUnion> producerOrderRequest =
        new ProducerRecord<>("all-events-union",
                "ClientOrderRequest-" + calendar.getTimeInMillis(), clientOrderRequest);

producerRequest.send(producerOrderRequest);

AllEventsUnion is an Avro union schema of multiple types. I'm using it to send different event types to the same Kafka topic, which is why I thought I needed to register it beforehand. But apparently you don't need to register schemas in the schema registry before using them?
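For context on why explicit registration wasn't needed: Confluent's KafkaAvroSerializer auto-registers schemas on first send by default (auto.register.schemas defaults to true) and derives the subject from the topic name via TopicNameStrategy, i.e. <topic>-value for record values. A minimal sketch of the relevant producer settings, spelled out explicitly (both are Confluent serializer configs; the strategy line just restates the default):

// Defaults shown explicitly for clarity.
props.put("auto.register.schemas", true); // register on first send (the default)
props.put("value.subject.name.strategy",
        "io.confluent.kafka.serializers.subject.TopicNameStrategy"); // subject = "<topic>-value"

With a top-level union under TopicNameStrategy, the entire union becomes the subject's schema; TopicRecordNameStrategy (same package) is the usual alternative when one topic carries several event types, since it registers each record type under its own subject.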



Solution 1:[1]

I do have a subject named <my-avro-schema-name>-key

This would indicate you have Avro keys.

Then why use key.converter=org.apache.kafka.connect.storage.StringConverter rather than AvroConverter?
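If the keys really are Avro, the connector would need the Avro converter on the key side as well; a minimal sketch of the two connector properties that would change (the registry address is a placeholder):

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://<address>

(With the String keys shown in the producer code above, however, StringConverter is consistent; the point is that the converters must match how the data was actually serialized.)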

indeed, neither this subject nor this ID exists in the schema registry

Then your Avro producer upstream of the connector (if one exists) has a problem: it either hasn't registered the schema/subject or isn't actually writing Avro data through the schema registry.

Why is the MongoSinkConnector trying to find a subject that doesn't exist?

Because you've set this, and the data in the topic embeds the ID of a schema that does not exist in your registry.

value.converter=io.confluent.connect.avro.AvroConverter

Registering a schema after the data has been produced will not modify the records already in the topic; each record still carries whatever schema ID it was written with.
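That ID is physically part of each record: Confluent's Avro wire format is one magic byte (0), followed by a 4-byte big-endian schema ID, then the Avro payload. A minimal sketch of a helper that extracts the embedded ID from a raw record value (the class and method names here are made up for illustration):

import java.nio.ByteBuffer;

public final class WireFormat {
    // Returns the schema-registry ID embedded in a Confluent-framed Avro value.
    public static int embeddedSchemaId(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Not Confluent wire format: magic=" + magic);
        }
        return buf.getInt(); // big-endian schema ID, e.g. the 11 in the error above
    }
}

Consuming the topic with ByteArrayDeserializer and passing record values through a helper like this would show which ID the producer actually stamped on them, independent of what the registry currently contains.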

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

[1] Solution 1, Stack Overflow