'Restarting inifinite Flux on error with pubSubReactiveFactory

I'm developing an application which uses reactor libraries to connect with Google pubsub. So I have a Flux of messages. I want it to always consume from the queue, no matter what happens: this means handling all errors in order not to terminate the flux. I was thinking about the (very unlikely) event the connection to pubsub may be lost or whatever may cause the just created Flux to signal an error. I came up with this solution:


    private final PubSubReactiveFactory pubSubReactiveFactory;
    private final String requestSubscription;
    private final Long requestPollTime;
    private final Flux<AcknowledgeablePubsubMessage> requestFlux;

    @Autowired
    public FluxContainer(/* Field args...*/) {
        // init stuff...
        this.requestFlux = initRequestFlux();
    }

    private Flux<AcknowledgeablePubsubMessage> initRequestFlux() {
        return pubSubReactiveFactory.poll(requestSubscription, requestPollTime);
                .doOnError(e -> log.error("FATAL ERROR: could not retrieve message from queue. Resetting flux", e))
                .onErrorResume(e -> initRequestFlux());
    }

    @EventListener(ApplicationReadyEvent.class)
    public void configureFluxAndSubscribe() {
        log.info("Setting up requestFlux...");
        this.requestFlux
                .doOnNext(AcknowledgeablePubsubMessage::ack)
                // ...many more concatenated calls handling flux
    }

Does it makes sense? I'm concerned about memory allocation (I'm relying on the gc to clean stuff). Any comment is welcome.



Solution 1:[1]

What I think you're looking for is basically a Flux that restarts itself when it is terminated for any situation except for the subscription being disposed. In my case I have a source that would generate infinite events from Docker daemon which can disconnect "successfully"

Let sourceFlux be the flux providing your data and would be something you'd want to restart on error or complete, but stop on subscription disposal.

  1. create a recovery function
    Function<Throwable, Publisher<Integer>> recoverFromThrow =
            throwable -> sourceFlux
    
  2. create a new flux that would recover from throw
    var recoveringFromThrowFlux =
      sourceFlux.onErrorResume(recoverFromThrow);
    
  3. create a Flux generator that generates the flux that would recover from a throw. (Note the generic coercion is needed)
    var foreverFlux =
      Flux.<Flux<Integer>>generate((sink) -> sink.next(recoveringFromThrowFlux))
        .flatMap(flux -> flux);
    

foreverFlux is the flux that does self recovery.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Archimedes Trajano