'Best practices for transforming a batch of records using KStream
I am new to KStream and would like to know best practices or guidance on how to optimally process batch of records of n size using KStream. I have a working code as shown below but it does work for single messages at a time.
KStream<String, String> sourceStream = builder.stream("upstream-kafka-topic",
Consumed.with(Serdes.String(),
Serders.String());
//transform sourceStream using implementation of ValueTransformer<String,String>
sourceStream.transformValues(() -> new MyValueTransformer()).
to("downstream-kafka-topic",
Produced.with(Serdes.String(),
Serdes.String());
Above code works with single records as MyValueTransformer which implements ValueTransformer transforms single String value. How do I make above code work for Collection of String values?
Solution 1:[1]
You would need to somehow "buffer / aggregate" the messages. For example, you could add a state store to your transformer and store N messages inside the store. As long as the store contains fewer than N messages you don't do any processing and also don't emit any output (you might want to use flatTransformValues which allows you to emit zero results).
Solution 2:[2]
Not sure what you're trying to achieve. Kafka Streams by concept is designed to process one record at a time. If you want to process a collection or batch of messages you have a few options.
You might not actually need Kafka streams as the example you mentioned doesn't do much with the message, in this case, you can leverage a normal Consumer which will enable you to process in Batches. Check spring Kafka implementation of this here -> https://docs.spring.io/spring-kafka/docs/current/reference/html/#receiving-messages (Kafka process batches on the network layer but normally you would process one record at a time, but it's possible with a standard client to process batches) OR you might model your Object value to have an array of messages so for each record you will be receiving an object which contains a collection embedded which you could then use Kafka streams to do it, check the array type for Avro -> https://avro.apache.org/docs/current/spec.html#Arrays
Check this part of the documentation to understand better the Kafka streams concepts -> https://kafka.apache.org/31/documentation/streams/core-concepts
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Matthias J. Sax |
| Solution 2 | Dharman |
