Count events received on a Kafka topic during a fixed period of time
I have a 'users' Kafka topic, which receives AVRO-encoded messages containing a USERID field. I want the number of messages per USERID received only in the last one-minute window. So, following the diagram in the image below, I want the result to be:
| USERID | MESSAGE_COUNT |
|---|---|
| 1 | 1 |
| 2 | 1 |
I tried to:
- Create a stream from that topic so that I can perform operations on it.
CREATE STREAM users_stream WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');
- Create a table, which emits the information I want per minute on the
my_table topic.
CREATE TABLE my_table AS SELECT USERID, count(*) as message_count FROM users_stream WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY USERID;
- Run a pull query on the table, so that I get only the most recently emitted rows.
SELECT * FROM MY_TABLE;
However, the query returns lots of rows, with repeated USERIDs. Can someone help me? Thanks!
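For context: a windowed table keeps one row per key per window, so a pull query with no WHERE clause returns every window instance that has been materialized, which is why USERIDs repeat. A sketch of bounding the pull query by window time (pull queries on windowed tables accept WINDOWSTART/WINDOWEND bounds; the timestamp literal here is purely illustrative):

```sql
-- Sketch: only return rows from windows starting at or after the given
-- instant, so each USERID appears once for the window of interest.
SELECT USERID, MESSAGE_COUNT
FROM MY_TABLE
WHERE WINDOWSTART >= '2022-02-22T10:00:00';
```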
Solution 1:[1]
I figured it out in a different way, without using windowing. For testing my solution, I assumed a topic 'users' that receives messages in JSON format like the following:
{"equipment": "fridge", "power": 10, "measured_time": "2022-02-22"}
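Assuming the USERS_STREAM defined in the steps below, a test message of this shape can also be produced from ksqlDB itself with INSERT VALUES (the values here are just illustrative):

```sql
-- Illustrative test row; field names match the USERS_STREAM schema
INSERT INTO USERS_STREAM (equipment, power, measured_time)
VALUES ('fridge', 10, '2022-02-22');
```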
In order to get the number of messages per equipment sent with measured_time between '2022-01-31' and '2022-02-28', I did the following:
- Set configs
SET 'ksql.query.pull.table.scan.enabled'='true';
(in order to make a pull query without a WHERE clause, i.e., the last step)
SET 'auto.offset.reset'='earliest';
(in order to handle past events)
- Create stream from users
CREATE STREAM USERS_STREAM (equipment varchar, power double, measured_time varchar) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='json');
- Create a converted stream
CREATE STREAM CONVERTED_STREAM AS SELECT equipment, CAST(measured_time AS DATE) AS MY_DATE FROM USERS_STREAM;
- Create a table, in order to make aggregations
CREATE TABLE USERS_TABLE AS SELECT equipment, COUNT(*) AS MESSAGE_COUNT FROM CONVERTED_STREAM WHERE MY_DATE > '2022-01-31' AND MY_DATE < '2022-02-28' GROUP BY EQUIPMENT;
- Make a pull query, so that future events are not considered
SELECT * FROM USERS_TABLE;
I hope it helps someone! If there is a better solution, please let me know!
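One note on the config in the first step: the table-scan setting is only needed because the final pull query has no WHERE clause. A pull query that looks up a single key works without it, for example (a sketch; 'fridge' is just an example key):

```sql
-- Key-lookup pull query: no table scan required, so
-- 'ksql.query.pull.table.scan.enabled' can stay at its default
SELECT * FROM USERS_TABLE WHERE EQUIPMENT = 'fridge';
```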
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Laura Corssac |

