RSocket Client Side Load balancing for multilevel Microservices

In our microservices-based application we have layers of microservices, i.e. a user/REST client calls MS1, MS1 calls MS2, and so on. For simplicity, and to present the actual problem, I will mention only the client, MS1 and MS2 here. We are trying to implement MS-to-MS calls using the RSocket communication protocol (request-response interaction model). We also need client-side load balancing in RSocket, because we will be running multiple pods (instances) of each MS in a Kubernetes environment. We observe the following problem with client-side load balancing even in local unit testing (I mention this to rule out any issue with the deployment, the Kubernetes environment, etc.):

1. Client -> MS1(Instance1) -> MS2(Instance1): the RSocket load-balancing code works fine and each request is processed.

2. Client -> MS1(Instance1, Instance2) -> MS2(Instance1): load balancing works fine.

3. Client -> MS1(Instance1) -> MS2(Instance1, Instance2): only the first request passes, i.e. Client -> MS1(Instance1) -> MS2(Instance1). The second request reaches MS1(Instance1) and stops there; the call to MS2(Instance2) is never initiated.

4. Client -> MS1(Instance1, Instance2) -> MS2(Instance1, Instance2): only two requests are processed successfully, Client -> MS1(Instance1) -> MS2(Instance1) and Client -> MS1(Instance2) -> MS2(Instance2). After that no further RSocket call happens and, as per KeepAliveInterval and KeepAliveMaxLifeTime, the client RSocket connection is disposed with the error:

Caused by: ConnectionErrorException (0x101): No keep-alive acks for 30000 ms at io.rsocket.core.RSocketRequester.lambda$tryTerminateOnKeepAlive$2(RSocketRequester.java:299)
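The 30000 ms in this message matches the 30-second max lifetime passed to keepAlive(Duration.ofSeconds(10), Duration.ofSeconds(30)) in the requester configuration shown further down. As a rough mental model only (the class below is hypothetical and is not RSocket's implementation), the timeout can be thought of as a check on how long ago the last keep-alive ack was seen:

```java
import java.time.Duration;

// Hypothetical model of a keep-alive timeout check; NOT RSocket's actual code.
final class KeepAliveSketch {
    private final long maxLifeTimeMillis;
    private long lastAckMillis;

    KeepAliveSketch(Duration maxLifeTime, long startMillis) {
        this.maxLifeTimeMillis = maxLifeTime.toMillis();
        this.lastAckMillis = startMillis;
    }

    // Record that the peer answered a keep-alive frame.
    void onAck(long nowMillis) {
        lastAckMillis = nowMillis;
    }

    // True once no ack has been seen for at least maxLifeTime.
    boolean isExpired(long nowMillis) {
        return nowMillis - lastAckMillis >= maxLifeTimeMillis;
    }

    public static void main(String[] args) {
        KeepAliveSketch keepAlive = new KeepAliveSketch(Duration.ofSeconds(30), 0L);
        keepAlive.onAck(10_000L);
        System.out.println(keepAlive.isExpired(35_000L)); // false: last ack was 25 s ago
        System.out.println(keepAlive.isExpired(40_000L)); // true: 30 s without an ack
    }
}
```

Read this way, the exception says that keep-alive acks stopped arriving on that connection for 30 seconds; it does not by itself say why they stopped.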

Now let us see how I have implemented the client-side load-balancing code. I am relying on Flux<List<LoadbalanceTarget>>, and the 3 important beans are:

// Builds one LoadbalanceTarget per registered server instance.
private Mono<List<LoadbalanceTarget>> targets() {
    return Mono.fromSupplier(() -> serviceRegistry.getServerInstances()
            .stream()
            .map(server -> LoadbalanceTarget.from(
                    getLoadBalanceTargetKey(server),
                    TcpClientTransport.create(TcpClient.create()
                            .option(ChannelOption.TCP_NODELAY, true)
                            .option(ChannelOption.ALLOW_HALF_CLOSURE, true)
                            .host(server.getHost())
                            .port(server.getPort()))))
            .collect(Collectors.toList()));
}

@Bean
public Flux<List<LoadbalanceTarget>> targetFluxForMathService2() {
    // Flux.from(Mono) emits the target list once and then completes.
    return Flux.from(targets());
}
Note: for testing I am faking serviceRegistry and returning a list of hard-coded RSocket server instances (host and port).

@Bean
public RSocketRequester rSocketRequester2(Flux<List<LoadbalanceTarget>> targetFluxForMathService2) {
    return this.builder
            .rsocketConnector(configurer -> configurer
                    // Keep-alive interval of 10 s, max lifetime of 30 s.
                    .keepAlive(Duration.ofSeconds(10), Duration.ofSeconds(30))
                    .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1))
                            .doBeforeRetry(s -> System.out.println("Disconnected, retrying to connect"))))
            // Distribute requests over the emitted targets in round-robin order.
            .transports(targetFluxForMathService2, new RoundRobinLoadbalanceStrategy());
}
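For reference, this is the behaviour I expect from a round-robin strategy when MS2 has two instances: consecutive requests should alternate between the targets. The class below is only a hypothetical, stdlib-only sketch of that selection rule; it is not the code of RoundRobinLoadbalanceStrategy, and the target strings are made up.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of round-robin selection; NOT RSocket's implementation.
final class RoundRobinSketch<T> {
    private final AtomicInteger next = new AtomicInteger();

    // Pick the next target in cyclic order; safe to call from several threads.
    T select(List<T> targets) {
        if (targets.isEmpty()) {
            throw new IllegalStateException("no targets available");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), targets.size());
        return targets.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> strategy = new RoundRobinSketch<>();
        List<String> ms2Instances = List.of("localhost:7000", "localhost:7001");
        for (int request = 1; request <= 4; request++) {
            System.out.println("request " + request + " -> " + strategy.select(ms2Instances));
        }
    }
}
```

With two targets, the four requests in main go to localhost:7000, localhost:7001, localhost:7000 and localhost:7001 in that order, which is the alternation that does not happen in scenarios 3 and 4 above.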

private String getLoadBalanceTargetKey(RSocketServerInstance server)
    {
        return server.getHost() + server.getPort();
    }
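A side note on the key: concatenating host and port without a separator can, in principle, give two different endpoints the same key. With my hard-coded test instances the keys are distinct, so this is probably not the cause of the stall, but a delimiter makes the key unambiguous. The sketch below uses a hypothetical TargetKeys helper and made-up addresses to show the collision:

```java
// Hypothetical helper for illustration; not part of RSocket or Spring.
final class TargetKeys {

    // Same rule as getLoadBalanceTargetKey above: host and port with no separator.
    static String plainKey(String host, int port) {
        return host + port;
    }

    // Variant with an explicit ':' between host and port.
    static String delimitedKey(String host, int port) {
        return host + ":" + port;
    }

    public static void main(String[] args) {
        // Two different endpoints that collapse to the same undelimited key.
        System.out.println(plainKey("10.0.0.1", 17000));      // 10.0.0.117000
        System.out.println(plainKey("10.0.0.11", 7000));      // 10.0.0.117000
        // With a delimiter the keys stay distinct.
        System.out.println(delimitedKey("10.0.0.1", 17000));  // 10.0.0.1:17000
        System.out.println(delimitedKey("10.0.0.11", 7000));  // 10.0.0.11:7000
    }
}
```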

Any help will be highly appreciated.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow