How to find the first match in a Flux and stop processing in Reactor

I can't figure out how to stop processing a Flux on the first match.

This is what I have right now:

findAll(): Flux<Object>
findStorageId(Relation r): Mono<Long> | Mono.empty()
isPassing(Relation r): boolean

findAll().flatMap(p -> {
  return Flux.fromStream(p.getRelations().stream()).flatMap(r -> {
    return isPassing(r) ? findStorageId(r) : Mono.empty();
  });
})
.handle((Long storageId, SynchronousSink<Long> sink) -> {
  if (storageId != null) {
    sink.next(storageId);
    sink.complete();
  }
})
.next()
.switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));

I'm trying to understand how I can interrupt processing of the Flux once the first storageId is found. Right now I see that the first flatMap continues to work after the first match is found.



Solution 1:[1]

For me it worked using flatMap -> next -> onError; the handle is not needed.

  • flatMap: the mapper returns a Mono<String>, or an empty Mono when there is no match
  • next: returns the first element, or empty if flatMap always returned empty
  • onError: error handling as in your example, i.e. switchIfEmpty(Mono.error(...))

This means your example should work as you posted it, and you don't even need to call handle.
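As a rough sketch of the question's nested pipeline with the handle step removed: the relation type, isPassing and findStorageId below are hypothetical stand-ins (plain strings and made-up rules), not the asker's real code, and Flux.fromIterable is used where the question has Flux.fromStream.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class NestedFirstMatchSketch {

    // Hypothetical filter: a relation "passes" if its name starts with "ok".
    static boolean isPassing(String relation) {
        return relation.startsWith("ok");
    }

    // Hypothetical lookup: only relations ending in "!" have a storage id.
    static Mono<Long> findStorageId(String relation) {
        return relation.endsWith("!") ? Mono.just((long) relation.length()) : Mono.empty();
    }

    // All storage ids of one parent's relations (may be empty).
    static Flux<Long> storageIds(List<String> relations) {
        return Flux.fromIterable(relations)
                .flatMap(r -> isPassing(r) ? findStorageId(r) : Mono.<Long>empty());
    }

    // First storage id across all parents, or an error if there is none.
    static Mono<Long> firstStorageId(List<List<String>> parents) {
        return Flux.fromIterable(parents)
                .flatMap(NestedFirstMatchSketch::storageIds)
                .next() // takes the first element and cancels upstream
                .switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));
    }

    public static void main(String[] args) {
        List<List<String>> parents = Arrays.asList(
                Arrays.asList("bad", "ok"),
                Arrays.asList("ok!!", "ok!"));
        System.out.println(firstStorageId(parents).block());
    }
}
```

Empty Monos never reach next(), so no null check is needed downstream.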

Example code:

We log each element before passing it to flatMap; that way we can check whether the stream is processed further after the first non-empty mapped Mono.

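    import java.util.UUID;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class FirstMatchExample {

        public static final String TO_BE_FOUND = "B";

        public static void main(String[] args) {
            Mono<String> storageId = Flux.just("A", "B", "C", "D", "A")
                    // log every element that reaches flatMap
                    .doOnNext(id -> System.out.printf("processing: %s\n", id))
                    .flatMap(s -> findStorageId(s))
                    // take the first emitted id and cancel the rest
                    .next()
                    .switchIfEmpty(
                            Mono.error(new RuntimeException("Can't find storageId."))
                    );

            storageId.subscribe(id -> System.out.printf("storageId found: %s\n", id));
        }

        // returns an id for the matching element, an empty Mono otherwise
        private static Mono<String> findStorageId(String s) {
            return TO_BE_FOUND.equals(s) ? Mono.just(s + UUID.randomUUID()) : Mono.empty();
        }
    }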

Output when TO_BE_FOUND = "B":

The Flux is not processed further after the first storageId is found.

processing: A
processing: B
storageId found: B85bcdbcb-2903-4962-96ab-b3a97b0c091f

Output when TO_BE_FOUND = "X":

processing: A
processing: B
processing: C
processing: D
processing: A
12:52:22.555 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Can't find storageId.
Caused by: java.lang.RuntimeException: Can't find storageId.
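The ErrorCallbackNotImplemented in that log shows up because subscribe was given only a value consumer. A minimal sketch of passing an error consumer as well, reusing the same flat example (the class name and the "lookup failed" message are just for illustration):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ErrorHandlingSketch {

    // Same idea as findStorageId above, with the key passed in.
    static Mono<String> findStorageId(String s, String toBeFound) {
        return toBeFound.equals(s) ? Mono.just(s + "-id") : Mono.empty();
    }

    static Mono<String> firstMatch(String toBeFound) {
        return Flux.just("A", "B", "C", "D", "A")
                .flatMap(s -> findStorageId(s, toBeFound))
                .next()
                .switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));
    }

    public static void main(String[] args) {
        firstMatch("X").subscribe(
                id -> System.out.printf("storageId found: %s%n", id),
                // second argument: called instead of the default onErrorDropped hook
                err -> System.out.printf("lookup failed: %s%n", err.getMessage()));
    }
}
```

With the error consumer in place, the "X" case reports the failure through your own callback rather than through Reactor's default error logging.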

Solution 2:[2]

The problem is that flatMap subscribes to the inner publishers concurrently and its prefetch is greater than 1. If you don't want to call the database many times, but rather one element at a time, use concatMap with a prefetch of 1.

  public static final String TO_BE_FOUND = "B";

  @Override
  public void run(String... args) throws Exception {
    Mono<String> storageId =
        Flux.just("A", "B", "C", "D", "A")
            .doOnNext(id -> System.out.printf("processing: %s\n", id))
            // prefetch = 1: handle one inner publisher at a time
            .concatMap(s -> findStorageId(s), 1)
            .next()
            .switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));
    storageId.subscribe();
  }

  private static Mono<String> findStorageId(String s) {
    return TO_BE_FOUND.equals(s)
        ? Mono.just(s + UUID.randomUUID()).delayElement(Duration.ofSeconds(1))
        : Mono.delay(Duration.ofSeconds(1)).flatMap(aLong -> Mono.empty());
  }

In this case concatMap with a prefetch of 1 subscribes to the inner publishers one at a time and waits for each response before moving on to the next element.
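To see the difference, here is a small sketch that counts how often the lookup is invoked with each operator. The counter, the class name and the 100 ms delay are assumptions made for the demo; the delay stands in for an asynchronous database call.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapVsConcatMapSketch {

    static final String TO_BE_FOUND = "B";

    // Counts how many times the simulated lookup is invoked.
    static final AtomicInteger lookups = new AtomicInteger();

    // Simulated async lookup: answers after 100 ms, empty unless the key matches.
    static Mono<String> findStorageId(String s) {
        lookups.incrementAndGet();
        Mono<String> result = TO_BE_FOUND.equals(s) ? Mono.just(s + "-id") : Mono.empty();
        return result.delaySubscription(Duration.ofMillis(100));
    }

    static int lookupsWithFlatMap() {
        lookups.set(0);
        Flux.just("A", "B", "C", "D", "A")
                .flatMap(FlatMapVsConcatMapSketch::findStorageId) // maps all elements eagerly
                .next()
                .block();
        return lookups.get();
    }

    static int lookupsWithConcatMap() {
        lookups.set(0);
        Flux.just("A", "B", "C", "D", "A")
                .concatMap(FlatMapVsConcatMapSketch::findStorageId, 1) // one inner at a time
                .next()
                .block();
        return lookups.get();
    }

    public static void main(String[] args) {
        System.out.println("flatMap lookups:   " + lookupsWithFlatMap());   // 5
        System.out.println("concatMap lookups: " + lookupsWithConcatMap()); // 2
    }
}
```

With flatMap all five lookups are started before the first answer arrives, so next() can only cancel them after the fact. With concatMap the lookup for "C" is never invoked, at the cost of running the lookups sequentially.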

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 roy man
Solution 2 Ricard Kollcaku