Unable to parse JSON event with nested Avro schema
I am trying to produce a Kafka event that conforms to an Avro schema, but producing fails because of the Avro format. I have also tried generating random data against the schema, but I could not parse the generated data either.
My Avro schema is:
{
  "type": "record",
  "name": "CsrGlobalSchema",
  "namespace": "com.ibm.csr",
  "fields": [
    {
      "name": "data",
      "type": [
        {
          "type": "record",
          "name": "data",
          "namespace": "com.ibm.csr.data",
          "fields": [
            {
              "name": "entId",
              "type": ["string", "null"]
            },
            {
              "name": "device",
              "type": [
                {
                  "type": "record",
                  "name": "device",
                  "namespace": "com.ibm.csr.data.device",
                  "fields": [
                    { "name": "appName", "type": ["string", "null"] },
                    { "name": "id",      "type": ["string", "null"] },
                    { "name": "name",    "type": ["string", "null"] }
                  ]
                },
                "null"
              ]
            }
          ]
        },
        "null"
      ]
    },
    {
      "name": "evtCxt",
      "type": [
        {
          "type": "record",
          "name": "evtCxt",
          "fields": [
            { "name": "action",     "type": ["string", "null"] },
            { "name": "evtSubType", "type": ["string", "null"] },
            { "name": "evtTs",      "type": ["long",   "null"] },
            { "name": "evtType",    "type": ["string", "null"] },
            { "name": "srcEvtId",   "type": ["string", "null"] },
            { "name": "type",       "type": ["string", "null"] }
          ]
        },
        "null"
      ]
    },
    {
      "name": "glbCxt",
      "type": [
        {
          "type": "record",
          "name": "glbCxt",
          "fields": [
            { "name": "tntId",   "type": ["string", "null"] },
            { "name": "tntName", "type": ["string", "null"] }
          ]
        },
        "null"
      ]
    }
  ]
}
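While debugging, I printed the union branches of the top-level data field, since as far as I understand Avro's JSON encoding keys each union value by the branch name (the fullname for named records). This is just a diagnostic fragment, not part of my producer:

// Diagnostic: list the union branches of the "data" field. For named types,
// the branch fullname (namespace + name) is what the JSON encoding keys on.
Schema schema = new Schema.Parser().parse(new File("global_event_schema_expanded.avsc"));
for (Schema branch : schema.getField("data").schema().getTypes()) {
    System.out.println(branch.getFullName()); // prints com.ibm.csr.data.data, then null
}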
My event data in JSON format is:
{
  "data": {
    "entId": "adfdf"
  },
  "evtCxt": {
    "srcEvtId": "095d895e-595a-40a0-a887-dd158a3b383e",
    "evtTs": 10202002002,
    "evtType": "EntityCreation",
    "evtSubType": "devAppCompliance",
    "type": "device",
    "action": "create"
  },
  "glbCxt": {
    "tntId": "663f4d7b",
    "tntName": "11111222"
  }
}
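For reference, my reading of the Avro specification is that the JSON encoding wraps every non-null union value in an object keyed by the branch type, and that every field must be present. If that is correct, I suspect the decoder expects something like the following (my hand-written guess, untested) rather than the plain JSON above:

{
  "data": { "com.ibm.csr.data.data": {
      "entId": { "string": "adfdf" },
      "device": null
  } },
  "evtCxt": { "com.ibm.csr.evtCxt": {
      "srcEvtId": { "string": "095d895e-595a-40a0-a887-dd158a3b383e" },
      "evtTs": { "long": 10202002002 },
      "evtType": { "string": "EntityCreation" },
      "evtSubType": { "string": "devAppCompliance" },
      "type": { "string": "device" },
      "action": { "string": "create" }
  } },
  "glbCxt": { "com.ibm.csr.glbCxt": {
      "tntId": { "string": "663f4d7b" },
      "tntName": { "string": "11111222" }
  } }
}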
I tried generating random data using the code below:
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
InputStream is = classLoader.getResourceAsStream("global_event_schema_expanded.avsc");
Schema schema = new Schema.Parser().parse(is);
// RandomData lives in org.apache.avro.util (shipped in the Avro test/tools artifact)
Iterator<Object> it = new RandomData(schema, 4).iterator();
System.out.println(it.next());
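One way to see the exact JSON encoding this schema expects should be to write one of the generated records back out with a JsonEncoder. A sketch along these lines (untested):

// Sketch: serialize a generated record to Avro JSON to see the exact
// encoding (union wrapping included) that jsonDecoder expects back.
GenericRecord sample = (GenericRecord) it.next();
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
Encoder encoder = EncoderFactory.get().jsonEncoder(schema, out);
writer.write(sample, encoder);
encoder.flush();
System.out.println(out.toString("UTF-8"));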
Even then, I could not parse the randomly generated data against the schema.
My code to parse the event data and produce the event is:
String schemaStr = new String(Files.readAllBytes(Paths.get("global_event_schema_expanded.avsc")));
String genericRecordStr = new String(Files.readAllBytes(Paths.get("kafka_event_data_create.json")));
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaStr);
// Put the raw JSON string straight into the union-typed "data" field
GenericRecord genericRecord = new GenericData.Record(schema);
genericRecord.put("data", genericRecordStr);
ProducerRecord<String, Object> record =
        new ProducerRecord<>("source-data-topic", "data", genericRecord);
try {
    final Future<RecordMetadata> send = producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    producer.flush();
    producer.close();
}
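If I understand GenericData.Record correctly, the data field needs an actual nested record rather than the raw JSON string. A minimal sketch of what I think that would look like (continuing from the schema and record above):

// Sketch: populate the union-typed "data" field with a real nested record.
// The field's schema is a union; branch 0 is the record type in my schema.
Schema dataSchema = schema.getField("data").schema().getTypes().get(0);
GenericRecord data = new GenericData.Record(dataSchema);
data.put("entId", "adfdf");
data.put("device", null);
genericRecord.put("data", data);
// ... evtCxt and glbCxt would need the same treatment ...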
I have tried JsonDecoder as well:
String schemaStr = new String(Files.readAllBytes(Paths.get("global_event_schema_expanded.avsc")));
String genericRecordStr = new String(Files.readAllBytes(Paths.get("kafka_event_data_create.json")));
Schema.Parser schemaParser = new Schema.Parser();
Schema schema = schemaParser.parse(schemaStr);
// DecoderFactory.get() is the non-deprecated way to obtain the factory
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, genericRecordStr);
DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
GenericRecord genericRecord = reader.read(null, decoder);
ProducerRecord<String, Object> record =
        new ProducerRecord<>("source-data-topic", "data", genericRecord);
try {
    final Future<RecordMetadata> send = producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    producer.flush();
    producer.close();
}
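The parsing failure happens before Kafka is even involved, so a stripped-down sketch of just the Avro round trip should reproduce it:

// Sketch: reproduce without Kafka — decode the JSON payload, then re-encode
// the resulting record to Avro binary to confirm it is valid.
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, genericRecordStr);
GenericRecord rec = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
new GenericDatumWriter<GenericRecord>(schema).write(rec, encoder);
encoder.flush();
System.out.println("Encoded " + out.size() + " bytes");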