Kafka aggregation events
I have an entry topic in which I receive data from sensors. Typically the data I receive looks like this:
key : project-id
value: {id: 'A', value: 1}
In order to do some calculations later in the topology, I need to build a map holding the last value received from each sensor:
key : project-id value: { eventByEventId : {A : {id: 'A', value: 1}, B : {id: 'B', value: 1}}, lastEventId: 'B'}
To do this, I join the sensor topic (as a KStream) with the aggregation topic (as a KTable); the result of the join is written back to the aggregation topic.
sensor (KStream) --|
                   |--> aggreg
aggreg (KTable) ---|
UPDATE: Here is the Java code that performs this join:
KStream<String, String> sensorsSource = streamsBuilder.stream(sensorTopicName,
        Consumed.with(Serdes.String(), Serdes.String()));
KTable<String, String> aggSource = streamsBuilder.table(aggregateTopicName,
        Consumed.with(Serdes.String(), Serdes.String()));
sensorsSource.leftJoin(aggSource, this::updateSensorAggregatedRecord)
        .to(aggregateTopicName, Produced.with(Serdes.String(), Serdes.String()));
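The ValueJoiner referenced above (updateSensorAggregatedRecord) is not shown in the post. A hypothetical sketch of what it could do, modelled with plain Maps instead of the JSON-string serde for readability (the class name, field types, and method are assumptions, not the author's code):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the aggregate value from the question:
// { eventByEventId: {A: ..., B: ...}, lastEventId: 'B' }
public class SensorAggregate {
    public final Map<String, Integer> eventByEventId = new HashMap<>();
    public String lastEventId;

    // Merge logic: fold one sensor event into the current aggregate,
    // creating a fresh aggregate when none exists yet (left join with null).
    public static SensorAggregate update(String eventId, int value, SensorAggregate current) {
        SensorAggregate agg = (current == null) ? new SensorAggregate() : current;
        agg.eventByEventId.put(eventId, value); // keep only the latest value per sensor
        agg.lastEventId = eventId;              // remember which sensor fired last
        return agg;
    }
}
```

Folding an A event and then a B event into an empty aggregate yields a map with both entries and lastEventId "B", matching the desired shape above.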
The problem is that when new values are published to the sensor topic in quick succession, the join is sometimes performed against an old version of the aggreg KTable. When event B arrives, Kafka Streams performs the join against the KTable state as it exists at that moment, and at that point the table has not yet been updated with the change produced by event A.
Is there a way to construct my map correctly?
Thank you for your help. Regards CG
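[Editor's note] For context, Kafka Streams can maintain such a map without the stream-table feedback loop by folding the sensor stream directly into a KTable with groupByKey().aggregate(), so each event sees the latest state. A sketch (not from the original post; it reuses the variable names, serdes, and the ValueJoiner from the question's snippet, and assumes the aggregate value stays a JSON string):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

// Fold each sensor event into the per-project aggregate in one stateful step,
// instead of joining the stream against its own output topic.
KTable<String, String> aggregated = sensorsSource
        .groupByKey()
        .aggregate(
                () -> "{}",  // initial empty aggregate (a JSON string, as in the question)
                (projectId, event, agg) -> updateSensorAggregatedRecord(event, agg),
                Materialized.with(Serdes.String(), Serdes.String()));
aggregated.toStream().to(aggregateTopicName, Produced.with(Serdes.String(), Serdes.String()));
```

Because the state store backing aggregate() is updated synchronously per record, there is no window in which a second event can read a stale aggregate.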
Solution 1:[1]
The explanation helps a lot, but some code would probably be even more useful. Based on my understanding of the issue, what should work is applying a small time window with a grace period of a few milliseconds to your aggregation KTable; how large you make the window and the grace period depends on the business case at hand. Check -> https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#stateful-transformations
So something like:
stream.groupByKey().windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMillis(10), Duration.ofMillis(100)))...
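Filled out a little, the fragment above could look like this (the window size and grace period are placeholder values to tune, and the reducer simply keeps the newest event per key and window; this is a sketch, not the answerer's full code):

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Group the sensor stream by key, bucket it into small time windows,
// and keep only the latest value seen in each window.
KTable<Windowed<String>, String> latestPerWindow = stream
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMillis(10), Duration.ofMillis(100)))
        .reduce((oldValue, newValue) -> newValue); // later event wins within a window
```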
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | groo |

