Kafka C# consumer JSON response deserialization issue
I've set up a Postgres connector following this guide:
This works fine, and I can see the proper JSON in kafkacat.
I've set up a consumer in C#:
using Confluent.Kafka; // ClientConfig, ConsumerConfig, ConsumerBuilder, Ignore
using Newtonsoft.Json.Linq;
static async Task<ClientConfig> LoadConfig(string configPath)
{
    try
    {
        var cloudConfig =
            new string[] { "bootstrap.servers=localhost:9092", "session.timeout.ms=45000" }
                .Where(line => !line.StartsWith("#"))
                .ToDictionary(
                    line => line.Substring(0, line.IndexOf('=')),
                    line => line.Substring(line.IndexOf('=') + 1));
        var clientConfig = new ClientConfig(cloudConfig);
        return clientConfig;
    }
    catch (Exception e)
    {
        Console.WriteLine($"An error occurred reading the config file from '{configPath}': {e.Message}");
        System.Environment.Exit(1);
        return null; // avoid not-all-paths-return-value compiler error.
    }
}
static void Consume(string topic, ClientConfig config)
{
    var consumerConfig = new ConsumerConfig(config);
    consumerConfig.GroupId = "dotnet-example-group-1";
    consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
    consumerConfig.EnableAutoCommit = false;
    CancellationTokenSource cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true; // prevent the process from terminating.
        cts.Cancel();
    };
    using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
    {
        consumer.Subscribe(topic);
        var totalCount = 0;
        try
        {
            while (true)
            {
                var cr = consumer.Consume(cts.Token);
                var json = cr.Message.Value;
                Console.WriteLine(cr.Message.Value);
                //totalCount += JObject.Parse(cr.Message.Value).Value<int>("count");
                //Console.WriteLine($"Consumed record with key {cr.Message.Key} and value {cr.Message.Value}, and updated total count to {totalCount}");
            }
        }
        catch (OperationCanceledException)
        {
            // Ctrl-C was pressed.
        }
        finally
        {
            consumer.Close();
        }
    }
}
var topic = "postgres.public.shipments";
var configPath = "librdkafka.config";
var certDir = args.Length == 4 ? args[3] : null;
var config = await LoadConfig(configPath);
Consume(topic, config);
This also works, but cr.Message.Value, which should be JSON, always arrives in a strange format:
"\0\0\0\0\u0002\0\u0002��\u0003��\u0001\u0002\u00142021-01-21\u0002\u0012COMPLETED\u00161.4.2.Final\u0014postgresql\u0010postgres��ا�\0\btrue\u0016shipment_db\fpublic\u0012shipments\u0002�\a\u0002���\u0016\0\u0002r\u0002��ا�\0"
Where is the configuration error?
Solution 1:[1]
The linked blog post uses Avro, not JSON. kcat deserializes the Avro data and displays it as JSON because the post runs it with -s value=avro.
If the data is Avro, the error starts in your config variable/file, and the ConsumerBuilder value type shouldn't be string but rather a generated Avro type.
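One way to check whether a value is Schema Registry framed Avro is to look at its first five bytes. The following is a minimal sketch, assuming the Confluent wire format (a zero magic byte followed by a 4-byte big-endian schema ID). WireFormat and TryReadSchemaId are hypothetical names used for illustration; they are not part of Confluent.Kafka.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper: inspects the 5-byte Schema Registry framing
// at the start of a raw Kafka message value.
public static class WireFormat
{
    // Returns true and sets schemaId when the value starts with the magic
    // byte 0x00 followed by a 4-byte big-endian schema ID; otherwise
    // returns false and sets schemaId to -1.
    public static bool TryReadSchemaId(ReadOnlySpan<byte> value, out int schemaId)
    {
        schemaId = -1;
        if (value.Length < 5 || value[0] != 0)
        {
            return false;
        }

        schemaId = BinaryPrimitives.ReadInt32BigEndian(value.Slice(1, 4));
        return true;
    }
}
```

To try it, build the consumer as ConsumerBuilder<Ignore, byte[]> so the value arrives as raw bytes, then pass cr.Message.Value to WireFormat.TryReadSchemaId. In the output shown in the question, the leading \0\0\0\0\u0002 would read as magic byte 0 and schema ID 2, which is consistent with Avro rather than JSON text.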
If you want JSON, the misconfiguration starts in the converter settings for Debezium; in Docker Compose, you'd want these:
KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
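Once the connector writes JSON, the string value can be parsed directly on the C# side. The following is a rough sketch using System.Text.Json, assuming a Debezium change event whose row image sits under "after", optionally wrapped in "payload" (which is what JsonConverter produces when schemas are enabled). DebeziumJson and TryReadAfter are hypothetical names, and the "status" field in the usage note is only an illustrative column.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper: pulls the "after" row image out of a
// Debezium change event that was serialized as JSON.
public static class DebeziumJson
{
    // Returns true and the raw JSON of the "after" object when present;
    // returns false for empty values, non-object roots, or events
    // without an "after" object (for example deletes).
    public static bool TryReadAfter(string messageValue, out string afterJson)
    {
        afterJson = string.Empty;
        if (string.IsNullOrEmpty(messageValue))
        {
            return false;
        }

        using var doc = JsonDocument.Parse(messageValue);
        var root = doc.RootElement;
        if (root.ValueKind != JsonValueKind.Object)
        {
            return false;
        }

        // With schemas enabled the envelope is nested under "payload";
        // without them the envelope is the root object itself.
        var envelope = root.TryGetProperty("payload", out var payload) ? payload : root;
        if (envelope.ValueKind == JsonValueKind.Object
            && envelope.TryGetProperty("after", out var after)
            && after.ValueKind == JsonValueKind.Object)
        {
            afterJson = after.GetRawText();
            return true;
        }

        return false;
    }
}
```

In the consume loop this could be called as DebeziumJson.TryReadAfter(cr.Message.Value, out var row), and row could then be deserialized into a type matching the table, for example one with a status property.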
Related - https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
