Cassandra lightweight transactions

I have a Flink job that counts "application" events coming from a Kafka broker. I have a keyed, windowed stream and, for each 5-minute window, I count the number of events. Finally, I push the aggregated data into Cassandra with the Cassandra Sink connector.

| application | window_start | window_end | count |
| app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 15 |
| app2 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 23 |
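The per-window counting itself can be sketched without Flink as a plain-Java aggregation (a minimal sketch; the event list and the "application|window_start" key format are illustrative, not part of the actual job): events are grouped by application and window, and each group's size becomes the count.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowCountSketch {
    public static void main(String[] args) {
        // Each entry stands for one event that fell into a 5-minute window,
        // keyed as "application|window_start" (illustrative key format).
        List<String> events = List.of(
                "app1|2022-03-21 15:00:00",
                "app1|2022-03-21 15:00:00",
                "app2|2022-03-21 15:00:00");

        // Count events per (application, window) key, as the keyed
        // window aggregation in the Flink job would.
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String e : events) {
            counts.merge(e, 1L, Long::sum);
        }
        counts.forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}
```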

I also handle late events with side outputs. So, when a late event arrives, I have this situation:

| app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 1 |

The expected output must be:

| application | window_start | window_end | count |
| app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 16 |
| app2 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 23 |

But it is:

| application | window_start | window_end | count |
| app1 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 1 |
| app2 | 2022-03-21 15:00:00 | 2022-03-21 15:05:00 | 23 |

This is because Cassandra uses UPSERT semantics. With a SQL database (e.g. PostgreSQL) I can handle this problem:

"INSERT INTO table(window_start, window_end, application, count) VALUES(?, ?, ?, ?) ON CONFLICT (window_start, window_end, application) DO UPDATE SET count=EXCLUDED.count + table.count;"
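The difference between the two write semantics can be illustrated with a plain-Java sketch (the key string is illustrative): a Cassandra upsert behaves like `Map.put`, replacing the stored count, while the `ON CONFLICT ... DO UPDATE` clause above behaves like `Map.merge`, adding to it.

```java
import java.util.HashMap;
import java.util.Map;

public class WriteSemantics {
    public static void main(String[] args) {
        // Illustrative primary key: application + window start.
        String key = "app1|2022-03-21 15:00:00";

        // Cassandra-style upsert: the second write replaces the first.
        Map<String, Long> upsert = new HashMap<>();
        upsert.put(key, 15L); // original window result
        upsert.put(key, 1L);  // late-event result overwrites it
        System.out.println(upsert.get(key)); // 1

        // PostgreSQL-style ON CONFLICT ... DO UPDATE: counts are added.
        Map<String, Long> additive = new HashMap<>();
        additive.merge(key, 15L, Long::sum); // original window result
        additive.merge(key, 1L, Long::sum);  // late-event result is added
        System.out.println(additive.get(key)); // 16
    }
}
```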

Is it possible to write this kind of query with Cassandra, or is this a limitation?

Remember that I'm using Apache Flink with Cassandra as the sink, and Java is my programming language.

Thanks!



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow