How to correctly create an output stream (changelog) based on a table in KSQL?

Step 1: Create table

I currently have a table in KSQL, created by:

CREATE TABLE cdc_window_table
    WITH (KAFKA_TOPIC='cdc_stream',
          VALUE_FORMAT='JSON') AS
SELECT after->application_id AS application_id,
       COUNT(*) AS application_id_count
FROM cdc_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after->application_id
EMIT CHANGES;
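
To check which Kafka topic this table actually writes to, one option is to list the tables. In the ksqlDB versions I am familiar with, SHOW TABLES prints a Kafka Topic column (and a Windowed column) for each table, although the exact layout may differ between versions. Because the statement above sets KAFKA_TOPIC='cdc_stream', I would expect cdc_window_table to be listed with the topic cdc_stream rather than a topic named cdc_window_table.

```sql
-- List every table together with its backing Kafka topic and formats
-- (the column layout varies between ksqlDB versions).
SHOW TABLES;
```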

This creates a new table, which I can view with:

SELECT *
FROM cdc_window_table
EMIT CHANGES;

which returns data like this:

+---------------+---------------+---------------+---------------------+
|APPLICATION_ID |WINDOWSTART    |WINDOWEND      |APPLICATION_ID_COUNT |
+---------------+---------------+---------------+---------------------+
|a1             |1648767460000  |1648767480000  |1                    |
|a1             |1648767460000  |1648767480000  |2                    |
|a1             |1648767460000  |1648767480000  |3                    |
|a1             |1648767480000  |1648767500000  |1                    |
|a1             |1648767740000  |1648767760000  |1                    |

Step 2: Create output stream (changelog) - FAILED

I am trying to create an output stream (changelog) based on this table, as illustrated in the stream/table diagram of this Confluent blog post: https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/

After reading that post, I tried these four approaches:

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING,
                                                 application_id_count INT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON');

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='TUMBLING',
         WINDOW_SIZE='20 SECONDS');

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='SESSION');

CREATE STREAM cdc_window_table_changelog_stream (ROWKEY STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='SESSION');
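
A possible reason these attempts return nothing: each one reads the topic cdc_window_table, while the CTAS in Step 1 sets KAFKA_TOPIC='cdc_stream', so (if that statement is exactly as shown) the table's rows are written to the topic cdc_stream. Below is an untested sketch of the second attempt pointed at that topic. It assumes the topic cdc_stream contains only this table's output, and it keeps WINDOW_TYPE and WINDOW_SIZE because the table's key is windowed by TUMBLING (SIZE 20 SECONDS).

```sql
-- Remove the earlier attempt so the stream name can be reused.
DROP STREAM IF EXISTS cdc_window_table_changelog_stream;

-- Assumption: the table from Step 1 writes to the topic 'cdc_stream'
-- (from its KAFKA_TOPIC property). The window settings must match the
-- table's 20-second tumbling window so the windowed key can be read.
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT)
   WITH (KAFKA_TOPIC='cdc_stream',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='TUMBLING',
         WINDOW_SIZE='20 SECONDS');
```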

When I view it with

SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;

it only shows the table header, and no changelog data ever arrives:

+------------------+-----------------------+
|APPLICATION_ID    |APPLICATION_ID_COUNT   |
+------------------+-----------------------+
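
Separately, a push query (SELECT ... EMIT CHANGES) in the ksqlDB CLI reads only records that arrive after the query starts, because auto.offset.reset defaults to latest. A sketch that replays the stream from the beginning of its topic:

```sql
-- For the rest of this CLI session, start queries at the earliest offset
-- instead of only reading new records.
SET 'auto.offset.reset' = 'earliest';

SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;
```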

What is the correct way to create an output stream (changelog) based on a table?



Solution 1:[1]

In Step 2, instead of the topic cdc_window_table, I should have used an internal topic named something like _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition.

KSQL created this internal topic automatically when I created the table in Step 1.

You can find its full name with:

show all topics;

(Note the ALL keyword above. Without it, internal topics such as this one are not listed.)

The working KSQL statement is:

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT)
  WITH (KAFKA_TOPIC='_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition',
        VALUE_FORMAT='JSON');

(Note the KEY keyword after application_id STRING above. Without it, application_id shows as null in the stream.)

When I view it with

SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;

I now see:

+------------------+-----------------------+
|APPLICATION_ID    |APPLICATION_ID_COUNT   |
+------------------+-----------------------+
|a1                |null                   |
|a1                |null                   |
|a1                |null                   |

I am not sure why application_id_count is null, but application_id is all I care about in my use case. If I find the solution, or if anyone knows it, I will update this answer.
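
One way to investigate the null application_id_count is to print a few raw records from the internal topic and check whether the JSON value contains an APPLICATION_ID_COUNT field at all. As far as I know, a topic whose name ends in -Aggregate-GroupBy-repartition carries the re-keyed input rows of the GROUP BY rather than the aggregated counts, which would explain the nulls, but I have not verified this here. The xxx in the name below is the placeholder from above; substitute the exact name reported by SHOW ALL TOPICS. The LIMIT clause on PRINT assumes a reasonably recent ksqlDB version.

```sql
-- Print the first five records (timestamp, key, value) from the start of the topic.
PRINT '_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition'
  FROM BEGINNING
  LIMIT 5;
```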

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1