'Reactive Spring Integration equivalent to Project Reactor .doOnNext()
As a user of Project Reactor that also wants to use Spring Integration, I'd like to perform the following operation, that will work in such way that:
flux.buffer(duration)
.doOnNext(bulkWriteToCockroach())
.doOnNext(bulkWriteToPulsar());
In the beginning, I thought the solution is to do stuff like the following wrong code:
IntegrationFlow.from(myflow)
.aggregate(myTimeBasedAggregation())
.handle(bulkWriteToCockroach())
.handle(bulkWriteToPulsar());
Of course, it wouldn't work due to outputChannel issues. I would like to know how can I perform operations one after another (e.g. don't continue to the Pulsar writes until the CockroachDB writes are completed, and stop the flow for these messages if the first operation fails).
I'm thinking of using the Spring Integration transactions support but I'm afraid of its usage with Reactor.
I've also seen that there is something called gateway(), but I've couldn't manage to find a usable example of a JavaDSL based usage of it.
Solution 1:[1]
To send the same message to different endpoints, you should consider to use a PublishSubscribeChannel, which really don't send to the next subscriber if the previous fails.
See its JavaDocs:
/**
* Specify whether failures for one or more of the handlers should be
* ignored. By default this is false meaning that an Exception
* will be thrown whenever a handler fails. To override this and suppress
* Exceptions, set the value to true.
* @param ignoreFailures true if failures should be ignored.
*/
public void setIgnoreFailures(boolean ignoreFailures) {
With Java DSL you should look into a:
/**
* The {@link org.springframework.integration.channel.PublishSubscribeChannel} {@link #channel}
* method specific implementation to allow the use of the 'subflow' subscriber capability.
* @param publishSubscribeChannelConfigurer the {@link Consumer} to specify
* {@link PublishSubscribeSpec} options including 'subflow' definition.
* @return the current {@link BaseIntegrationFlowDefinition}.
*/
public B publishSubscribeChannel(Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer) {
The sample for which could look like this:
.publishSubscribeChannel(s -> s
.subscribe(f -> f
.handle(bulkWriteToCockroach()))
.subscribe(f -> f
.handle(bulkWriteToPulsar())));
See more in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-subflows
UPDATE
The gateway() is explained here in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-gateway
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 |
