Do not terminate the remaining Monos inside "firstWithSignal" after the first emit

I have an API that returns an error response very quickly when, for example, it cannot find the item, but the success response takes around 5 seconds. I want to return the error code to the user when we receive one, but I also cannot make the user wait 5 seconds. The way I intend to solve this is something like this:

 Mono.firstWithSignal(
    Mono.delay(Duration.ofSeconds(1)).thenReturn(HttpStatus.ACCEPTED),
    sendRequest()
 );

What I want to achieve is to have the request executed regardless of how long it takes, but if it takes longer than a second, respond to the client with HttpStatus.ACCEPTED.

The problem is that Reactor cancels any remaining Mono once the first one has emitted a signal.
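The cancellation described above can be observed with a small sketch. It assumes reactor-core is on the classpath; the class name, the `slow` and `fast` Monos, and the timings are hypothetical stand-ins for `sendRequest()` and the one-second timer.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Mono;

public class FirstWithSignalCancelDemo {

    // Returns true if the slower Mono was cancelled once the faster one won the race.
    static boolean loserWasCancelled() {
        AtomicBoolean cancelled = new AtomicBoolean(false);

        // Hypothetical stand-in for the slow request from the question.
        Mono<String> slow = Mono.delay(Duration.ofSeconds(2))
                .thenReturn("slow result")
                .doOnCancel(() -> cancelled.set(true)); // records the cancel signal

        // Hypothetical stand-in for the timeout branch.
        Mono<String> fast = Mono.delay(Duration.ofMillis(100))
                .thenReturn("fast result");

        // firstWithSignal cancels every source that did not emit first.
        Mono.firstWithSignal(fast, slow).block();
        return cancelled.get();
    }

    public static void main(String[] args) {
        System.out.println("slow Mono cancelled: " + loserWasCancelled());
    }
}
```

If the flag comes back true, the slow branch never got to finish, which is the behaviour the question is trying to avoid.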



Solution 1:[1]

I don't really like this solution and it feels a bit hacky, but it does work. If you cache the result of the Mono, cancelling the downstream subscriber no longer cancels the source, so it runs to completion. You can also use the share operator, but I think cache is a better fit here.

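@Test
public void test() {
    var result = Mono.firstWithSignal(
            // Slow branch: cache() keeps it running after it loses the race
            Mono.delay(Duration.ofSeconds(3))
                    .doOnNext((val) -> System.out.println("HEEEREREEEE"))
                    .thenReturn(3)
                    .cache(),
            // Fast branch: wins the race after 1 second
            Mono.delay(Duration.ofSeconds(1)).thenReturn(1)
    ).block();
    System.out.println(result); // prints 1
    // Keep the test alive long enough for the slow branch to print its message
    Mono.delay(Duration.ofSeconds(4)).block();
    System.out.println("DONE");
}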

Solution 2:[2]

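You can subscribe to both Monos yourself and try to emit a value to a Sinks.One. The sink keeps only the first emission, and neither subscription is cancelled when the other one finishes: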


Sinks.One<HttpStatus> sink = Sinks.one();

// subscribe to the request
request().subscribe(sink::tryEmitValue, sink::tryEmitError);

// subscribe to the timeout
Mono.just(HttpStatus.ACCEPTED).delayElement(Duration.ofSeconds(1))
                              .subscribe(sink::tryEmitValue);

// subscribe to the sink
sink.asMono()
    .subscribe(status -> {
      System.out.println("Success with status : " + status);
    }, error -> {
      System.out.println("Error : " + error.getMessage());
    });
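To hand the result back to a caller instead of printing it, the same idea could be wrapped in a method that returns a Mono. This is only a sketch under assumptions: `firstOrFallback`, `demo`, the class name and the timings are hypothetical, and reactor-core must be on the classpath.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class RaceWithoutCancel {

    // Emits whichever comes first: the request's signal or the fallback after the timeout.
    // The request is subscribed independently, so it is not cancelled if the timer wins.
    static <T> Mono<T> firstOrFallback(Mono<T> request, Duration timeout, T fallback) {
        return Mono.defer(() -> {
            Sinks.One<T> sink = Sinks.one();
            // A later tryEmit* call on an already terminated sink is rejected.
            request.subscribe(sink::tryEmitValue, sink::tryEmitError);
            Mono.delay(timeout).subscribe(tick -> sink.tryEmitValue(fallback));
            return sink.asMono();
        });
    }

    // Runs a slow hypothetical request against a short timeout and reports both outcomes.
    static String demo() {
        CountDownLatch requestDone = new CountDownLatch(1);
        Mono<String> slowRequest = Mono.delay(Duration.ofMillis(500))
                .thenReturn("OK")
                .doOnNext(v -> requestDone.countDown());

        String first = firstOrFallback(slowRequest, Duration.ofMillis(100), "ACCEPTED").block();

        boolean completed;
        try {
            // Wait for the request to finish even though the fallback was returned first.
            completed = requestDone.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            completed = false;
        }
        return first + ", request completed: " + completed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the timer keeps running even when the request wins. That is harmless because its emission is rejected, but the timer's Disposable could be kept and disposed if that matters.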

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Dmytro Kostyushko
Solution 2