Debezium MS SQL Server produces wrong JSON format not recognized by Flink
I have the following settings (verified using curl connector/connector-name):
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"database.user": "admin",
"database.dbname": "test",
"database.hostname": "mssql-host",
"database.password": "xxxxxxx",
"database.history.kafka.bootstrap.servers": "server:9092",
"database.history.kafka.topic": "dbhistory.test",
"value.converter.schemas.enable": "false",
"name": "mssql-cdc",
"database.server.name": "test",
"database.port": "1433",
"include.schema.changes": "false"
}
I was able to pull CDC events into a Kafka topic. The messages have the following format:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "value"
}
],
"optional": true,
"name": "test.dbo.tryme2.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "value"
}
],
"optional": true,
"name": "test.dbo.tryme2.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "change_lsn"
},
{
"type": "string",
"optional": true,
"field": "commit_lsn"
},
{
"type": "int64",
"optional": true,
"field": "event_serial_no"
}
],
"optional": false,
"name": "io.debezium.connector.sqlserver.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"field": "transaction"
}
],
"optional": false,
"name": "test.dbo.tryme2.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 777,
"value": "xxxxxx"
},
"source": {
"version": "1.8.1.Final",
"connector": "sqlserver",
"name": "test",
"ts_ms": 1647169350996,
"snapshot": "true",
"db": "test",
"sequence": null,
"schema": "dbo",
"table": "tryme2",
"change_lsn": null,
"commit_lsn": "00000043:00000774:000a",
"event_serial_no": null
},
"op": "r",
"ts_ms": 1647169350997,
"transaction": null
}
}
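The message above has exactly two top-level keys, schema and payload. That is the envelope Kafka Connect's JsonConverter writes when schemas are enabled, and the Debezium change event itself (before, after, source, op, ts_ms, transaction) sits under payload. A quick Python check for which shape a topic actually carries might look like this. The helper names are hypothetical, the "exactly schema and payload" test is a heuristic, and the sample is an abbreviated copy of the message above with the schema body cut down.

```python
import json


def has_connect_envelope(record: dict) -> bool:
    """Heuristic: JsonConverter output with schemas enabled has exactly
    the two top-level keys "schema" and "payload"."""
    return set(record) == {"schema", "payload"}


def change_event(record: dict) -> dict:
    """Return the Debezium change event, unwrapping the envelope if present."""
    return record["payload"] if has_connect_envelope(record) else record


# Abbreviated version of the message above (schema body trimmed for brevity).
raw = json.dumps({
    "schema": {"type": "struct", "name": "test.dbo.tryme2.Envelope"},
    "payload": {
        "before": None,
        "after": {"id": 777, "value": "xxxxxx"},
        "op": "r",
        "ts_ms": 1647169350997,
    },
})

record = json.loads(raw)
print(has_connect_envelope(record))  # True
print(change_event(record)["op"])    # r
```

If the first check prints True for messages produced after a config change, the converter setting did not take effect.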
When I create a Flink source table on this topic, I get:
Caused by: java.io.IOException: Corrupt Debezium JSON message
I already set "value.converter.schemas.enable": "false". Why doesn't this work?
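Below is a rough Python model of why Flink rejects these records. As far as I can tell, with Flink's 'debezium-json.schema-include' option left at its default of false, the debezium-json format looks for before, after and op at the root of each message. In the enveloped message above, op is only present under payload, so the lookup fails and the error surfaces as "Corrupt Debezium JSON message". This is a simplified sketch, not Flink's code. The function name parse_debezium_json is hypothetical, and it models only the top-level field lookup, not type conversion.

```python
import json


def parse_debezium_json(message: bytes, schema_include: bool = False) -> tuple:
    """Simplified model of Flink's debezium-json top-level field lookup."""
    try:
        root = json.loads(message)
        # With schema_include=True the event is read from "payload";
        # otherwise it is expected at the root of the message.
        event = root["payload"] if schema_include else root
        op = event["op"]  # KeyError when the envelope hides "op"
        return op, event.get("before"), event.get("after")
    except (ValueError, KeyError, TypeError) as exc:
        raise IOError(f"Corrupt Debezium JSON message '{message.decode()}'.") from exc


event = {"before": None, "after": {"id": 777, "value": "xxxxxx"}, "op": "r"}
plain = json.dumps(event).encode()
enveloped = json.dumps({"schema": {"type": "struct"}, "payload": event}).encode()

print(parse_debezium_json(plain)[0])                           # r
print(parse_debezium_json(enveloped, schema_include=True)[0])  # r
try:
    parse_debezium_json(enveloped)
except IOError:
    print("rejected: corrupt message")
```

In other words, the messages in the topic still carry the envelope, which means the connector-level schemas.enable=false never reached the converter.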
Solution 1
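I just found out that the configuration is hierarchical: to override the Kafka Connect worker configuration at the connector level, you have to supply both value.converter and value.converter.schemas.enable. If the connector config sets only value.converter.schemas.enable, the worker's converter and its settings are used, and the connector-level flag is ignored.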
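The override rule can be modeled in a few lines of Python. This reflects my understanding of Kafka Connect's behavior. A connector gets its own value converter only when its config names the converter class in value.converter, and the value.converter.* settings are then handed to that converter with the prefix stripped. Otherwise the worker's converter, configured from the worker's own value.converter.* settings, is used. The function name and the worker settings below are hypothetical. The worker is assumed to run JsonConverter with schemas enabled, which would match the envelope seen in the question.

```python
from typing import Dict, Tuple


def effective_value_converter(worker: Dict[str, str],
                              connector: Dict[str, str]) -> Tuple[str, Dict[str, str]]:
    """Return (converter class, converter settings) used for a connector's values.

    Simplified model: only the value converter is considered.
    """
    prefix = "value.converter."
    # The connector's config wins only if it names the converter class itself.
    source = connector if "value.converter" in connector else worker
    settings = {
        key[len(prefix):]: value
        for key, value in source.items()
        if key.startswith(prefix)
    }
    return source["value.converter"], settings


worker = {
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
}

only_flag = {"value.converter.schemas.enable": "false"}
print(effective_value_converter(worker, only_flag)[1])  # {'schemas.enable': 'true'}

both = {
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
}
print(effective_value_converter(worker, both)[1])  # {'schemas.enable': 'false'}
```

Applied to the question, this means adding "value.converter": "org.apache.kafka.connect.json.JsonConverter" next to "value.converter.schemas.enable": "false" in the connector config.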
I sincerely wish there were some sort of validation, so I would not have had to wonder for hours.
Also, if you do want the schema included, there is a Flink option hidden in the docs:
In order to interpret such messages, you need to add the option 'debezium-json.schema-include' = 'true' into above DDL WITH clause (false by default). Usually, this is not recommended to include schema because this makes the messages very verbose and reduces parsing performance.
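For the schema-included route, here is a small Python helper that assembles such a DDL. The table name, the columns, the topic (test.dbo.tryme2, assuming Debezium's <server.name>.<schema>.<table> topic naming), the bootstrap servers, the group id and the startup mode are all assumptions for illustration. The option keys are those of Flink's Kafka SQL connector and debezium-json format. The resulting string could then be passed to TableEnvironment.execute_sql in PyFlink.

```python
def build_source_ddl(schema_include: bool,
                     topic: str = "test.dbo.tryme2",
                     bootstrap: str = "server:9092") -> str:
    """Build a Flink SQL CREATE TABLE statement for the Debezium topic.

    Hypothetical helper: table name, columns, topic and group id are assumptions.
    """
    options = {
        "connector": "kafka",
        "topic": topic,
        "properties.bootstrap.servers": bootstrap,
        "properties.group.id": "tryme2-reader",  # hypothetical group id
        "scan.startup.mode": "earliest-offset",
        "format": "debezium-json",
        # true: read before/after/op from "payload" instead of the message root.
        "debezium-json.schema-include": "true" if schema_include else "false",
    }
    with_clause = ",\n".join(f"  '{k}' = '{v}'" for k, v in options.items())
    # VALUE is a reserved word in Flink SQL, so the column name is backquoted.
    return (
        "CREATE TABLE tryme2 (\n"
        "  id INT,\n"
        "  `value` STRING\n"
        ") WITH (\n"
        f"{with_clause}\n"
        ")"
    )


print(build_source_ddl(schema_include=True))
```

Once the connector emits plain change events (schemas.enable=false applied correctly), pass schema_include=False instead, which is the route the Flink docs recommend.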
I have to say this is a really bad developer experience.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | dz902 |
