'emitting flow values asynchronously with kotlins flow
Iam building a simple Spring Service with kotlin and webflux.
I have a endpoint which returns a flow. The flow contains elements which take a long time to compute which is simulated by a delay.
It is constructed like this:
suspend fun latest(): Flow<Message> {
println("generating messages")
return flow {
for (i in 0..20) {
println("generating $i")
if (i % 2 == 0) delay(1000)
else delay(200)
println("generated messsage $i")
emit(generateMessage(i))
}
println("messages generated")
}
}
My expectation was that it would return Message1 followed by Message3, Message5... and then Message0 because of the different delays the individual generation takes.
But in reality the flow contains the elements in order.
I guess iam missing something important about coroutins and flow and i tryed diffrent thinks to achive what i want with couroutins but i cant figure out how.
Solution
As pointed out by Marko Topolnik and William Reed using channelFlow works.
fun latest(): Flow<Message> {
println("generating numbers")
return channelFlow {
for (i in 0..20) {
launch {
send(generateMessage(i))
}
}
}
}
suspend fun generateMessage(i: Int): Message {
println("generating $i")
val time = measureTimeMillis {
if (i % 2 == 0) delay(1000)
else delay(500)
}
println("generated messsage $i in ${time}ms")
return Message(UUID.randomUUID(), "This is Message $i")
}
When run the results are as expected
generating numbers
generating 2
generating 0
generating 1
generating 6
...
generated messsage 5 in 501ms
generated messsage 9 in 501ms
generated messsage 13 in 501ms
generated messsage 15 in 505ms
generated messsage 4 in 1004ms
...
Solution 1:[1]
Once you go concurrent with the computation of each element, your first problem will be to figure out when all the computation is done.
You have to know in advance how many items to expect. So it seems natural to me to construct a plain List<Deferred<Message>> and then await on all the deferreds before returning the entire thing. You aren't getting any mileage from the flow in your case, since flow is all about doing things synchronously, inside the flow collection.
You can also use channelFlow in combination with a known count of messages to expect, and then terminate the flow based on that. The advantage would be that Spring can start collecting the flow earlier.
EDIT
Actually, the problem of the count isn't present: the flow will automatically wait for all the child coroutines you launched to complete.
Solution 2:[2]
Your current approach uses a single coroutine for the entire function, including the for loop. That means that any calling of a suspend fun, e.g. delay will block that entire coroutine until it completes. It does free up the thread to go do other stuff, but the current coroutine is blocked.
It's hard to say what the right solution is based on your simplified example. If you truly did want a new coroutine for each for loop, you could launch it there, but it doesn't seem clear that is the right solution from the information given.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | William Reed |
