How to cache the last onNext signal but not onError in a Flux?

I want to convert a Flux into a hot source that caches its last onNext signal but not its error signals.

Suppose I have the following Flux:

Flux a = fluxThatSignalsSomeUserInput()
  .switchMap(input -> timeConsumingMono(input)
    .subscribeOn(Schedulers.boundedElastic())
    .timeout(MY_TIMEOUT, Mono.error(() -> new TimeoutException("took too long.")))
  )
  .cache(1)

Somewhere else I want to use the last emitted onNext signal from Flux a, like this:

  a.next()
  .map(processedUserInput -> doSomething())
  ...
  .subscribe()

So far this is working just fine and timeConsumingMono(input) is executed only once per user input as intended.

But I also want to recover from errors in the original Flux a. So, somewhere downstream of a, I added the following retry:

Flux b = a
  ...
  .doOnError(t -> logError(t))
  .retryWhen(Retry.maxInARow(NUM_RETRIES))
  .doOnError(t -> showFatalError(t));

// subscribe() returns a Disposable, so it cannot be part of the chain assigned to b
b.subscribe();

Now, because .cache(1) also replays error signals, each retry resubscribes to a and immediately receives the cached error. As a result, the subscription to b runs into the showFatalError(t) path on the first error/timeout in a, without retrying timeConsumingMono(input) the specified number of times.

To avoid caching error signals, I tried using Mono::cache inside the switchMap instead of Flux::cache, like this:

Flux a = fluxThatSignalsSomeUserInput()
  .switchMap(input -> timeConsumingMono(input)
    .subscribeOn(Schedulers.boundedElastic())
    .timeout(MY_TIMEOUT, Mono.error(() -> new TimeoutException("took too long.")))
    .cache(next -> Duration.ofMillis(Long.MAX_VALUE), t -> Duration.ZERO, () -> Duration.ZERO)
  )

but this way timeConsumingMono(input) gets called multiple times when late subscribing to a in the second code block above.

Is there an easy way to cache just onNext but not onError signals in a Flux so that late subscribers will just see the latest successful signal but the original subscriber still gets the errors?
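
To pin down the semantics I am after, here is a minimal plain-Java model. It deliberately does not use Reactor and is not meant as a solution; LatestValueCache and Listener are hypothetical names made up for this illustration. A late subscriber is replayed the last successful value, while an error goes only to whoever is subscribed at that moment and is never stored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model (not a Reactor class): remembers the last value, never an error. */
final class LatestValueCache<T> {

    /** Callbacks of a single subscriber. */
    static final class Listener<T> {
        final Consumer<T> onNext;
        final Consumer<Throwable> onError;

        Listener(Consumer<T> onNext, Consumer<Throwable> onError) {
            this.onNext = onNext;
            this.onError = onError;
        }
    }

    private final List<Listener<T>> listeners = new ArrayList<>();
    private T last;          // last successful value
    private boolean hasLast; // false until the first value arrives

    /** Registers a subscriber and replays the last value, if there is one. */
    synchronized void subscribe(Listener<T> listener) {
        listeners.add(listener);
        if (hasLast) {
            listener.onNext.accept(last);
        }
    }

    /** Stores the value and forwards it to the current subscribers. */
    synchronized void next(T value) {
        last = value;
        hasLast = true;
        for (Listener<T> l : new ArrayList<>(listeners)) {
            l.onNext.accept(value);
        }
    }

    /** Forwards the error to the current subscribers only; nothing is stored. */
    synchronized void error(Throwable t) {
        List<Listener<T>> current = new ArrayList<>(listeners);
        listeners.clear(); // the error terminates the current subscriptions
        for (Listener<T> l : current) {
            l.onError.accept(t);
        }
    }
}
```

In this model, a subscriber that was present during next("a") and a later error sees both signals, whereas a subscriber that arrives after the error sees only "a". That is the behavior I would like from Flux a: the retry in b resubscribes and waits for the next attempt instead of being handed the old error again.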



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
