RSocket client-side load balancing for multilevel microservices
In our microservices-based application we have layers of microservices: a user/REST client calls MS1, MS1 calls MS2, and so on. For simplicity, and to present the actual problem, I will mention only the client, MS1 and MS2 here. We are trying to implement MS-to-MS calls using the RSocket protocol (request-response interaction model). We also need client-side load balancing in RSocket, because we will be running multiple pods (instances) of each microservice in a Kubernetes environment. We observe the following problem with client-side load balancing even in local unit testing (I mention this to rule out any issue with the deployment or the Kubernetes environment):
1.) Client -> MS1(Instance1) -> MS2(Instance1): the RSocket load-balancing code works fine and every request is processed.
2.) Client -> MS1(Instance1, Instance2) -> MS2(Instance1): load balancing works fine.
3.) Client -> MS1(Instance1) -> MS2(Instance1, Instance2): only the first request passes, i.e. Client -> MS1(Instance1) -> MS2(Instance1). The second request reaches MS1(Instance1) and stops there; the call to MS2(Instance2) is never initiated.
4.) Client -> MS1(Instance1, Instance2) -> MS2(Instance1, Instance2): only two requests are processed successfully, Client -> MS1(Instance1) -> MS2(Instance1) and Client -> MS1(Instance2) -> MS2(Instance2). After that no further RSocket call happens, and once the configured keep-alive interval and max lifetime elapse, the client RSocket connection is disposed with this error:
Caused by: ConnectionErrorException (0x101): No keep-alive acks for 30000 ms
    at io.rsocket.core.RSocketRequester.lambda$tryTerminateOnKeepAlive$2(RSocketRequester.java:299)
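To make concrete what I expect in the cases with two MS2 instances, here is a minimal sketch of round-robin selection in plain Java. It is a simplified model and not the RSocket library's implementation; `RoundRobinModel`, `RoundRobinDemo` and the target names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of round-robin selection (hypothetical, NOT the RSocket library's code).
final class RoundRobinModel {
    private final List<String> targets;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinModel(List<String> targets) {
        this.targets = List.copyOf(targets);
    }

    // Returns the target for the next request, cycling through the list in order.
    String select() {
        int index = Math.floorMod(next.getAndIncrement(), targets.size());
        return targets.get(index);
    }
}

class RoundRobinDemo {
    // Routes `requests` calls through the model and records which target served each one.
    static List<String> route(List<String> targets, int requests) {
        RoundRobinModel model = new RoundRobinModel(targets);
        List<String> served = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            served.add(model.select());
        }
        return served;
    }

    public static void main(String[] args) {
        // MS1(Instance1) calling MS2(Instance1, Instance2), four requests in a row.
        System.out.println(route(List.of("MS2-Instance1", "MS2-Instance2"), 4));
        // prints [MS2-Instance1, MS2-Instance2, MS2-Instance1, MS2-Instance2]
    }
}
```

In other words, I expect consecutive requests to alternate between the two MS2 instances indefinitely, whereas in my tests the calls stop after each MS2 instance has been used once at most.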
Now let us see how I have implemented the client-side load-balancing code. I am relying on Flux<List<LoadbalanceTarget>>, and the three important pieces are:
// Builds one LoadbalanceTarget per registered server instance.
private Mono<List<LoadbalanceTarget>> targets()
{
    Mono<List<LoadbalanceTarget>> mono = Mono.fromSupplier(() -> serviceRegistry.getServerInstances()
            .stream()
            .map(server -> LoadbalanceTarget.from(getLoadBalanceTargetKey(server),
                    TcpClientTransport.create(TcpClient
                            .create()
                            .option(ChannelOption.TCP_NODELAY, true)
                            .option(ChannelOption.ALLOW_HALF_CLOSURE, true)
                            .host(server.getHost())
                            .port(server.getPort()))))
            .collect(Collectors.toList()));
    return mono;
}

// Emits the target list once and then completes.
@Bean
public Flux<List<LoadbalanceTarget>> targetFluxForMathService2()
{
    return Flux.from(targets());
}
Note: for testing I am faking serviceRegistry and returning a list of hard-coded RSocket server instances (host and port).
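For reference, a faked registry of this kind could look roughly like the sketch below. The class shapes are assumptions for illustration: only `getServerInstances()`, `getHost()` and `getPort()` appear in my code, the name `FakeServiceRegistry` is hypothetical, and the host and ports are placeholders rather than my actual test values.

```java
import java.util.List;

// Hypothetical stand-in for the instance type; only getHost()/getPort() are used by the code above.
final class RSocketServerInstance {
    private final String host;
    private final int port;

    RSocketServerInstance(String host, int port) {
        this.host = host;
        this.port = port;
    }

    String getHost() { return host; }

    int getPort() { return port; }
}

// Hypothetical fake registry that always returns the same fixed list of instances.
final class FakeServiceRegistry {
    private final List<RSocketServerInstance> instances;

    FakeServiceRegistry(List<RSocketServerInstance> instances) {
        this.instances = List.copyOf(instances);
    }

    List<RSocketServerInstance> getServerInstances() {
        return instances;
    }
}

class FakeRegistryDemo {
    // Two MS2 instances on the local machine; host and ports are placeholder values.
    static FakeServiceRegistry twoLocalInstances() {
        return new FakeServiceRegistry(List.of(
                new RSocketServerInstance("localhost", 7000),
                new RSocketServerInstance("localhost", 7001)));
    }

    public static void main(String[] args) {
        for (RSocketServerInstance server : twoLocalInstances().getServerInstances()) {
            System.out.println(server.getHost() + ":" + server.getPort());
        }
    }
}
```

Because the list is fixed, every call to `getServerInstances()` returns the same instances, so the target list handed to the load balancer never changes during a test run.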
@Bean
public RSocketRequester rSocketRequester2(Flux<List<LoadbalanceTarget>> targetFluxForMathService2) {
    RSocketRequester rSocketRequester = this.builder.rsocketConnector(configurer ->
            // Keep-alive interval 10 s, max lifetime 30 s.
            configurer.keepAlive(Duration.ofSeconds(10), Duration.ofSeconds(30))
                    .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1))
                            .doBeforeRetry(s -> System.out.println("Disconnected, retrying to connect"))))
            .transports(targetFluxForMathService2, new RoundRobinLoadbalanceStrategy());
    return rSocketRequester;
}

// Key that identifies a target for the load balancer.
private String getLoadBalanceTargetKey(RSocketServerInstance server)
{
    return server.getHost() + server.getPort();
}
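A side note on getLoadBalanceTargetKey: concatenating host and port without a separator can produce the same key for two different instances. This is probably unrelated to the hang, but the sketch below shows the collision and a variant with a separator. `TargetKeys`, `plainKey` and `separatedKey` are hypothetical names, and the addresses are made-up examples.

```java
// Hypothetical helper comparing two ways of building a target key.
final class TargetKeys {
    // Same concatenation as getLoadBalanceTargetKey: host followed directly by port.
    static String plainKey(String host, int port) {
        return host + port;
    }

    // Variant with a ':' separator so the host/port boundary stays unambiguous.
    static String separatedKey(String host, int port) {
        return host + ":" + port;
    }

    public static void main(String[] args) {
        // Two different instances that collapse to the same plain key "10.0.0.117000".
        System.out.println(plainKey("10.0.0.1", 17000).equals(plainKey("10.0.0.11", 7000)));         // true
        System.out.println(separatedKey("10.0.0.1", 17000).equals(separatedKey("10.0.0.11", 7000))); // false
    }
}
```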
Any help will be highly appreciated.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
