Is it safe to use unsynchronized mutable state with Kotlin Flow?
Is the following code safe, and why?
    val flow: Flow<String> = ...
    val allStrings = mutableListOf<String>()
    var sum = 0
    flow.transform {
        allStrings += it
        emit(it.toInt())
    }.collect {
        sum += it
    }
The following test demonstrates the collect {} body being called from different threads:
    val ctx = newFixedThreadPoolContext(32, "my-context")
    runBlocking(ctx) {
        val f = flow<Int> {
            (1..1000).forEach {
                emit(it)
            }
        }
        var t: Thread? = null
        f.collect {
            delay(1)
            // this requirement will fail: after delay() the coroutine
            // may resume on a different thread of the pool
            if (t == null) t = Thread.currentThread() else require(t == Thread.currentThread())
        }
    }
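The test above shows thread hopping, which is not the same thing as concurrent execution. As a minimal sketch (the helper name `maxConcurrentCollectCalls` and the pool size of 8 are my own choices, and it uses a plain executor instead of `newFixedThreadPoolContext`), the following counts how many `collect {}` bodies are ever active at once. Since a single flow collection is documented as sequential, I would expect the maximum to stay at 1 even though the thread changes:

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns the largest number of collect {} bodies
// that were observed running at the same time.
fun maxConcurrentCollectCalls(): Int {
    val active = AtomicInteger(0)     // bodies currently executing
    val maxActive = AtomicInteger(0)  // high-water mark of `active`
    val pool = Executors.newFixedThreadPool(8).asCoroutineDispatcher()
    pool.use { ctx ->
        runBlocking(ctx) {
            flow { (1..200).forEach { emit(it) } }.collect {
                val now = active.incrementAndGet()
                maxActive.updateAndGet { m -> maxOf(m, now) }
                delay(1) // suspension point: may resume on another pool thread
                active.decrementAndGet()
            }
        }
    }
    return maxActive.get()
}

fun main() {
    println(maxConcurrentCollectCalls())
}
```

This only checks that invocations do not overlap; it says nothing by itself about memory visibility, which is what the next test probes.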
And another test, which checks that writes are published across suspension points:
    fun main(args: Array<String>) {
        val ctx = newFixedThreadPoolContext(32, "my-context")
        runBlocking(ctx) {
            val f = flow<Int> {
                (1..1000000).forEach {
                    emit(it)
                }
            }
            var c = 0
            f.transform {
                c += 1
                boo()
                c += 1
                emit(it)
                c += 1
            }.collect {
                c += 1
                boo()
                c += 1
            }
            // 5 increments per element, 1_000_000 elements
            println(c) // prints 5_000_000
        }
    }

    // Does no work; only forces a suspension and a possible thread switch
    suspend fun boo() {
        withContext(Dispatchers.IO) {
        }
    }
So it seems that Kotlin Flow ensures safe publication between coroutine invocations. Is this intentional (or even documented), or is it a side effect of the implementation?
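For comparison, here is a sketch of the same accumulation written without any captured mutable state, which sidesteps the publication question instead of answering it. The `Totals` class and the `totals` function are hypothetical names introduced for illustration; `fold` is the standard terminal operator from `kotlinx.coroutines.flow`:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.runBlocking

// Hypothetical holder for the two results the original snippet accumulates.
data class Totals(val allStrings: List<String>, val sum: Int)

// Single pass over the flow; the accumulator is threaded through fold
// rather than stored in variables captured by the lambdas.
suspend fun totals(source: Flow<String>): Totals =
    source.fold(Totals(emptyList(), 0)) { acc, s ->
        Totals(acc.allStrings + s, acc.sum + s.toInt())
    }

fun main() = runBlocking {
    println(totals(flowOf("1", "2", "3"))) // Totals(allStrings=[1, 2, 3], sum=6)
}
```

Copying the list on every element is quadratic, so this is meant as an illustration of the shape, not as a replacement for the mutable version on large flows.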
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
