Reactor parallel service calls, take first available with some priority ordering
I'm trying to use Reactor Core to make multiple service calls in parallel and return the first available result, subject to a priority ordering of the results.
That is, I want to return the first result available without waiting for the others. Additionally, I want the service calls to have a priority ordering.
So let's say:
- I have 3 services to call: service1, service2 and service3.
- If no service call returns within 1000 ms, I want to return an empty Optional.
- The first service call that completes within the total timeout (1000 ms) and produces a non-empty result should be returned.
Specifically, I want to return the result of service1 if it produces one within the 1000 ms timeout, and fall back to service2, service3, ... serviceN, in that order, if the higher-priority services produce no result or time out.
I have this mostly working, with the caveat that it returns the first available result regardless of priority. I'm not sure how to enforce the order: service1 first, service2 second, and so on.
Here is my code. We have 3 handlers:
```java
public interface QueryHandler {
    Optional<QueryResult> select(QueryContext context);
    boolean isEligible(QueryContext context);
}

public class Service1 implements QueryHandler {
    public Optional<QueryResult> select(QueryContext context) {
        // artificially sleep for 150 ms to simulate a delay
        try {
            Thread.sleep(150);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Optional.of(new QueryResult("1"));
    }
}

public class Service2 implements QueryHandler {
    public Optional<QueryResult> select(QueryContext context) {
        return Optional.of(new QueryResult("2"));
    }
}

public class Service3 implements QueryHandler {
    public Optional<QueryResult> select(QueryContext context) {
        return Optional.of(new QueryResult("3"));
    }
}

public class QueryEngine implements Query {
    private final List<QueryHandler> handlers;
    private final Scheduler scheduler;

    @Override
    public Optional<QueryResult> select(final QueryContext context) {
        try {
            return Flux.fromIterable(handlers)
                .filter(handler -> handler.isEligible(context))
                .flatMap(eligibleHandler -> Mono.just(eligibleHandler)
                    .publishOn(scheduler)
                    .timeout(Duration.ofMillis(1000))
                    .map(handler -> handler.select(context))
                )
                .defaultIfEmpty(Optional.empty())
                .filter(Optional::isPresent)
                // return empty if no results are produced
                .defaultIfEmpty(Optional.empty())
                .blockFirst(Duration.ofMillis(1000));
        } catch (final IllegalStateException e) {
            log.error("Flux encountered a timeout and was unable to produce a result.");
        }
        return Optional.empty();
    }
}
```
Here is my test runner to validate this is working as expected:
```java
@RunWith(MockitoJUnitRunner.class)
public class QueryEngineTest {
    @Mock
    private QueryContext context;

    @Test
    public void testFirstEligibleHandler_returns() {
        final List<QueryHandler> handlers = ImmutableList.of(
            new Service1(),
            new Service2(),
            new Service3());
        final QueryEngine queryEngine = new QueryEngine(handlers, Schedulers.boundedElastic());
        assertThat(queryEngine.select(context))
            .isEqualTo(Optional.of(new QueryResult("1")));
    }
}
```
However, the test above fails because Service2 and Service3 return their results before Service1 does. None of the calls time out, but because I'm using blockFirst I don't get to decide which result is returned.
Could someone suggest how to refactor this to support priority-based results using Reactor?
Solution 1:[1]
You could use firstWithValue, which subscribes to all provided publishers and emits the first available value. You would need to handle NoSuchElementException in case no values are emitted.
Note that all other publishers will be canceled and their results ignored.
```java
@Test
void firstWithValue() {
    var service1 = Mono.just(1)
        .delaySubscription(Duration.ofMillis(2000))
        .timeout(Duration.ofMillis(1000), Mono.empty());
    var service2 = Mono.just(2)
        .delaySubscription(Duration.ofMillis(100))
        .timeout(Duration.ofMillis(1000), Mono.empty());
    var service3 = Mono.just(3)
        .delaySubscription(Duration.ofMillis(200))
        .timeout(Duration.ofMillis(1000), Mono.empty());

    var res = Flux.firstWithValue(service1, service2, service3)
        .onErrorResume(NoSuchElementException.class, e -> Mono.empty());

    StepVerifier.create(res)
        .expectNext(2)
        .verifyComplete();
}
```
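Note that firstWithValue takes whichever value arrives first, so on its own it does not give service1 precedence over service2. One possible way to add a priority order, which is not part of the answer above, is Flux.mergeSequential: it subscribes to all sources eagerly but emits their values in source order. The following is a minimal sketch under that assumption. PriorityFirst and firstByPriority are hypothetical names, and each call is assumed to be already wrapped as a Mono (for a blocking handler, something like Mono.fromCallable(...).subscribeOn(scheduler)).

```java
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical helper for illustration; not from the original answer.
final class PriorityFirst {

    // `calls` must be listed from highest to lowest priority.
    static <T> Optional<T> firstByPriority(List<Mono<T>> calls, Duration timeout) {
        List<Mono<T>> guarded = calls.stream()
            // A call that times out or fails is treated as "no result".
            .map(call -> call
                .timeout(timeout, Mono.<T>empty())
                .onErrorResume(e -> Mono.<T>empty()))
            .collect(Collectors.toList());

        // mergeSequential subscribes to the sources eagerly (so the calls
        // overlap) but emits their values in list order; next() keeps the
        // first value and cancels the remaining sources.
        return Flux.mergeSequential(guarded)
            .next()
            .blockOptional();
    }
}
```

The trade-off is latency: a lower-priority result that is already available is held back until every higher-priority call has completed empty, failed, or timed out.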
If you want to wait for all publishers up to a defined timeout and then decide based on the available results, you could use merge in conjunction with timeout.
```java
@Test
void mergeWithTimeout() {
    var service1 = Mono.just(1)
        .delaySubscription(Duration.ofMillis(2000))
        .timeout(Duration.ofMillis(1000), Mono.empty());
    var service2 = Mono.just(2)
        .delaySubscription(Duration.ofMillis(100))
        .timeout(Duration.ofMillis(1000), Mono.empty());
    var service3 = Mono.just(3)
        .delaySubscription(Duration.ofMillis(200))
        .timeout(Duration.ofMillis(1000), Mono.empty());

    var res = Flux.merge(service1, service2, service3)
        .collectList()
        .map(list -> {
            // decide which one to take here
            return list.get(0);
        });

    StepVerifier.create(res)
        .expectNext(2)
        .verifyComplete();
}
```
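The "decide which one to take here" step is where a priority rule could go. As a rough sketch, assume each result is tagged with its service's priority before merging (for example with something like .map(v -> Map.entry(0, v)) on service1, where a lower number means higher priority). The selection could then look like the following. PriorityPick and pickHighestPriority are hypothetical names, and the sketch uses only the JDK.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration; not from the original answer.
final class PriorityPick {

    // Each entry pairs a priority (key, lower is preferred) with a result (value).
    // Returns the value with the lowest key, or empty if nothing arrived.
    static <T> Optional<T> pickHighestPriority(List<Map.Entry<Integer, T>> arrived) {
        return arrived.stream()
            .min(Map.Entry.comparingByKey())
            .map(Map.Entry::getValue);
    }
}
```

Inside the pipeline this would stand in for list.get(0), which returns whichever result arrived first and throws IndexOutOfBoundsException when the collected list is empty.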
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Alex |
