Debezium source to Postgres sink DB - JDBC sink connector issue

I'm getting the error below with the JDBC sink connector.

 ERROR WorkerSinkTask{id=12sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "new1501"."test"."patient3" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Cannot ALTER TABLE "new1501"."test"."patient3" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value
    at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:153)
    at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:75)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

My source connector config:

{
    "name": "150-connector-2",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "plugin.name": "pgoutput",
        "database.hostname": "**",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "***",
        "database.dbname": "kafka-source-test",
        "database.server.name": "new1501",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true",
        "snapshot.mode": "always"
    }
}

My sink connector config:

{
    "name": "12sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "new1501.test.patient3",
        "connection.url": "jdbc:postgresql://<ip>:5432/db",
        "connection.user": "postgres",
        "connection.password": "**",
        "auto.evolve": "true",
        "auto.create": "true",
        "insert.mode": "insert",
        "delete.enabled": "true",
        "pk.fields": "home_id",
        "pk.mode": "record_key",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true"
    }
}


Solution 1:[1]

As the error says, the sink cannot ALTER your table to add a "source" column, because the field is a non-optional STRUCT with no default schema value (nor should there be a default).
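For context, a Debezium change-event value is not a flat row; it's an envelope whose schema includes before, after, source, op, and ts_ms fields. A trimmed, illustrative event for this topic might look like the following (all field values here are hypothetical, including the patient_name column):

{
    "before": null,
    "after": {
        "home_id": 1,
        "patient_name": "example"
    },
    "source": {
        "connector": "postgresql",
        "db": "kafka-source-test",
        "schema": "test",
        "table": "patient3"
    },
    "op": "c",
    "ts_ms": 1610000000000
}

That non-optional source STRUCT is what the sink is trying, and failing, to turn into a table column.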

If you consume the source topic, you'll notice the record values likely don't match the columns of your database table exactly (you'll want to create the table ahead of time). That's because Debezium wraps each row change in the envelope above, so you need to first extract the after state using the event-flattening transform (see the sketch below): https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
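A minimal sketch of that transform, added to the sink connector's "config" block (the alias unwrap is arbitrary; drop.tombstones is set to false on the assumption that you want delete.enabled=true to keep receiving tombstone records, and note that newer Debezium versions supersede this option with delete-handling settings):

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"

With the envelope unwrapped, each record value carries only the row columns from the after state, so the sink no longer tries to evolve the table with a source column.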

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source

Solution 1: OneCricketeer