No data in the changelog topic for a KSQL table
Step 1. Postgres CDC Data
I have a Postgres table that is set up with the Debezium PostgreSQL connector to generate CDC data.
In Kafka, I can see the CDC data in the topic my_db_server.public.product. Here is one example:
{
  "before": null,
  "after": {
    "id": "322f13b2-9a0e-407e-94c1-633c7b2a6ca1",
    "application_id": "7c8187f9-ed2a-4b27-9636-8ac8c1d20612",
    "metadata": "{\"operation\": \"CREATE\"}"
  },
  "source": {
    "version": "1.8.0.Final",
    "connector": "postgresql",
    "name": "my_db_server",
    "ts_ms": 1648074184197,
    "snapshot": "false",
    "db": "my_db",
    "sequence": "[\"25825800\",\"25833896\"]",
    "schema": "public",
    "table": "product",
    "txId": 673,
    "lsn": 25833896,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1648074184256,
  "transaction": null
}
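The event above is a standard Debezium envelope: "op" says what happened and "before"/"after" hold the row images. A minimal Python sketch for summarizing such an event, assuming only the op codes c (create), u (update), d (delete) and r (snapshot read); the function name summarize_change_event is hypothetical:

```python
import json

# Debezium "op" codes handled by this sketch; unknown codes are passed through.
OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def summarize_change_event(raw: str) -> dict:
    """Return the operation, the source table and the relevant row image.

    For deletes Debezium sets "after" to null, so the "before" image is
    the one that identifies the row; otherwise "after" is used.
    """
    event = json.loads(raw)
    op = event["op"]
    row = event["before"] if op == "d" else event["after"]
    return {
        "operation": OP_NAMES.get(op, op),
        "table": f'{event["source"]["schema"]}.{event["source"]["table"]}',
        "row": row,
    }
```

For the example event, this returns operation "create", table "public.product" and the "after" row containing application_id 7c8187f9-ed2a-4b27-9636-8ac8c1d20612.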
Step 2. Create a stream from the CDC data with KSQL
I created a stream from the CDC data topic with KSQL:
CREATE STREAM cdc_stream (after STRUCT<id STRING,
application_id STRING>)
WITH (KAFKA_TOPIC='my_db_server.public.product',
VALUE_FORMAT='JSON');
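With the DDL as written, the stream schema contains only the after struct with id and application_id; the other envelope fields (before, source, op, ...) and after.metadata are not declared. A rough Python model of that projection, not ksqlDB's real deserializer: declared names are upper-cased the way ksqlDB displays unquoted identifiers, and a missing struct becomes None. DECLARED and project are hypothetical names:

```python
import json

# Hypothetical model of the Step 2 schema:
#   after STRUCT<id STRING, application_id STRING>
DECLARED = {"after": ["id", "application_id"]}


def project(raw: str, declared: dict = DECLARED) -> dict:
    """Keep only declared columns and struct fields; absent values become None."""
    event = json.loads(raw)
    result = {}
    for column, fields in declared.items():
        value = event.get(column)
        if value is None:
            result[column.upper()] = None
        else:
            result[column.upper()] = {f.upper(): value.get(f) for f in fields}
    return result
```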
Step 3. Create a table from the previous stream with KSQL
Then I created a table with KSQL:
CREATE TABLE cdc_window_table
WITH (KAFKA_TOPIC='cdc_stream',
VALUE_FORMAT='JSON') AS
SELECT after->application_id AS application_id,
COUNT(*) AS application_id_count
FROM cdc_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after->application_id
EMIT CHANGES;
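A 20-second tumbling window puts each record into exactly one non-overlapping window aligned to the epoch, so a timestamp ts lands in [ts - ts % size, ts - ts % size + size). A rough Python model of this query (COUNT(*) per application_id per window, emitting an updated row for every input record as EMIT CHANGES does) is below; it ignores grace periods and late records, and tumbling_counts is a hypothetical name:

```python
from collections import defaultdict
from typing import Iterable, Iterator, Tuple

WINDOW_SIZE_MS = 20_000  # WINDOW TUMBLING (SIZE 20 SECONDS)


def tumbling_counts(
    events: Iterable[Tuple[int, str]], size_ms: int = WINDOW_SIZE_MS
) -> Iterator[Tuple[str, int, int, int]]:
    """Yield (application_id, window_start, window_end, count) after each event."""
    counts = defaultdict(int)
    for ts, application_id in events:
        start = ts - ts % size_ms  # epoch-aligned window start
        counts[(application_id, start)] += 1
        yield application_id, start, start + size_ms, counts[(application_id, start)]
```

Feeding it the ROWTIME 1649104965562 that appears later in this post plus two made-up timestamps (1649104970000 and 1649109661000) for the same application_id reproduces the three rows shown below.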
I can view this table with:
SELECT *
FROM cdc_window_table
EMIT CHANGES;
+-------------------------------------+--------------------+-------------------+----------------------+
|APPLICATION_ID |WINDOWSTART |WINDOWEND |APPLICATION_ID_COUNT |
+-------------------------------------+--------------------+-------------------+----------------------+
|7c8187f9-ed2a-4b27-9636-8ac8c1d20612 |1649104960000 |1649104980000 |1 |
|7c8187f9-ed2a-4b27-9636-8ac8c1d20612 |1649104960000 |1649104980000 |2 |
|7c8187f9-ed2a-4b27-9636-8ac8c1d20612 |1649109660000 |1649109680000 |1 |
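WINDOWSTART and WINDOWEND are epoch milliseconds. A small Python sketch for turning them into readable UTC times (epoch_ms_to_utc is a hypothetical helper):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_ms_to_utc(ms: int) -> datetime:
    """Convert an epoch-millisecond value such as WINDOWSTART to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)


# WINDOWSTART / WINDOWEND pairs copied from the query output above.
for start_ms, end_ms in [(1649104960000, 1649104980000), (1649109660000, 1649109680000)]:
    print(epoch_ms_to_utc(start_ms), "->", epoch_ms_to_utc(end_ms))
# 2022-04-04 20:42:40+00:00 -> 2022-04-04 20:43:00+00:00
# 2022-04-04 22:01:00+00:00 -> 2022-04-04 22:01:20+00:00
```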
Step 4. Create a changelog stream of the table (failed: no data in the changelog topic)
I want to get the changelog of the table created in Step 3.
An animation in the Confluent blog explains what the changelog of a table is. (Image source: https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/)
After creating the table in Step 3, I ran SHOW ALL TOPICS; and saw that two internal topics had been created for this table. I am actually not sure why two topics were generated:
_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-Aggregate-Materialize-changelog
_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition
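Both names follow the same pattern. A Python sketch that splits such a name into its parts, assuming the layout _confluent-ksql-<prefix>query_<query id>-<processor path>-<kind> inferred from the two names above (not from any documented naming contract). The role strings describe what Kafka Streams generally uses these internal topic kinds for; describe_internal_topic is a hypothetical name:

```python
import re
from typing import Dict, Optional

# Layout inferred from the two topic names above (an assumption):
#   _confluent-ksql-<prefix>query_<query id>-<processor path>-<kind>
INTERNAL_TOPIC = re.compile(
    r"^_confluent-ksql-(?P<prefix>.*?)query_(?P<query_id>[^-]+)"
    r"-(?P<processor>.+)-(?P<kind>changelog|repartition)$"
)

# General Kafka Streams roles of the two internal topic kinds.
ROLES = {
    "changelog": "backs up a state store for fault tolerance",
    "repartition": "carries records re-keyed for a downstream stateful step",
}


def describe_internal_topic(name: str) -> Optional[Dict[str, str]]:
    """Return the parsed parts of an internal topic name, or None if it does not match."""
    match = INTERNAL_TOPIC.match(name)
    if match is None:
        return None
    info = match.groupdict()
    info["role"] = ROLES[info["kind"]]
    return info
```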
In Offset Explorer for Kafka, I can see that the first topic, _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-Aggregate-Materialize-changelog, has data such as:
Key:
7c8187f9-ed2a-4b27-9636-8ac8c1d20612
Value:
{
  "AFTER": {
    "APPLICATION_ID": "7c8187f9-ed2a-4b27-9636-8ac8c1d20612",
    "METADATA": "{\"operation\": \"CREATE\"}"
  },
  "ROWTIME": 1649104965562,
  "KSQL_AGG_VARIABLE_0": 1
}
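The ROWTIME in this record falls into the first window of the Step 3 output. A Python sketch that summarizes such a record and recomputes its window, assuming epoch-aligned 20-second tumbling windows and assuming that KSQL_AGG_VARIABLE_0 holds the running COUNT(*) value (inferred from the value 1 here, not from documentation); read_agg_record is a hypothetical name:

```python
import json

WINDOW_SIZE_MS = 20_000  # WINDOW TUMBLING (SIZE 20 SECONDS)


def read_agg_record(key: str, value: str, size_ms: int = WINDOW_SIZE_MS) -> dict:
    """Summarize one record from the ...-Materialize-changelog topic."""
    record = json.loads(value)
    rowtime = record["ROWTIME"]
    window_start = rowtime - rowtime % size_ms  # epoch-aligned window
    return {
        "application_id": key,
        "window_start": window_start,
        "window_end": window_start + size_ms,
        # Assumption: KSQL_AGG_VARIABLE_0 holds the running COUNT(*) value.
        "count": record["KSQL_AGG_VARIABLE_0"],
    }
```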
However, the changelog topic above is not what I want.
If I understand correctly, the second topic, _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition, holds the actual changelog of the table created in Step 3. I want to see how application_id_count changes for each application_id after the GROUP BY, and the topic name suggests that this is what it contains. (But if so, why doesn't this topic name end with -changelog?)
However, Offset Explorer shows no data in this topic.
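As general Kafka Streams background (ksqlDB runs queries on Kafka Streams, but this post does not confirm the details): a GROUP BY on a column other than the current record key typically re-keys each input record and writes it to an internal -repartition topic, and the aggregation then reads it back from there. A rough Python model of that re-keying step; zlib.crc32 is a deterministic stand-in for Kafka's default murmur2-based partitioner, and rekey_and_partition is a hypothetical name:

```python
import zlib
from typing import Iterable, List, Tuple

Record = Tuple[str, dict]


def rekey_and_partition(
    records: Iterable[Record], group_by_field: str, num_partitions: int
) -> List[List[Record]]:
    """Re-key each record by after.<group_by_field> and route it to a partition.

    Only the grouping property matters: equal new keys share a partition.
    """
    partitions: List[List[Record]] = [[] for _ in range(num_partitions)]
    for _old_key, value in records:
        new_key = value["after"][group_by_field]
        index = zlib.crc32(new_key.encode("utf-8")) % num_partitions
        # The value is forwarded unchanged: no count is computed at this step.
        partitions[index].append((new_key, value))
    return partitions
```

In this model the re-keyed records still carry the original row values rather than a count; the input keys used when trying it (e.g. "id-1") are hypothetical.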
On the other hand, if I create a stream over this topic with
CREATE STREAM cdc_window_changelog_stream (application_id STRING KEY,
application_id_count BIGINT)
WITH (KAFKA_TOPIC='_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition',
VALUE_FORMAT='JSON');
and view it with
SELECT *
FROM cdc_window_changelog_stream
EMIT CHANGES;
+--------------------------------------+----------------------+
|APPLICATION_ID |APPLICATION_ID_COUNT |
+--------------------------------------+----------------------+
|7c8187f9-ed2a-4b27-9636-8ac8c1d20612 |null |
|7c8187f9-ed2a-4b27-9636-8ac8c1d20612 |null |
|7c8187f9-ed2a-4b27-9636-8ac8c1d20612 |null |
So the topic _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition does receive new data when upstream data arrives. (About the null values in the APPLICATION_ID_COUNT column: once I know what data is inside this topic, I can select the correct field and fix that.)
Any idea why Offset Explorer shows no data for the topic _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition? Thanks!
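ksqlDB sets a declared value column to null when the JSON value has no matching field, which is one plausible reason for the null APPLICATION_ID_COUNT values. A small Python sketch for checking a sample message value against the declared value columns; APPLICATION_ID is declared as KEY and read from the record key, so only value columns are checked. The case-insensitive comparison is an assumption of this sketch, and missing_value_columns is a hypothetical name:

```python
import json
from typing import List


def missing_value_columns(raw_value: str, declared: List[str]) -> List[str]:
    """Return declared value columns with no matching top-level JSON field.

    Names are compared case-insensitively (an assumption of this sketch).
    """
    present = {name.upper() for name in json.loads(raw_value)}
    return [col for col in declared if col.upper() not in present]
```

Run against the Materialize-changelog value shown earlier (used only as an example, since the repartition topic's contents are unknown), it reports APPLICATION_ID_COUNT as missing.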
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow