Debezium MS SQL Server produces wrong JSON format not recognized by Flink

I have the following connector configuration (verified using curl connector/connector-name):

"config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.user": "admin",
    "database.dbname": "test",
    "database.hostname": "mssql-host",
    "database.password": "xxxxxxx",
    "database.history.kafka.bootstrap.servers": "server:9092",
    "database.history.kafka.topic": "dbhistory.test",
    "value.converter.schemas.enable": "false",
    "name": "mssql-cdc",
    "database.server.name": "test",
    "database.port": "1433",
    "include.schema.changes": "false"
  }

I was able to pull CDC events into a Kafka topic. They arrive in the following format:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "value"
          }
        ],
        "optional": true,
        "name": "test.dbo.tryme2.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "value"
          }
        ],
        "optional": true,
        "name": "test.dbo.tryme2.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "change_lsn"
          },
          {
            "type": "string",
            "optional": true,
            "field": "commit_lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "event_serial_no"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.sqlserver.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "test.dbo.tryme2.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 777,
      "value": "xxxxxx"
    },
    "source": {
      "version": "1.8.1.Final",
      "connector": "sqlserver",
      "name": "test",
      "ts_ms": 1647169350996,
      "snapshot": "true",
      "db": "test",
      "sequence": null,
      "schema": "dbo",
      "table": "tryme2",
      "change_lsn": null,
      "commit_lsn": "00000043:00000774:000a",
      "event_serial_no": null
    },
    "op": "r",
    "ts_ms": 1647169350997,
    "transaction": null
  }
}
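For illustration (this snippet is not from the original post), the record above wraps the actual change event inside a `"schema"`/`"payload"` envelope. With schemas disabled, only the inner `"payload"` object is emitted, which is what Flink's `debezium-json` format expects by default. A minimal Python sketch of the difference:

```python
import json

def strip_schema(record: str) -> str:
    """Return just the Debezium payload from a schema-wrapped JSON record.

    With "value.converter.schemas.enable" = "false", Kafka Connect emits
    only this payload object, without the surrounding schema envelope.
    """
    envelope = json.loads(record)
    return json.dumps(envelope["payload"])

# Abbreviated version of the record shown above
wrapped = (
    '{"schema": {"type": "struct"}, '
    '"payload": {"before": null, "after": {"id": 777, "value": "xxxxxx"}, "op": "r"}}'
)
print(strip_schema(wrapped))
# The result contains only before/after/op, i.e. the shape Flink parses.
```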

In Flink, when I created a source table on this topic, I got:

Caused by: java.io.IOException: Corrupt Debezium JSON message

I already have "value.converter.schemas.enable": "false"; why doesn't this work?



Solution 1:

Just found out that the converter configuration is hierarchical: to override the Kafka Connect worker configuration at the connector level, you have to supply both value.converter and value.converter.schemas.enable. Setting value.converter.schemas.enable alone has no effect, because the worker-level converter (and its schema setting) still wins.
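For example (a sketch; converter class assumed to be the standard JsonConverter), both keys go into the connector's "config" block together:

```
"config": {
    ...
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
}
```

With both present, the connector emits plain payload-only JSON that Flink's debezium-json format can parse without extra options.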

I sincerely wish there were some sort of validation, so I did not have to puzzle over this for hours.

Also, if the schema is desired, there is a Flink option hidden in the docs:

In order to interpret such messages, you need to add the option 'debezium-json.schema-include' = 'true' into above DDL WITH clause (false by default). Usually, this is not recommended to include schema because this makes the messages very verbose and reduces parsing performance.
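Putting the two pieces together, a Flink source table over this topic could look like the following sketch (topic, server, and startup mode are assumptions based on the config above; keep 'debezium-json.schema-include' = 'true' only if the messages still carry the "schema"/"payload" wrapper):

```sql
CREATE TABLE tryme2 (
  id INT,
  `value` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'test.dbo.tryme2',
  'properties.bootstrap.servers' = 'server:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true'
);
```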

I have to say this is a really bad developer experience.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source

Solution 1: dz902