'Broadcast to multiple reactive WebSocketSessions created using springboot webflux is not working

Following is the scenario:

  1. I create a reactor kafka receiver
  2. Data consumed from kafka receiver is published to a WebSocketHanlder
  3. WebSocketHanlder is mapped to a URL using SimpleUrlHandlerMapping
  4. URL pattern is api/v1/ws/{ID} and I expect multiple WebSocketSession to get created based on different ID used in URI which are managed by single WebSocketHanlder, which is actually happening
  5. But when data from kafka receiver is published, only first created WebSocketSession receiveds it and all other WebSocketSessions do not receive the data
  6. I am using spring-boot 2.6.3 with starter-tomcat

How to publish data to all the WebSocketSessions created My Code:

Config for web socket handler


@Configuration
@Slf4j
public class OneSecPollingWebSocketConfig
{
   private OneSecPollingWebSocketHandler oneSecPollingHandler;

   @Autowired
   public OneSecPollingWebSocketConfig(OneSecPollingWebSocketHandler oneSecPollingHandler)
   {
      this.oneSecPollingHandler = oneSecPollingHandler;
   }

   @Bean
   public HandlerMapping webSocketHandlerMapping()
   {
      log.info("onesecpolling websocket configured");
      Map<String, WebSocketHandler> handlerMap = new HashMap<>();
      handlerMap.put(WEB_SOCKET_ENDPOINT, oneSecPollingHandler);
      SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
      mapping.setUrlMap(handlerMap);
      mapping.setOrder(1);
      return mapping;
   }
}

Code for WebSocket HAndler


@Component
@Slf4j
public class OneSecPollingWebSocketHandler implements WebSocketHandler
{
   private ObjectMapper objectMapper;
   private OneSecPollingKafkaConsumerService oneSecPollingKafkaConsumerService;
   private Map<String, WebSocketSession> wsSessionsByUserSessionId = new HashMap<>();

   @Autowired
   public OneSecPollingWebSocketHandler(ObjectMapper objectMapper, OneSecPollingKafkaConsumerService oneSecPollingKafkaConsumerService)
   {
      this.objectMapper = objectMapper;
      this.oneSecPollingKafkaConsumerService = oneSecPollingKafkaConsumerService;
   }

   @Override
   public Mono<Void> handle(WebSocketSession webSocketSession)
   {
      Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
      wsSessionsByUserSessionId.put(getUserPollingSessionId(webSocketSession), webSocketSession);
      sinkSubscription(webSocketSession, sink);
      Mono<Void> output = webSocketSession.send(sink.asFlux().map(webSocketSession::textMessage)).doOnSubscribe(subscription ->
      {
      });
      return Mono.zip(webSocketSession.receive().then(), output).then();
   }

   public void sinkSubscription(WebSocketSession webSocketSession, Many<String> sink)
   {
      log.info("number of sessions; {}", wsSessionsByUserSessionId.size());
      oneSecPollingKafkaConsumerService.getTestTopicFlux().doOnNext(record ->
      {
         //log.info("record: {}", record);
         sink.tryEmitNext(record.value());
         record.receiverOffset().acknowledge();
      }).subscribe();
   }

   public String getOneSecPollingTopicRecord(ReceiverRecord<Integer, String> record, WebSocketSession webSocketSession)
   {
      String lastRecord = record.value();
      log.info("record to send: {} : webSocketSession: {}", record.value(), webSocketSession.getId());
      record.receiverOffset().acknowledge();
      return lastRecord;     
   }

   public String getUserPollingSessionId(WebSocketSession webSocketSession)
   {
      UriTemplate template = new UriTemplate(WEB_SOCKET_ENDPOINT);
      URI uri = webSocketSession.getHandshakeInfo().getUri();
      Map<String, String> parameters = template.match(uri.getPath());
      String userPollingSessionId = parameters.get("userPollingSessionId");
      return userPollingSessionId;
   }
}

Kafka Receiver

@Service
@Slf4j
public class OneSecPollingKafkaConsumerService
{
   private String bootStrapServers;

   @Autowired
   public OneSecPollingKafkaConsumerService(@Value("${bootstrap.servers}") String bootStrapServers)
   {
      this.bootStrapServers = bootStrapServers;
   }

   private ReceiverOptions<Integer, String> getRecceiverOPtions()
   {
      Map<String, Object> consumerProps = new HashMap<>();
      consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
      //consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "reactive-consumer");
      consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "onesecpolling-group");
      consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
      consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      //consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      //consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

      ReceiverOptions<Integer, String> receiverOptions = ReceiverOptions
         .<Integer, String> create(consumerProps)
         .subscription(Collections.singleton("HighFrequencyPollingKPIsComputedValues"));

      return receiverOptions;
   }

   public Flux<ReceiverRecord<Integer, String>> getTestTopicFlux()
   {
      return createTopicCache();
   }

   private Flux<ReceiverRecord<Integer, String>> createTopicCache()
   {
      Flux<ReceiverRecord<Integer, String>> oneSecPollingMessagesFlux = KafkaReceiver.create(getRecceiverOPtions())
         .receive()
         .delayElements(Duration.ofMillis(500));
      return oneSecPollingMessagesFlux;
   }
}

POM dependencies

<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
    </dependency>
    <!-- 
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-security</artifactId>
    </dependency> 
    -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
      <groupId>io.projectreactor.kafka</groupId>
      <artifactId>reactor-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- This is breaking WebFlux 
      <dependency>
        <groupId>org.springdoc</groupId>
        <artifactId>springdoc-openapi-ui</artifactId>
        <version>${springdoc.version}</version>
      </dependency>
      -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-tomcat</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
      <classifier>test-binder</classifier>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <!-- <dependency>
      <groupId>org.springframework.security</groupId>
      <artifactId>spring-security-test</artifactId>
      <scope>test</scope>
    </dependency> -->
  </dependencies>

I also tried changing the handle(...) method definition in WebSocketHanlder to following, but still data from kafka is pushed to only one websocket session:

@Override
   public Mono<Void> handle(WebSocketSession webSocketSession)
   {
      Mono<Void> input = webSocketSession.receive().then();
      Mono<Void> output = webSocketSession.send(oneSecPollingKafkaConsumerService.getTestTopicFlux().map(ReceiverRecord::value).map(webSocketSession::textMessage));
      return Mono.zip(input, output).then();
   }

Also, I tried following:

public Mono<Void> handle(WebSocketSession webSocketSession)
   {      
      Mono<Void> input = webSocketSession.receive()
         .doOnSubscribe(subscribe -> log.info("sesseion created sessionId:{}:userId:{};sessionhash:{}",
            webSocketSession.getId(),
            getUserPollingSessionId(webSocketSession),
            webSocketSession.hashCode()))
         .then();
      Flux<String> source = oneSecPollingKafkaConsumerService.getTestTopicFlux().map(record -> getOneSecPollingTopicRecord(record, webSocketSession)).log();
      Mono<Void> output = webSocketSession.send(source.map(webSocketSession::textMessage)).log();
      return Mono.zip(input, output).then().log();
   }

I enabled log() and got following output:

20:09:22.652 [http-nio-8080-exec-9] INFO  c.m.e.w.p.i.w.v.OneSecPollingWebSocketHandler - sesseion created sessionId:a:userId:124;sessionhash:1974799413
20:09:22.652 [http-nio-8080-exec-9] INFO  reactor.Flux.RefCount.41 - | onSubscribe([Fuseable] FluxRefCount.RefCountInner)
20:09:22.652 [http-nio-8080-exec-9] INFO  reactor.Flux.Map.42 - onSubscribe(FluxMap.MapSubscriber)
20:09:22.652 [http-nio-8080-exec-9] INFO  reactor.Flux.Map.42 - request(1)
20:09:22.652 [http-nio-8080-exec-9] INFO  reactor.Flux.RefCount.41 - | request(32)
20:09:22.659 [http-nio-8080-exec-9] INFO  reactor.Mono.FromPublisher.43 - onSubscribe(MonoNext.NextSubscriber)
20:09:22.659 [http-nio-8080-exec-9] INFO  reactor.Mono.FromPublisher.43 - request(unbounded)
20:09:25.942 [http-nio-8080-exec-10] INFO  reactor.Mono.IgnorePublisher.48 - onSubscribe(MonoIgnoreElements.IgnoreElementsSubscriber)
20:09:25.942 [http-nio-8080-exec-10] INFO  reactor.Mono.IgnorePublisher.48 - request(unbounded)
20:09:25.942 [http-nio-8080-exec-10] INFO  c.m.e.w.p.i.w.v.OneSecPollingWebSocketHandler - sesseion created sessionId:b:userId:123;sessionhash:1582184236
20:09:25.942 [http-nio-8080-exec-10] INFO  reactor.Flux.RefCount.45 - | onSubscribe([Fuseable] FluxRefCount.RefCountInner)
20:09:25.942 [http-nio-8080-exec-10] INFO  reactor.Flux.Map.46 - onSubscribe(FluxMap.MapSubscriber)
20:09:25.942 [http-nio-8080-exec-10] INFO  reactor.Flux.Map.46 - request(1)
20:09:25.942 [http-nio-8080-exec-10] INFO  reactor.Flux.RefCount.45 - | request(32)
20:09:25.947 [http-nio-8080-exec-10] INFO  reactor.Mono.FromPublisher.47 - onSubscribe(MonoNext.NextSubscriber)
20:09:25.949 [http-nio-8080-exec-10] INFO  reactor.Mono.FromPublisher.47 - request(unbounded)
20:10:00.880 [reactive-kafka-onesecpolling-group-11] INFO  reactor.Flux.RefCount.41 - | onNext(ConsumerRecord(topic = HighFrequencyPollingKPIsComputedValues, partition = 0, leaderEpoch = null, offset = 474, CreateTime = 1644071999871, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"greeting" : "Hello", "name" : "Prashant"}))
20:10:01.387 [parallel-5] INFO  reactor.Flux.Map.42 - onNext({"greeting" : "Hello", "name" : "Prashant"})
20:10:01.389 [parallel-5] INFO  reactor.Flux.Map.42 - request(1)

Here we can see that we have 2 subscribers to reactor-kafka flux:

  1. reactor.Flux.Map.42 - onSubscribe(FluxMap.MapSubscriber
  2. reactor.Flux.Map.46 - onSubscribe(FluxMap.MapSubscriber)

but when data is read from kafka topic, it is received by only one subscriber:

  • reactor.Flux.Map.42 - onNext({"greeting" : "Hello", "name" : "Prashant"})

Is it a bug in the Webflux API itself ?



Solution 1:[1]

I have found the issue and the solution.

Problem The way I was using Flux (obtained from KafkaReceiver) in WebSocketHandler handle() method is not correct. For each websocket session created from multiple client requests, handle method is get called. And so, multiple Flux objects for KafkaReceiver.create().receive() are created. One of the Flux reads data from KafkaReceiver but other flux objects failed to do so.

public Mono<Void> handle(WebSocketSession webSocketSession)
   {      
      Mono<Void> input = webSocketSession.receive()
         .doOnSubscribe(subscribe -> log.info("sesseion created sessionId:{}:userId:{};sessionhash:{}",
            webSocketSession.getId(),
            getUserPollingSessionId(webSocketSession),
            webSocketSession.hashCode()))
         .then();
      **Flux<String> source = oneSecPollingKafkaConsumerService.getTestTopicFlux()**.map(record -> getOneSecPollingTopicRecord(record, webSocketSession)).log();
      Mono<Void> output = webSocketSession.send(source.map(webSocketSession::textMessage)).log();
      return Mono.zip(input, output).then().log();
   }

Solution Make sure that only one Flux is created for KafkaReceiver.create().receive(). One way to do so is to make Flux in the constructor of WebSocketHandler (or KAfkaCOnsumer class)

private final Flux<String> source;

   @Autowired
   public OneSecPollingWebSocketHandler(OneSecPollingKafkaConsumerService oneSecPollingKafkaConsumerService)
   {
      source = oneSecPollingKafkaConsumerService.getOneSecPollingTopicFlux().map(r -> getOneSecPollingTopicRecord(r));
   }

   @Override
   public Mono<Void> handle(WebSocketSession webSocketSession)
   {
      // add usersession id as session attribute
      Mono<Void> input = getInputMessageMono(webSocketSession);
      Mono<Void> output = getOutputMessageMono(webSocketSession);
      return Mono.zip(input, output).then().log();
   }

   private Mono<Void> getOutputMessageMono(WebSocketSession webSocketSession)
   {
      Mono<Void> output = webSocketSession.send(source.map(webSocketSession::textMessage)).doOnError(err -> log.error(err.getMessage())).doOnTerminate(() ->
      {
         log.info("onesecpolling session terminated;{}", webSocketSession.getId());
      }).log();
      return output;
   }

   private Mono<Void> getInputMessageMono(WebSocketSession webSocketSession)
   {
      Mono<Void> input = webSocketSession.receive().doOnSubscribe(subscribe ->
      {
         log.info("onesecpolling session created sessionId:{}:userId:{}", webSocketSession.getId(), getUserPollingSessionId(webSocketSession));
      }).then();
      return input;
   }

   private String getOneSecPollingTopicRecord(ReceiverRecord<Integer, String> record)
   {
      String lastRecord = record.value();
      record.receiverOffset().acknowledge();
      return lastRecord;
   }

   private String getUserPollingSessionId(WebSocketSession webSocketSession)
   {
      UriTemplate template = new UriTemplate(WEB_SOCKET_ENDPOINT);
      URI uri = webSocketSession.getHandshakeInfo().getUri();
      Map<String, String> parameters = template.match(uri.getPath());
      String userPollingSessionId = parameters.get(WEB_SOCKET_ENDPOINT_USERID_SUBPATH);
      return userPollingSessionId;
   }

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 pfulara