'How to mock a ProducerScope from callbackFlow builder in Kotlin Flow?

I'd like to test a function where I use the scope of a callbackFlow builder. Assuming I have a function inside the flow builder like this:

fun items(): Flow<Items> = callbackFlow {
    getItems(this) {
        trySend(it)
    }
    awaitClose()
}

In getItems function, I received data from websockets. The scope of ProducerScope is used to either launch a new coroutine with a delay and do something or to close the scope if an error happens. So it might call scope.launch { } or scope.close().

For example, this could do something as follows:

fun getItems(scope: ProducerScope<Items>, callback: (Items) -> Unit) {
    if (something) {
        scope.launch { ... }
    }
    if (somethingElse) {
        ...
        scope.close(error)
    }
    ...
    callback(items)
}

The callbackFlow's block uses a ProducerScope, extension of CoroutineScope and SendChannel, I tried to mock it using Mockk:

val scope: ProducerScope<Items> = mockk()

Unfortunately, I end up with:

java.lang.ClassCastException: class kotlin.coroutines.CoroutineContext$Element$Subclass6 cannot be cast to class kotlin.coroutines.ContinuationInterceptor

How can I mock a ProducerScope?
How do I unit test getItems above when scope can be either a CoroutineScope and a SendChannel?

Thanks in advance.



Solution 1:[1]

After many tries, I cannot do this easily without expecting strange behaviors. So I refactored my function to use a Channel and a CoroutineScope separately. Thanks to the CoroutineScope plus extension, I can create a new scope from the flow builder. This is now testable!

Therefore, the flow builder became:

fun items(): Flow<Items> = callbackFlow {
    val channel = this.channel
    val scope = this.plus(this.coroutineContext)

    getItems(channel, scope) {
        ...
    }
    ...
}

My function still uses both but gets them separately:

fun getItems(
    channel: SendChannel<Items>, 
    scope: CoroutineScope, 
    callback: (Items) -> Unit
) {
    if (something) {
        scope.launch { ... } // <-- use scope
    }
    if (somethingElse) {
        ...
        channel.close(error) // <-- use channel
    }
    ...
    callback(items)
}

Then, I can now test using a Channel with the same requirements than the one in callbackFlow and the scope from runTest:

@Test
fun `get items and succeed`() = runTest {
    val channel = Channel<Any>(Channel.BUFFERED, BufferOverflow.SUSPEND)
    ...

    service.getItems(channel, this@runTest, callback)
    ...
}

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Blo