Kafka/QuestDB JDBC sink connector crashes when multiple topics are configured

I have a Kafka broker with two topics and a JDBC sink connector writing to a QuestDB database. Everything runs in Docker containers.

If I configure the JDBC connector with a single topic (either one), it works just fine and all events are transferred to QuestDB.

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @jdbc_sink.json http://connect:8083/connectors

jdbc_sink.json:
{
  "name": "jdbc_sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "topic_1",
    "table.name.format": "${topic}",
    "connection.url": "jdbc:postgresql://questdb:8812/qdb?useSSL=false",
    "connection.user": "admin",
    "connection.password": "quest",
    "auto.create": "true",
    "insert.mode": "insert",
    "dialect.name": "PostgreSqlDatabaseDialect"
  }
}
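
For reference, in the working single-topic setup the connector reports healthy through the standard Kafka Connect REST API (jdbc_sink is the connector name from the config above):

curl -s http://connect:8083/connectors/jdbc_sink/status

Both the connector and its task show state RUNNING there.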

As soon as I include both topics in the JDBC connector config, the questdb and connect containers crash.

"topics": "topic_1,topic_2",

Connect Dockerfile

FROM confluentinc/cp-kafka-connect-base:7.1.0

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.3.3
docker-compose.yml (extract)

connect:
    build:
      context: ./connect
      dockerfile: Dockerfile
    hostname: connect
    container_name: connect
    depends_on:
      - broker1
      - zookeeper
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker1:29092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    networks:
      - broker-kafka
questdb:
    image: questdb/questdb:6.2.2b
    hostname: questdb
    container_name: questdb
    ports:
      - "9000:9000" # REST API and Web Console
      - "9009:9009" # InfluxDB line protocol
      - "8812:8812" # Postgres wire protocol
      - "9003:9003" # Min health server
    volumes:
      - ./questdb:/root/.questdb
    networks:
      - broker-kafka
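
For completeness, I can also reach the Postgres wire endpoint the connector uses directly. Assuming psql is installed on the host, this connects with the same credentials as the connector config (password quest) and runs a sanity query:

psql -h localhost -p 8812 -U admin -d qdb -c "select count() from topic_1"

So port 8812 and the credentials work independently of Connect, at least in the single-topic case.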

What is the problem here? Does anyone have an idea?

Below are extracts from the Docker logs of the connect and questdb containers. I cannot post the complete logs (too many characters), so I pulled out what look like the relevant messages; frankly, I could not pinpoint the decisive error. Please let me know if you need more logs.
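
If the full logs are needed, they can be captured like this (the hs_err path comes from the questdb log below; ./questdb is the volume mounted in the compose file above):

docker logs connect > connect.log 2>&1
docker logs questdb > questdb.log 2>&1
# JVM crash report, written inside the mounted volume:
cat ./questdb/hs_err_pid1.log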

questdb log (extract)

# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x00007f7dc99486a8, pid=1, tid=38
#
# JRE version: OpenJDK Runtime Environment (17.0.1+12) (build 17.0.1+12-39)
# Java VM: OpenJDK 64-Bit Server VM (17.0.1+12-39, mixed mode, tiered, compressed oops, compressed class ptrs, g1 gc, linux-amd64)
# Problematic frame:
# J 2153 c1 io.questdb.cairo.vm.api.MemoryCR.getStr(JLio/questdb/cairo/vm/api/MemoryCR$CharSequenceView;)Ljava/lang/CharSequence; [email protected] (101 bytes) @ 0x00007f7dc99486a8 [0x00007f7dc99484a0+0x0000000000000208]
#
# No core dump will be written. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /root/.questdb/hs_err_pid1.log
Compiled method (c1)  386972 2153       3       io.questdb.cairo.vm.api.MemoryCR::getStr (101 bytes)
 total in heap  [0x00007f7dc9948210,0x00007f7dc9949438] = 4648
 relocation     [0x00007f7dc9948370,0x00007f7dc9948498] = 296
 main code      [0x00007f7dc99484a0,0x00007f7dc9948f80] = 2784
 stub code      [0x00007f7dc9948f80,0x00007f7dc9949060] = 224
 oops           [0x00007f7dc9949060,0x00007f7dc9949070] = 16
 metadata       [0x00007f7dc9949070,0x00007f7dc99490b8] = 72
 scopes data    [0x00007f7dc99490b8,0x00007f7dc9949220] = 360
 scopes pcs     [0x00007f7dc9949220,0x00007f7dc99493e0] = 448
 dependencies   [0x00007f7dc99493e0,0x00007f7dc99493e8] = 8
 nul chk table  [0x00007f7dc99493e8,0x00007f7dc9949438] = 80
#
# If you would like to submit a bug report, please visit:
#   https://bugreport.java.com/bugreport/crash.jsp
#

[error occurred during error reporting (), id 0xb, SIGSEGV (0xb) at pc=0x00007f7ddf75b529]

connect log (extract)

[2022-04-06 07:01:50,706] WARN Unable to query database for maximum table name length; the connector may fail to write to tables with long names (io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect)
org.postgresql.util.PSQLException: ERROR: unknown function name: repeat(STRING,INT)
  Position: 15
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:490)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:408)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:329)
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:315)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:291)
    at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:243)
    at io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.computeMaxIdentifierLength(PostgreSqlDatabaseDialect.java:119)
    at io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getConnection(PostgreSqlDatabaseDialect.java:106)
    at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:80)
    at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:52)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:64)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
[2022-04-06 07:01:50,709] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter)
[2022-04-06 07:01:50,745] INFO Checking PostgreSql dialect for existence of TABLE "topic_1" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:50,998] INFO Using PostgreSql dialect TABLE "topic_1" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:51,005] INFO Creating table with sql: CREATE TABLE "topic_1" (
"id" REAL NOT NULL,
"price" REAL NOT NULL,
"size" REAL NOT NULL,
"side" TEXT NOT NULL,
"liquidation" BOOLEAN NOT NULL,
"time" TEXT NOT NULL) (io.confluent.connect.jdbc.sink.DbStructure)
[2022-04-06 07:01:51,184] INFO Checking PostgreSql dialect for existence of TABLE "topic_1" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:51,193] INFO Using PostgreSql dialect TABLE "topic_1" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,136] INFO Checking PostgreSql dialect for type of TABLE "topic_1" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,145] INFO Setting metadata for table "topic_1" to Table{name='"topic_1"', type=TABLE columns=[Column{'liquidation', isPrimaryKey=false, allowsNull=true, sqlType=bool}, Column{'size', isPrimaryKey=false, allowsNull=true, sqlType=float4}, Column{'time', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'price', isPrimaryKey=false, allowsNull=true, sqlType=float4}, Column{'id', isPrimaryKey=false, allowsNull=true, sqlType=float4}, Column{'side', isPrimaryKey=false, allowsNull=true, sqlType=varchar}]} (io.confluent.connect.jdbc.util.TableDefinitions)
[2022-04-06 07:01:52,154] INFO Checking PostgreSql dialect for existence of TABLE "topic_2" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,339] INFO Using PostgreSql dialect TABLE "topic_2" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,339] INFO Creating table with sql: CREATE TABLE "topic_2" (
"id" REAL NOT NULL,
"price" REAL NOT NULL,
"size" REAL NOT NULL,
"side" TEXT NOT NULL,
"liquidation" BOOLEAN NOT NULL,
"time" TEXT NOT NULL) (io.confluent.connect.jdbc.sink.DbStructure)
[2022-04-06 07:01:52,661] INFO Checking PostgreSql dialect for existence of TABLE "topic_2" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,669] INFO Using PostgreSql dialect TABLE "topic_2" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
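
The log above shows both tables get created before the crash. After restarting the questdb container, I can check over the REST API on port 9000 (mapped in the compose file) whether any rows made it into the second table before the segfault:

curl -G http://localhost:9000/exec --data-urlencode "query=select count() from topic_2"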

