How to find the first match in a Flux and stop processing in Reactor
I can't figure out how to stop processing a Flux on the first match.
This is what I have right now:
findAll(): Flux<Object>
findStorageId(Relation r): Mono<Long> | Mono.empty()
isPassing(Relation r): boolean
findAll().flatMap(p -> {
    return Flux.fromStream(p.getRelations().stream()).flatMap(r -> {
        return isPassing(r) ? findStorageId(r) : Mono.empty();
    });
})
.handle((Long storageId, SynchronousSink<Long> sink) -> {
    if (storageId != null) {
        sink.next(storageId);
        sink.complete();
    }
})
.next()
.switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));
I'm trying to understand how to interrupt processing of the Flux once the first storageId is found. Right now I see that the first flatMap
continues to work after the first match is found.
Solution 1:[1]
For me it worked using flatMap -> next -> onError; the handle is not needed.
flatMap: the mapper returns a Mono of String, or empty
next: returns the first element, or empty if flatMap always returned empty
onError: error handling according to your example
This means that your example should work as you posted it, and you don't even need to call handle.
Example code:
We log each element before passing it to flatMap; that way we can check whether the stream is processed further after the first non-empty mapped Mono.
public static final String TO_BE_FOUND = "B";

public static void main(String[] args) {
    Mono<String> storageId = Flux.just("A", "B", "C", "D", "A")
        .doOnNext(id -> System.out.printf("processing: %s\n", id))
        .flatMap(s -> findStorageId(s))
        .next()
        .switchIfEmpty(
            Mono.error(new RuntimeException("Can't find storageId."))
        );
    storageId.subscribe(id -> System.out.printf("storageId found: %s\n", id));
}

private static Mono<String> findStorageId(String s) {
    return TO_BE_FOUND.equals(s) ? Mono.just(s + UUID.randomUUID()) : Mono.empty();
}
Output when TO_BE_FOUND = "B":
The Flux will not be processed further after the first storageId was found.
processing: A
processing: B
storageId found: B85bcdbcb-2903-4962-96ab-b3a97b0c091f
Output when TO_BE_FOUND = "X":
processing: A
processing: B
processing: C
processing: D
processing: A
12:52:22.555 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Can't find storageId.
Caused by: java.lang.RuntimeException: Can't find storageId.
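The ErrorCallbackNotImplemented entry in the log above appears because subscribe was called with only a value consumer, so the error from switchIfEmpty had nowhere to go. A minimal sketch of the same pipeline with an error consumer passed as the second argument follows. It assumes reactor-core is on the classpath; the class name ErrorConsumerDemo, the helper describe, and the fixed "-id" suffix (used instead of a random UUID so the result is predictable) are illustrative and not part of the original answer.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ErrorConsumerDemo {

    // Same shape as the answer's pipeline: first non-empty result, or an error.
    static Mono<String> firstStorageId(String wanted) {
        return Flux.just("A", "B", "C", "D", "A")
            .flatMap(s -> wanted.equals(s) ? Mono.just(s + "-id") : Mono.<String>empty())
            .next()
            .switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));
    }

    // Hypothetical helper: subscribes with both a value and an error consumer.
    // The pipeline is fully synchronous here, so the outcome is set on return.
    static String describe(String wanted) {
        AtomicReference<String> outcome = new AtomicReference<>();
        firstStorageId(wanted).subscribe(
            id -> outcome.set("found: " + id),
            err -> outcome.set("failed: " + err.getMessage()));
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(describe("B")); // found: B-id
        System.out.println(describe("X")); // failed: Can't find storageId.
    }
}
```

With the error consumer in place, the "not found" case is delivered to your own callback instead of Reactor's default onErrorDropped hook.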
Solution 2:[2]
The problem is that flatMap subscribes to the inner publishers concurrently and its prefetch is greater than 1. If you don't want to call the database for many elements at once, but one call at a time, use concatMap with a prefetch of 1.
public static final String TO_BE_FOUND = "B";

@Override
public void run(String... args) throws Exception {
    Mono<String> storageId =
        Flux.just("A", "B", "C", "D", "A")
            .doOnNext(id -> System.out.printf("processing: %s\n", id))
            .concatMap(s -> findStorageId(s), 1)
            .next()
            .switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));
    storageId.subscribe();
}

private static Mono<String> findStorageId(String s) {
    return TO_BE_FOUND.equals(s)
        ? Mono.just(s + UUID.randomUUID()).delayElement(Duration.ofSeconds(1))
        : Mono.delay(Duration.ofSeconds(1)).flatMap(aLong -> Mono.empty());
}
In this case concatMap with prefetch 1 requests elements from upstream one at a time and waits for each inner Mono to complete before subscribing to the next one.
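To make the difference visible, here is a rough sketch that counts how many lookups are started before the first match arrives, once with flatMap and once with concatMap. It assumes reactor-core is on the classpath. The class name FirstMatchDemo, the call counter, the 100 ms latency, and the fixed "-id" suffix are illustrative assumptions; findStorageId is only a stand-in for a slow database call.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FirstMatchDemo {
    static final String TO_BE_FOUND = "B";

    // Stand-in for a slow lookup; increments the counter each time it is invoked.
    static Mono<String> findStorageId(String s, AtomicInteger calls) {
        calls.incrementAndGet();
        Duration latency = Duration.ofMillis(100);
        return TO_BE_FOUND.equals(s)
            ? Mono.just(s + "-id").delayElement(latency)
            : Mono.delay(latency).then(Mono.<String>empty());
    }

    // flatMap subscribes eagerly: every element is mapped before any lookup finishes.
    static int lookupsWithFlatMap() {
        AtomicInteger calls = new AtomicInteger();
        Flux.just("A", "B", "C", "D", "A")
            .flatMap(s -> findStorageId(s, calls))
            .next()
            .block();
        return calls.get();
    }

    // concatMap maps the next element only after the previous inner Mono completes.
    static int lookupsWithConcatMap() {
        AtomicInteger calls = new AtomicInteger();
        Flux.just("A", "B", "C", "D", "A")
            .concatMap(s -> findStorageId(s, calls), 1)
            .next()
            .block();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("flatMap lookups:   " + lookupsWithFlatMap());
        System.out.println("concatMap lookups: " + lookupsWithConcatMap());
    }
}
```

For this input, flatMap starts all five lookups while concatMap starts only two (A, then B), because next() cancels the chain as soon as B's result is emitted. The trade-off is that concatMap runs the lookups sequentially, so the total latency grows with the position of the first match.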
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | roy man |
Solution 2 | Ricard Kollcaku |