Reactor Kafka: consume messages synchronously and process them asynchronously
I'm quite new to the reactive world and am using Spring WebFlux with Reactor Kafka.
kafkaReceiver
    .receive()
    // .publishOn(Schedulers.boundedElastic())
    .doOnNext(a -> log.info("Reading message: {}", a.value()))
    .concatMap(kafkaRecord ->
        // perform DB operation
        // kafkaRecord.receiverOffset().acknowledge()
    )
    .doOnError(e -> log.error("Error", e))
    .retry()
    .subscribe();
I understand that, to parallelise message consumption, I have to instantiate one KafkaReceiver per partition. But within a single partition, is it possible (and recommended) to read messages sequentially and process them asynchronously, including the manual acknowledgement?
The desired output would look like this:
Reading message:1
Reading message:2
Reading message:3
Reading message:4
Stored message 1 in DB + ack
Reading message:5
Stored message 2 in DB + ack
Stored message 5 in DB + ack
Stored message 3 in DB + ack
Stored message 4 in DB + ack
In case of errors, I'm thinking of publishing the record to a dead-letter topic (DLT).
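A minimal sketch of the per-record error handling this implies, using only Reactor Core so that it runs without a broker. The names `storeInDb`, `publishToDlt` and the `deadLetters` list are hypothetical stand-ins, not Reactor Kafka APIs; a real pipeline would send the failed record to a Kafka topic instead. The point it illustrates is that the failure is handled inside the inner `Mono`, so the outer stream never sees the error and the outer `retry()` does not have to resubscribe.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DeadLetterSketch {

    // Hypothetical DB write: fails for message 3, succeeds otherwise.
    public static Mono<Integer> storeInDb(int message) {
        if (message == 3) {
            return Mono.error(new IllegalStateException("DB write failed for " + message));
        }
        return Mono.just(message);
    }

    // Hypothetical DLT publish: a real setup would send the record to a Kafka topic.
    // Here it only adds the message to the given list and completes without a value.
    public static Mono<Integer> publishToDlt(int message, List<Integer> deadLetters) {
        return Mono.fromRunnable(() -> deadLetters.add(message));
    }

    // Returns the messages that were stored; failed ones end up in deadLetters.
    public static List<Integer> process(Flux<Integer> messages, List<Integer> deadLetters) {
        return messages
            .concatMap(m -> storeInDb(m)
                // Handle the failure per record so the outer stream keeps running.
                .onErrorResume(e -> publishToDlt(m, deadLetters)))
            .collectList()
            .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        List<Integer> deadLetters = new ArrayList<>();
        List<Integer> stored = process(Flux.range(1, 5), deadLetters);
        System.out.println("Stored: " + stored + ", dead letters: " + deadLetters);
    }
}
```

Running `main` prints `Stored: [1, 2, 4, 5], dead letters: [3]`: message 3 is diverted and messages 4 and 5 are still processed.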
I've tried flatMap too, but the entire processing still seems to happen sequentially on a single thread. And if I publish on a new scheduler, the processing simply moves to a different single thread. If what I'm asking for is possible, could someone please help me with a code snippet?
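One likely explanation for the single-thread behaviour: `flatMap` only runs inner publishers concurrently if they are themselves asynchronous, and `publishOn` moves everything downstream onto one worker of the given scheduler. A possible sketch of the alternative is below, using only Reactor Core with `Flux.range` standing in for `kafkaReceiver.receive()`. The blocking `storeInDb` method is a hypothetical placeholder; each call is wrapped in its own `Mono` and moved to `Schedulers.boundedElastic()` with `subscribeOn`, and the second argument of `flatMap` caps how many run at once.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ConcurrentProcessingSketch {

    // Hypothetical blocking DB write; returns the name of the thread that ran it.
    public static String storeInDb(int message) throws InterruptedException {
        Thread.sleep(50);
        return Thread.currentThread().getName();
    }

    // Reads messages in order and processes up to `concurrency` of them at once.
    // Returns the worker thread name used for each processed message.
    public static List<String> process(Flux<Integer> messages, int concurrency) {
        return messages
            .doOnNext(m -> System.out.println("Reading message: " + m))
            .flatMap(m ->
                Mono.fromCallable(() -> storeInDb(m))
                    // subscribeOn gives each inner Mono its own worker thread.
                    .subscribeOn(Schedulers.boundedElastic())
                    // With Reactor Kafka, the manual acknowledgement would go here.
                    .doOnNext(t -> System.out.println("Stored message " + m + " on " + t)),
                concurrency)
            .collectList()
            .block(Duration.ofSeconds(10));
    }

    public static void main(String[] args) {
        List<String> threads = process(Flux.range(1, 5), 4);
        System.out.println("Distinct worker threads: " + threads.stream().distinct().count());
    }
}
```

The order of the "Stored" lines is not deterministic, which matches the desired output above. One caveat worth checking against the Reactor Kafka documentation: acknowledging an offset is generally treated as covering all earlier records in the same partition, so acknowledging out of order can skip records that have not finished yet if the consumer restarts. Swapping `flatMap` for `flatMapSequential` and acknowledging downstream keeps the processing concurrent while emitting results in the original order.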
Solution 1:[1]
What does the log output of your current code look like?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Nicolas Belfis |
