Kafka C# consumer JSON response deserialization issue

I've set up a Postgres connector following this guide:

https://medium.com/event-driven-utopia/configuring-debezium-to-capture-postgresql-changes-with-docker-compose-224742ca5372

This works fine, and I can see the proper JSON in kafkacat.

I've set up a consumer in C#:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Newtonsoft.Json.Linq;

static async Task<ClientConfig> LoadConfig(string configPath)
{
    try
    {
        var cloudConfig =
           new string[] { "bootstrap.servers=localhost:9092", "session.timeout.ms=45000" }
            .Where(line => !line.StartsWith("#"))
            .ToDictionary(
                line => line.Substring(0, line.IndexOf('=')),
                line => line.Substring(line.IndexOf('=') + 1));

        var clientConfig = new ClientConfig(cloudConfig);

        return clientConfig;
    }
    catch (Exception e)
    {
        Console.WriteLine($"An error occurred reading the config file from '{configPath}': {e.Message}");
        System.Environment.Exit(1);
        return null; // avoid not-all-paths-return-value compiler error.
    }
}


static void Consume(string topic, ClientConfig config)
{
    var consumerConfig = new ConsumerConfig(config);
    consumerConfig.GroupId = "dotnet-example-group-1";
    consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
    consumerConfig.EnableAutoCommit = false;


    CancellationTokenSource cts = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true; // prevent the process from terminating.
        cts.Cancel();
    };



    using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
    {
        consumer.Subscribe(topic);
        var totalCount = 0;
        try
        {
            while (true)
            {
                var cr = consumer.Consume(cts.Token);
                var json = cr.Message.Value;
                Console.WriteLine(cr.Message.Value);
                //totalCount += JObject.Parse(cr.Message.Value).Value<int>("count");
                //Console.WriteLine($"Consumed record with key {cr.Message.Key} and value {cr.Message.Value}, and updated total count to {totalCount}");
            }
        }
        catch (OperationCanceledException)
        {
            // Ctrl-C was pressed.
        }
        finally
        {
            consumer.Close();
        }
    }
}

var topic = "postgres.public.shipments";
var configPath = "librdkafka.config";
var certDir = args.Length == 4 ? args[3] : null;

var config = await LoadConfig(configPath);

Consume(topic, config);

This also works, but cr.Message.Value always contains a strange format where I would expect JSON: "\0\0\0\0\u0002\0\u0002��\u0003��\u0001\u0002\u00142021-01-21\u0002\u0012COMPLETED\u00161.4.2.Final\u0014postgresql\u0010postgres��ا�\0\btrue\u0016shipment_db\fpublic\u0012shipments\u0002�\a\u0002���\u0016\0\u0002r\u0002��ا�\0"

Where is the configuration error?



Solution 1:[1]

The linked blog uses Avro, not JSON. kcat deserializes the Avro data and renders it as JSON, as shown in the post with -s value=avro.

If the data is Avro, the error starts in your config variable/file: the ConsumerBuilder value type shouldn't be string, but rather a generated Avro type (or GenericRecord) paired with an Avro deserializer.
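For example, with the Confluent.SchemaRegistry.Serdes.Avro NuGet package, consuming the values as GenericRecord might look like this sketch (the Schema Registry URL here is an assumption for a local Docker setup):

```csharp
using System;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Sketch: consume Debezium's Avro-encoded values as GenericRecord.
// http://localhost:8081 is an assumed local Schema Registry address.
var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://localhost:8081" };
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "dotnet-example-group-1",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
using var consumer = new ConsumerBuilder<Ignore, GenericRecord>(consumerConfig)
    .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
    .Build();

consumer.Subscribe("postgres.public.shipments");
var cr = consumer.Consume();
// Fields can then be read by name, e.g. Debezium's "after" struct:
Console.WriteLine(cr.Message.Value["after"]);
```

The AsSyncOverAsync() wrapper is needed because AvroDeserializer is async while Consume is synchronous.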

If you want JSON, the misconfiguration is in the converter settings for Debezium; in Docker Compose, you'd want these environment variables:

KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter 
VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
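In a Docker Compose file, those would go in the environment section of the Connect container (a sketch; the service name and image tag are assumptions):

```yaml
  connect:
    image: debezium/connect:1.4
    environment:
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
```

With the JsonConverter in place, cr.Message.Value arrives as plain JSON text and the commented-out JObject.Parse line in your consumer would work.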

Related - https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
