High memory usage with left join on Quarkus native Kafka Streams application => nearly OOM alerts on environment

We have a Quarkus native Kafka Streams application that, since a batch of changes and a redeployment around mid-January 2022, started showing signs of unconstrained memory usage, leading to "less than 2% available memory" alerts. The only workaround has been to restart the applications (which run as services), but over the following hours the memory usage ramps up again until the alerts trigger, perhaps 12-18 hours later. By the time of a restart the applications, which are very largely stateless, have grown to perhaps 2 GB of memory usage or more. For a largely stateless application running in Quarkus native, which is supposed to be lean and mean, this is not a good look.

Looking at the code changes that had been made, we have struggled to see what could possibly have caused the issue, unless it is a bug, possibly a RocksDB bug.

We have tried limiting memory with flags such as -Xmx, but they have no effect, presumably because -Xmx caps only the Java heap and the growth is in native (off-heap) memory. A number of other flags we tried also did not help.
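One quick way to confirm that the growth is outside the Java heap (and therefore beyond the reach of -Xmx) is to compare the process RSS, which includes native allocations such as RocksDB's, against the configured heap cap. A minimal sketch, assuming a Linux host and using the current shell's PID (`$$`) as a stand-in for the application's:

```shell
# RSS (resident set size, in kB) covers the Java heap *and* native allocations.
# Substitute the application's PID for $$ (here the current shell, as a stand-in).
rss_kb=$(ps -o rss= -p $$ | tr -d ' ')
echo "RSS: ${rss_kb} kB"
```

If RSS keeps climbing well past the -Xmx value, the leak is in off-heap memory and heap flags cannot contain it.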

And we have the same code running in a different (preproduction) environment with similar or higher volumes (due to stress testing on the source systems), and we are not seeing the same issue there.

I've spent quite some time looking at reports from people who possibly have the same issue, with possibly the same cause (unknowable, of course): e.g. Limit Memory Usage of Kafka Streams, Kafka Streams limiting off-heap memory, and https://kafkacommunity.blogspot.com/2019/07/re-kafka-streams-unbounded-memory.html. This latter link points to the suspected culprit, RocksDB; see https://github.com/facebook/rocksdb/issues/3216 and other threads about RocksDB bugs causing high memory usage.

Yesterday afternoon I commented out the following code (variables renamed), which I suspected could be involved, then rebuilt and redeployed the application to production. I have just checked the memory usage of the applications on prod (it is now about 18 hours since the redeployment and restart) and they are using the same memory as when I redeployed them last night, i.e. about 400 MB. So clearly the following code is what causes the issue:

    // change key to enable us to join
    .selectKey((k, v) -> String.valueOf(v.getFoo_id()))
    .leftJoin(fooKStream,
              fooBarJoiner(),
              JoinWindows.of(Duration.ofSeconds(secondsDelay))
                         .grace(Duration.ofSeconds(secondsDelay)),
              StreamJoined.with(Serdes.String(), barSerde, fooValueSerde))
    // change the key back
    .selectKey((k, v) -> String.valueOf(v.get_id()))

By the way, the join window and grace period (secondsDelay) were initially an arbitrary 15 seconds; prior to removing the code yesterday we had reduced that to 5 seconds with no benefit, then to zero seconds, also with no benefit.
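For what it's worth, the state each side of a windowed join must retain is bounded below by the window size plus the grace period, so the original 15-second settings imply roughly 30 seconds of retained records per side, a tiny amount for our volumes. A minimal sketch of that arithmetic (the figures are the ones from this question, not anything read from the topology):

```java
import java.time.Duration;

public class JoinRetention {
    // Minimum retention a windowed-join store needs: window size + grace period.
    static Duration minRetention(Duration window, Duration grace) {
        return window.plus(grace);
    }

    public static void main(String[] args) {
        Duration window = Duration.ofSeconds(15); // the original secondsDelay
        Duration grace  = Duration.ofSeconds(15);
        System.out.println(minRetention(window, grace).toSeconds()); // prints 30
    }
}
```

Which is why seconds' worth of window state growing to gigabytes looks so wrong.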

Does anyone have an explanation for this? I cannot see how this should result in such huge memory usage. We are not processing trillions of messages per day here; maybe a few thousand in the overnight period.

We would like to see this solved, of course, both to restore the lost functionality and, if it is a bug, to see it fixed, whatever and wherever it is. But I am not an expert in diagnosing such issues, and from the experiences of others it seems that they have tried to configure RocksDB to use less memory without success.
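For completeness, the knob those attempts usually turn is the `rocksdb.config.setter` Streams property. Below is a hedged sketch (untested here; it needs kafka-streams and rocksdbjni on the classpath, and the 64 MiB cap is purely illustrative) along the lines of the "bounded memory" example in the Kafka Streams memory-management docs, sharing one cache and write-buffer manager across all stores so total RocksDB memory is capped rather than growing per store:

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

// One shared cache/write-buffer manager for every RocksDB store instance,
// so the total off-heap budget is fixed instead of per-store.
public class BoundedRocksDbConfig implements RocksDBConfigSetter {
    private static final long TOTAL_OFF_HEAP = 64 * 1024 * 1024L; // illustrative cap
    private static final Cache CACHE = new LRUCache(TOTAL_OFF_HEAP);
    private static final WriteBufferManager WBM =
            new WriteBufferManager(TOTAL_OFF_HEAP / 2, CACHE);

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        BlockBasedTableConfig table = (BlockBasedTableConfig) options.tableFormatConfig();
        table.setBlockCache(CACHE);
        table.setCacheIndexAndFilterBlocks(true);
        options.setWriteBufferManager(WBM);
        options.setTableFormatConfig(table);
    }

    @Override
    public void close(String storeName, Options options) {
        // CACHE and WBM are shared across stores; do not close them per store.
    }
}
```

It is registered via the `rocksdb.config.setter` property on the Streams config; whether the setter is picked up cleanly in a Quarkus native image is something I cannot vouch for.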

In the short term it looks like we need to find some kind of workaround, but I am not sure how we can find another way to do it, given that the RHS sometimes arrives later than the LHS, i.e. is a late event. Is there another way to do it?
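One workaround shape we are considering, sketched below in plain Java outside Kafka Streams (all names are hypothetical): buffer the left-hand records ourselves keyed by foo_id, emit a join when the matching right-hand record arrives within the window, and evict anything older than the window (with any grace folded into the window figure) so the buffer stays bounded by time rather than by uptime.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical bounded left-side buffer for a manual windowed left join,
// covering the "RHS arrives late" case. Event-time eviction keeps memory
// proportional to the window, not to how long the app has been running.
public class LeftJoinBuffer {
    private record Entry(String leftValue, long timestampMs) {}

    private final Map<String, Entry> byKey = new HashMap<>();
    private final long windowMs; // join window, grace included

    public LeftJoinBuffer(long windowMs) { this.windowMs = windowMs; }

    /** Buffer a left-hand record until its (possibly late) right-hand match arrives. */
    public void addLeft(String key, String leftValue, long timestampMs) {
        byKey.put(key, new Entry(leftValue, timestampMs));
        evict(timestampMs);
    }

    /** Join a right-hand record against the buffer; empty if no match within the window. */
    public Optional<String> onRight(String key, String rightValue, long timestampMs) {
        evict(timestampMs);
        Entry e = byKey.get(key);
        if (e == null || timestampMs - e.timestampMs() > windowMs) {
            return Optional.empty();
        }
        return Optional.of(e.leftValue() + "+" + rightValue);
    }

    /** Drop entries older than the window so the buffer cannot grow without bound. */
    private void evict(long nowMs) {
        byKey.values().removeIf(e -> nowMs - e.timestampMs() > windowMs);
    }

    public int size() { return byKey.size(); }
}
```

This only handles the right side arriving after the left (our late-event case), not the symmetric join semantics Streams gives, and it would need real serdes and fault tolerance before being a serious replacement.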

Edit: I have just seen Kafka kstream-kstream joins with sliding window memory usage grows over the time till OOM, with a solution (which seems to have worked for the OP) suggested by Matthias Sax, but I cannot see in the API what Matthias describes: as far as I can tell there is no way to set a retention period on the left join. Googling for it turns up How to specify retention period of join window?, which seems to hit the same issue.

Edit: I couldn't find where to add Materialized on a left join (I think this is not possible), so I used the deprecated

.until(millis)

method on the JoinWindows argument to leftJoin, setting it to window + grace (effectively 30 seconds). I also set

windowstore.changelog.additional.retention.ms=300000

(5 minutes), but it looks like it DOES NOT WORK! Memory usage is steadily rising by the second; it is now 50% higher than when the application was restarted. This has to be a bug, surely? What am I missing?

Edit: Indeed I had to revert the change and remove the code. For now we are looking for a workaround.

Thanks.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
