'Close rx stream basing on amount of passed data by another stream
I have an infinite stream:
someIntStream: Observable<Int>
and I'd like to expose a function with following signature:
fun retrieveInts(numberOfIntsToRetrieve: Int): Observable<Int>
, which should emit exactly the same items as someIntStream, but it should emit OnCompleted once it emits numberOfIntsToRetrieve ints. My solution:
fun retrieveInts(numberOfIntsToRetrieve: Int): Observable<Int> {
val compositeDisposable = CompositeDisposable()
val ints = mutableListOf<Int>()
return Observable.create() { emitter ->
compositeDisposable.add(
someIntStream().subscribe { someIntItem ->
ints.add(someIntItem)
emitter.onNext(someIntItem)
if (ints.size == numberOfIntsToRetrieve) {
emitter.onComplete()
compositeDisposable.dispose()
}
}
)
}
}
But it's nasty: I have to internally subscribe to stream and dispose it. Any other ideas?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
