Kafka sink connector not committing offsets but consuming messages

I am using a Kafka connector to sink messages to Snowflake. Docker image: cp-kafka-connect-base:6.2.0

I have two consumer pods running in distributed mode. The connector config is below:

  connector.class: "com.snowflake.kafka.connector.SnowflakeSinkConnector"
  tasks.max: "2"
  topics: "test-topic"
  snowflake.topic2table.map: "test-topic:table1"
  buffer.count.records: "500000"
  buffer.flush.time: "240"
  buffer.size.bytes: "100000000"
  snowflake.url.name: "<url>"
  snowflake.warehouse.name: "name"
  snowflake.user.name: "username"
  snowflake.private.key: "key"
  snowflake.private.key.passphrase: "pass"
  snowflake.database.name: "db-name"
  snowflake.schema.name: "schema-name"
  key.converter: "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
  value.converter: "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"

envs:

  CONNECT_GROUP_ID: "testgroup" 
  CONNECT_CONFIG_STORAGE_TOPIC: "snowflakesync-config"
  CONNECT_STATUS_STORAGE_TOPIC: "snowflakesync-status"
  CONNECT_OFFSET_STORAGE_TOPIC: "snowflakesync-offset"
  CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
  CONNECT_REST_PORT: "8083"
  CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "3"
  CONNECT_OFFSET_FLUSH_INTERVAL_MS: "5000"
  CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "3"
  CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "3"
  CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
  CONNECTOR_NAME: "test-conn"

I am running two pods with the above config. Each pod is properly assigned one partition and starts consuming.

Question: Whenever I deploy or restart the pods, the offsets are committed (CURRENT-OFFSET is updated) only ONCE. After that, the sink connector keeps consuming messages from the topic, but CURRENT-OFFSET is never updated again (offsets are not being committed).

kafka-consumer-groups --bootstrap-server <server>  --describe --group connect-test-conn

This is the command I use to check whether CURRENT-OFFSET is being updated. Since CURRENT-OFFSET is updated only once, the command always shows a lag, and the lag keeps increasing.

However, I can see from the logs (put records) and from Snowflake that the events are being persisted.

I would like to know why the offsets are not being committed continuously.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow