'Close rx stream basing on amount of passed data by another stream

I have an infinite stream:

someIntStream: Observable<Int>

and I'd like to expose a function with following signature:

fun retrieveInts(numberOfIntsToRetrieve: Int): Observable<Int>

, which should emit exactly the same items as someIntStream, but it should emit OnCompleted once it emits numberOfIntsToRetrieve ints. My solution:

fun retrieveInts(numberOfIntsToRetrieve: Int): Observable<Int> {
    val compositeDisposable = CompositeDisposable()
    val ints = mutableListOf<Int>()
    return Observable.create() { emitter ->
        compositeDisposable.add(
            someIntStream().subscribe { someIntItem ->
                ints.add(someIntItem)
                emitter.onNext(someIntItem)
                if (ints.size == numberOfIntsToRetrieve) {
                    emitter.onComplete()
                    compositeDisposable.dispose()
                }
            }
        )
    }
}

But it's nasty: I have to internally subscribe to stream and dispose it. Any other ideas?



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source