Count events received on a Kafka topic during a fixed period of time

I have a 'users' Kafka topic that receives AVRO-encoded messages containing a USERID field. I want the number of messages per USERID received during the last one-minute window only. Following the diagram below, the expected result is:

USERID  MESSAGE_COUNT
1       1
2       1

(diagram omitted)

I tried to:

  1. Create a stream from that topic so that I can perform operations on it.

CREATE STREAM users_stream WITH (KAFKA_TOPIC='users', VALUE_FORMAT='AVRO');

  2. Create a table that emits the counts I want, per minute, to the my_table topic.

CREATE TABLE my_table AS SELECT USERID, count(*) as message_count FROM users_stream WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY USERID;

  3. Run a pull query on the table, so that I get the very latest items emitted.

SELECT * FROM MY_TABLE;

However, the query returns many rows with repeated USERIDs. Can someone help me? Thanks!
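A windowed aggregation keeps one row per (key, window), which is why a full pull query shows the same USERID several times: once per one-minute window in which that user sent messages. A minimal Python sketch of tumbling-window counting, with illustrative timestamps (not from the original post), shows the shape of the data:

```python
from collections import Counter

WINDOW_SIZE_MS = 60_000  # 1-minute tumbling windows, as in the ksqlDB query

# (userid, event timestamp in ms) -- illustrative events spanning two windows
events = [(1, 5_000), (2, 20_000), (1, 65_000), (2, 70_000), (2, 80_000)]

# One counter entry per (userid, window start): this mirrors how a
# WINDOW TUMBLING table stores its rows
counts = Counter()
for userid, ts in events:
    window_start = (ts // WINDOW_SIZE_MS) * WINDOW_SIZE_MS
    counts[(userid, window_start)] += 1

# Each key appears once per window it was active in -- hence the
# "repeated USERID" rows when the whole table is scanned
for (userid, window_start), n in sorted(counts.items()):
    print(userid, window_start, n)
```

To get only the latest window, the pull query would need to filter on the window bounds rather than scan the whole table.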



Solution 1:[1]

I figured it out a different way, without using windowing. To test my solution, I assumed a 'users' topic that receives messages in JSON format like the following:

{"equipment": "fridge", "power": 10, "measured_time": "2022-02-22"}

In order to get the number of messages per equipment with measured_time between '2022-01-31' and '2022-02-28', I did the following:

  1. Set the configs

SET 'ksql.query.pull.table.scan.enabled'='true';

(so that the pull query in the last step works without a WHERE clause)

SET 'auto.offset.reset'='earliest';

(so that past events are included)

  2. Create a stream from the users topic

CREATE STREAM USERS_STREAM (equipment varchar, power double, measured_time varchar) WITH (KAFKA_TOPIC='users', VALUE_FORMAT='json');

  3. Create a converted stream with a proper DATE column

CREATE STREAM CONVERTED_STREAM AS SELECT equipment, CAST(measured_time AS DATE) AS MY_DATE FROM USERS_STREAM;

  4. Create a table to perform the aggregation

CREATE TABLE USERS_TABLE AS SELECT equipment, COUNT(*) AS MESSAGE_COUNT FROM CONVERTED_STREAM WHERE MY_DATE > '2022-01-31' AND MY_DATE < '2022-02-28' GROUP BY EQUIPMENT;

  5. Run a pull query, so that future events are not considered

SELECT * FROM USERS_TABLE;
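The filter-then-count logic of steps 3-5 can be sketched in plain Python. The message shape and date bounds come from the example above; the sample messages themselves are illustrative:

```python
from collections import Counter
from datetime import date

# Messages in the shape shown earlier; the values are made up
messages = [
    {"equipment": "fridge", "power": 10, "measured_time": "2022-02-22"},
    {"equipment": "fridge", "power": 12, "measured_time": "2022-02-23"},
    {"equipment": "oven",   "power": 30, "measured_time": "2022-03-01"},  # outside range
]

lo, hi = date(2022, 1, 31), date(2022, 2, 28)

counts = Counter()
for msg in messages:
    # CAST(measured_time AS DATE) equivalent
    measured = date.fromisoformat(msg["measured_time"])
    # The WHERE clause uses strict inequalities, so the bounds are exclusive
    if lo < measured < hi:
        counts[msg["equipment"]] += 1

print(dict(counts))
```

Note that because the bounds are exclusive, messages measured exactly on '2022-01-31' or '2022-02-28' are not counted.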

I hope it helps someone! If there is a better solution, please let me know!

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Laura Corssac