'Debezium source to Postgres sink DB- JDBC Sink connector issue
Getting below error with JDBC sync connector.
ERROR WorkerSinkTask{id=12sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "new1501"."test"."patient3" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Cannot ALTER TABLE "new1501"."test"."patient3" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value
at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:153)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:75)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
My source connector config:
{
"name": "150-connector-2",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"plugin.name": "pgoutput",
"database.hostname": "**",
"database.port": "5432",
"database.user": "postgres",
"database.password": "***",
"database.dbname": "kafka-source-test",
"database.server.name": "new1501",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
"snapshot.mode": "always"
}
}
My sink connector config:
{
"name": "12sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "new1501.test.patient3",
"connection.url": "jdbc:postgresql://<ip>:5432/db",
"connection.user":"postgres",
"connection.password":"**",
"auto-evolve":"true" ,
"auto-create":"true" ,
"insert.mode": "insert",
"delete.enabled": "true",
"pk.fields": "home_id",
"pk.mode": "record_key",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"true",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"true"
}
}
Solution 1:[1]
As the error says, your sink table cannot add a "source" field, because there's no default schema value for it (nor should there be a default).
If you consume the source topic, you'll notice it likely doesn't match the columns of your database exactly (you'll want to create the table ahead of time). This is because you need to first extract the after
state using a transform. https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | OneCricketeer |