How to create an output stream (changelog) based on a table in KSQL correctly?
Step 1: Create table
I currently have a table in KSQL, which I created with
CREATE TABLE cdc_window_table
WITH (KAFKA_TOPIC='cdc_stream',
VALUE_FORMAT='JSON') AS
SELECT after->application_id AS application_id,
COUNT(*) AS application_id_count
FROM cdc_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after->application_id
EMIT CHANGES;
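For context, the source stream cdc_stream presumably exposes an after struct containing application_id (given the after->application_id expressions above). A minimal sketch of such a declaration, with a hypothetical topic name cdc_source_topic, might look like:
-- Hypothetical source stream declaration; the topic name and exact
-- payload shape are assumptions, not taken from the question itself.
CREATE STREAM cdc_stream (after STRUCT<application_id STRING>)
WITH (KAFKA_TOPIC='cdc_source_topic',
      VALUE_FORMAT='JSON');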
At this point, the CREATE TABLE statement above has created a new table. I can query it with
SELECT *
FROM cdc_window_table
EMIT CHANGES;
which returns data like this:
+---------------+---------------+---------------+---------------------+
|APPLICATION_ID |WINDOWSTART |WINDOWEND |APPLICATION_ID_COUNT |
+---------------+---------------+---------------+---------------------+
|a1 |1648767460000 |1648767480000 |1 |
|a1 |1648767460000 |1648767480000 |2 |
|a1 |1648767460000 |1648767480000 |3 |
|a1 |1648767480000 |1648767500000 |1 |
|a1 |1648767740000 |1648767760000 |1 |
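The WINDOWSTART and WINDOWEND columns are window bounds in epoch milliseconds. Purely as an optional readability aid, and assuming a KSQL/ksqlDB version that provides TIMESTAMPTOSTRING, they can be rendered as timestamps:
-- Optional: format the epoch-millisecond window bounds (sketch, version-dependent)
SELECT application_id,
       TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS window_start,
       TIMESTAMPTOSTRING(WINDOWEND, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS window_end,
       application_id_count
FROM cdc_window_table
EMIT CHANGES;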
Step 2: Create output stream (changelog) - FAILED
I am trying to create an output stream (changelog) based on this table, as illustrated in this image:
(Image source: https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/)
After reading this, I tried the four approaches below:
Attempt 1:
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING,
                                                 application_id_count INT)
WITH (KAFKA_TOPIC='cdc_window_table',
      VALUE_FORMAT='JSON');
Attempt 2:
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT)
WITH (KAFKA_TOPIC='cdc_window_table',
      VALUE_FORMAT='JSON',
      WINDOW_TYPE='TUMBLING',
      WINDOW_SIZE='20 SECONDS');
Attempt 3:
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT)
WITH (KAFKA_TOPIC='cdc_window_table',
      VALUE_FORMAT='JSON',
      WINDOW_TYPE='SESSION');
Attempt 4:
CREATE STREAM cdc_window_table_changelog_stream (ROWKEY STRING KEY,
                                                 application_id_count BIGINT)
WITH (KAFKA_TOPIC='cdc_window_table',
      VALUE_FORMAT='JSON',
      WINDOW_TYPE='SESSION');
When I query the stream with
SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;
it only shows the table header, with no changelog data arriving:
+------------------+-----------------------+
|APPLICATION_ID |APPLICATION_ID_COUNT |
+------------------+-----------------------+
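To double-check that such a stream is attached to the intended topic and whether any records are being consumed at all, DESCRIBE with the EXTENDED option reports the backing topic and local runtime statistics. This is only a diagnostic sketch; the exact syntax and output depend on the KSQL/ksqlDB version:
-- Older KSQL syntax:
DESCRIBE EXTENDED cdc_window_table_changelog_stream;
-- Newer ksqlDB syntax:
-- DESCRIBE cdc_window_table_changelog_stream EXTENDED;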
What would be the correct way to create an output stream (changelog) based on a table?
Solution 1:
In Step 2, instead of using the topic cdc_window_table, I should use something like _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition.
This changelog topic is created automatically by KSQL when the table in Step 1 is created.
You can find this long changelog topic name by using
show all topics;
(Note the all in the command above; without it, the changelog topic will not be listed.)
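Before declaring a stream over that internal topic, it can also help to print a few raw records from it to confirm the key and value layout. This is just a diagnostic sketch, with xxx standing in for the actual service id as above:
-- Inspect the raw key/value layout of the internal topic (sketch)
PRINT '_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition' FROM BEGINNING LIMIT 3;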
The working KSQL would be
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
application_id_count BIGINT)
WITH (KAFKA_TOPIC='_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition',
VALUE_FORMAT='JSON');
(Note the KEY after application_id STRING above; without it, application_id shows as null in the stream.)
When I query the stream again with
SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;
I can now see
+------------------+-----------------------+
|APPLICATION_ID |APPLICATION_ID_COUNT |
+------------------+-----------------------+
|a1 |null |
|a1 |null |
|a1 |null |
I am not sure why application_id_count is null, but application_id is all I care about for my use case. If I find the solution, or anyone knows, I will update this answer.
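As an optional follow-up sketch: since that repartition topic is internal to KSQL, one could copy the changelog into a topic you own with a persistent query, so downstream consumers do not depend on KSQL's internal naming (the stream and topic names below are suggestions):
-- Re-publish the changelog to a user-owned topic (names are suggestions)
CREATE STREAM cdc_window_table_changelog
WITH (KAFKA_TOPIC='cdc_window_table_changelog',
      VALUE_FORMAT='JSON') AS
SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;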
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow