No data in the changelog topic for a KSQL table

Step 1. Postgres CDC Data

I have a Postgres table that is set up with the Debezium PostgreSQL Connector to generate CDC events.

In Kafka, I can see the CDC data in the topic my_db_server.public.product. Here is one example:

{
  "before": null,
  "after": {
    "id": "322f13b2-9a0e-407e-94c1-633c7b2a6ca1",
    "application_id": "7c8187f9-ed2a-4b27-9636-8ac8c1d20612",
    "metadata": "{\"operation\": \"CREATE\"}"
  },
  "source": {
    "version": "1.8.0.Final",
    "connector": "postgresql",
    "name": "my_db_server",
    "ts_ms": 1648074184197,
    "snapshot": "false",
    "db": "my_db",
    "sequence": "[\"25825800\",\"25833896\"]",
    "schema": "public",
    "table": "product",
    "txId": 673,
    "lsn": 25833896,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1648074184256,
  "transaction": null
}

Step 2. Create a stream based on the CDC data with KSQL

I created a stream on top of the CDC data topic with KSQL:

CREATE STREAM cdc_stream (after STRUCT<id STRING,
                                       application_id STRING>)
  WITH (KAFKA_TOPIC='my_db_server.public.product',
        VALUE_FORMAT='JSON');
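
To make the projection concrete, here is a minimal Python sketch of what declaring only after STRUCT<id, application_id> amounts to: of the whole Debezium envelope, only those two fields of "after" are kept. The helper name project_after is hypothetical, and this only mimics the idea; it is not how ksqlDB deserializes records internally.

```python
# Trimmed copy of the Debezium event from Step 1 (only the parts used here).
event = {
    "before": None,
    "after": {
        "id": "322f13b2-9a0e-407e-94c1-633c7b2a6ca1",
        "application_id": "7c8187f9-ed2a-4b27-9636-8ac8c1d20612",
        "metadata": "{\"operation\": \"CREATE\"}",
    },
    "op": "c",
    "ts_ms": 1648074184256,
}


def project_after(event, columns=("id", "application_id")):
    """Keep only the declared fields of the 'after' struct (hypothetical helper)."""
    # Delete events carry "after": null, so fall back to an empty dict.
    after = event.get("after") or {}
    return {col: after.get(col) for col in columns}


row = project_after(event)
print(row)
```

Fields that are not declared, such as metadata, before, and source, are simply not visible through the stream.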

Step 3. Create a table based on the previous stream with KSQL

Then I created a table with KSQL:

CREATE TABLE cdc_window_table
  WITH (KAFKA_TOPIC='cdc_stream',
        VALUE_FORMAT='JSON') AS
SELECT after->application_id AS application_id,
       COUNT(*) AS application_id_count
  FROM cdc_stream
  WINDOW TUMBLING (SIZE 20 SECONDS)
  GROUP BY after->application_id
EMIT CHANGES;

I can view this table with:

SELECT *
FROM cdc_window_table
EMIT CHANGES;
+-------------------------------------+--------------------+-------------------+----------------------+
|APPLICATION_ID                       |WINDOWSTART         |WINDOWEND          |APPLICATION_ID_COUNT  |
+-------------------------------------+--------------------+-------------------+----------------------+
|7c8187f9-ed2a-4b27-9636-8ac8c1d20612 |1649104960000       |1649104980000      |1                     |
|7c8187f9-ed2a-4b27-9636-8ac8c1d20612 |1649104960000       |1649104980000      |2                     |
|7c8187f9-ed2a-4b27-9636-8ac8c1d20612 |1649109660000       |1649109680000      |1                     |
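
The WINDOWSTART and WINDOWEND values above can be reproduced with simple arithmetic. The following is a minimal Python sketch of 20-second tumbling-window counting, assuming windows are aligned to multiples of the window size since the epoch. Only the first timestamp comes from this post (it is the ROWTIME of one record); the other two are made up for illustration, and the function names are hypothetical. A real ksqlDB query may also buffer updates, so it does not necessarily emit one row per input record.

```python
from collections import defaultdict

WINDOW_SIZE_MS = 20_000  # mirrors WINDOW TUMBLING (SIZE 20 SECONDS)


def window_start(ts_ms, size_ms=WINDOW_SIZE_MS):
    """Start of the epoch-aligned tumbling window that contains ts_ms."""
    return ts_ms - (ts_ms % size_ms)


def windowed_counts(records, size_ms=WINDOW_SIZE_MS):
    """Return one update row per input record: (key, start, end, running count)."""
    counts = defaultdict(int)
    updates = []
    for application_id, ts_ms in records:
        start = window_start(ts_ms, size_ms)
        counts[(application_id, start)] += 1
        updates.append(
            (application_id, start, start + size_ms, counts[(application_id, start)])
        )
    return updates


APP = "7c8187f9-ed2a-4b27-9636-8ac8c1d20612"
records = [
    (APP, 1649104965562),  # ROWTIME of a record shown in this post
    (APP, 1649104970000),  # hypothetical: falls in the same window
    (APP, 1649109661000),  # hypothetical: falls in a later window
]
for update in windowed_counts(records):
    print(update)
```

Each emitted row is one update to the table, which is the kind of per-key change sequence Step 4 is trying to get at.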

Step 4. Create a changelog stream of the table (failed: no data in the changelog topic)

I want to get the changelog of the table created in Step 3.

The animation in this Confluent blog post explains what the changelog of a table is: https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/

After creating the table in Step 3, I ran show all topics; and saw that two internal topics had been created for this table. I am not sure why two topics were generated.

_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-Aggregate-Materialize-changelog
_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition

In Offset Explorer for Kafka, I can see that the first topic, _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-Aggregate-Materialize-changelog, contains data such as:

Key:
7c8187f9-ed2a-4b27-9636-8ac8c1d20612

Value:
{
  "AFTER": {
    "APPLICATION_ID": "7c8187f9-ed2a-4b27-9636-8ac8c1d20612",
    "METADATA": "{\"operation\": \"CREATE\"}"
  },
  "ROWTIME": 1649104965562,
  "KSQL_AGG_VARIABLE_0": 1
}

However, the changelog topic above is not what I want.

If I understand correctly, the second topic, _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition, holds the actual changelog of the table created in Step 3. I think so because I want to see how application_id_count changes for each application_id after the GROUP BY, and the topic name contains GroupBy. (But why does this topic name not end with -changelog?)

However, Offset Explorer shows no data for this topic.

On the other hand, if I create a stream with

CREATE STREAM cdc_window_changelog_stream (application_id STRING KEY,
                                           application_id_count BIGINT)
  WITH (KAFKA_TOPIC='_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition',
        VALUE_FORMAT='JSON');

and query it with

SELECT *
  FROM cdc_window_changelog_stream
EMIT CHANGES;
+--------------------------------------+----------------------+
|APPLICATION_ID                        |APPLICATION_ID_COUNT  |
+--------------------------------------+----------------------+
|7c8187f9-ed2a-4b27-9636-8ac8c1d20612  |null                  |
|7c8187f9-ed2a-4b27-9636-8ac8c1d20612  |null                  |
|7c8187f9-ed2a-4b27-9636-8ac8c1d20612  |null                  |

So the stream over _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition does show new rows whenever upstream data arrives. (As for the null in the APPLICATION_ID_COUNT column: once I know what data this topic contains, I can select the correct field and fix that.)

Any idea why Offset Explorer shows no data for the topic _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition? Thanks!
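
One plausible reading of the null column, sketched in Python below, is that the JSON values in the topic have no field named APPLICATION_ID_COUNT, so the declared column has nothing to bind to. This rests on two unverified assumptions: that the values in this topic resemble the internal value shown earlier (AFTER, ROWTIME, KSQL_AGG_VARIABLE_0), and that a declared column with no matching JSON field is read as null. The helper read_columns is hypothetical and only mimics name-based column lookup.

```python
def read_columns(value, declared_columns):
    """Look up each declared column by name, case-insensitively (hypothetical helper)."""
    by_upper = {name.upper(): field for name, field in value.items()}
    # A column with no matching field comes back as None (shown as null).
    return {col: by_upper.get(col.upper()) for col in declared_columns}


# Assumed shape of a value in the internal topic, copied from the value shown earlier.
value = {
    "AFTER": {
        "APPLICATION_ID": "7c8187f9-ed2a-4b27-9636-8ac8c1d20612",
        "METADATA": "{\"operation\": \"CREATE\"}",
    },
    "ROWTIME": 1649104965562,
    "KSQL_AGG_VARIABLE_0": 1,
}

row = read_columns(value, ["APPLICATION_ID_COUNT"])
print(row)
print(read_columns(value, ["KSQL_AGG_VARIABLE_0"]))
```

If the assumption holds, declaring a column that matches a field actually present in the values would return data instead of null.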



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
