How to collect items from a Flow until a particular condition is met?

I have a Flow<List<Int?>> and I want to collect it only until a list containing a null Int arrives. That list should still be delivered, and then the flow should be cancelled. For example:

val flow = flowOf(
    listOf(1, 2),
    listOf(3, null),
    listOf(4)
)
flow.collectUntilNull {
    println(it)
}

The output that I want is:

[1, 2]
[3, null]

I know about Flow.takeWhile, but it doesn't emit the value for which the predicate returns false. In my case I want that value emitted as well.

public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T> = flow {
    return@flow collectWhile { value ->
        if (predicate(value)) {
            emit(value)
            true
        } else {
            // I want an "emit(value)" here
            false
        }
    }
}

Since collectWhile is internal, I can't use this code directly. I could copy the collectWhile implementation into my own code, but is there another way to do this?



Solution 1:[1]

One way I found is to delay the predicate result by one item, so that takeWhile still lets the first failing item through:

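// b holds the predicate result for the previous item, so the first item
// that fails myCondition is still emitted before takeWhile stops the flow.
var b = true
flow.takeWhile { i -> b.also { b = myCondition(i) } }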
    .collect {
        println(it)
    }

Taking it out as an extension,

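// Emits values up to and including the first one for which predicate returns false.
fun <T> Flow<T>.takeUntil(predicate: suspend (T) -> Boolean): Flow<T> = flow {
    // Keep the flag inside the builder so that every collection starts fresh;
    // declared outside, it would stay false after the first collection ends.
    var b = true
    emitAll(this@takeUntil.takeWhile { i ->
        b.also { b = predicate(i) }
    })
}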

// Usage
flow.takeUntil { myCondition(it) }
    .collect {
        println(it)
    }

Is there a better way?
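
One candidate, assuming a kotlinx.coroutines version that provides the public transformWhile operator (to my knowledge it was added in 1.4.0): its lambda can emit the current value and then return false to stop, which is exactly the "emit before stopping" behaviour asked for. The sketch below is not from the original answer; takeUntilInclusive and collectUntilNullDemo are hypothetical names chosen for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emits every value up to and including the first one
// for which `stop` returns true, then stops collecting the upstream flow.
fun <T> Flow<T>.takeUntilInclusive(stop: suspend (T) -> Boolean): Flow<T> =
    transformWhile { value ->
        emit(value)   // always emit, including the terminating value
        !stop(value)  // returning false ends the flow after this value
    }

// Hypothetical demo using the flow from the question.
fun collectUntilNullDemo(): List<List<Int?>> = runBlocking {
    flowOf(listOf(1, 2), listOf(3, null), listOf(4))
        .takeUntilInclusive { list -> list.any { it == null } }
        .toList()
}
```

Calling collectUntilNullDemo() should return [[1, 2], [3, null]]. Unlike the flag-based version, this keeps no mutable state, so the resulting flow can be collected more than once.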

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Arpit Shukla