GCP PubSub Java client's Publisher: concurrency not working in a multi-threaded environment

I am trying to consume messages from Kafka and publish them to Google Pub/Sub. We have 4 concurrent Kafka consumer threads, and I injected the Google Pub/Sub client's Publisher into the consumer. However, the Publisher does not work concurrently: publishing behaves as if it were single-threaded.

With the same Publisher settings, a program running outside the Kafka consumer is able to publish more than 3000 messages per second.

Here is the Publisher bean injected into the Kafka consumer. I am using the default batching settings and setting the executor thread count to 10. I tried different thread counts, but the Kafka consumer threads seem to force the Publisher to use just one thread.

Publisher bean code:


    ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
                .setExecutorThreadCount(10)
                .build();
    
    Publisher publisher = Publisher.newBuilder(TopicName.of(projectId, topicId))
                .setExecutorProvider(executorProvider)
                .build();
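For context on what the default batching settings control: the Publisher buffers messages and sends them in batches, flushing when a BatchingSettings threshold (element count, request bytes, or delay) is reached. The following is a hypothetical, stdlib-only toy (ToyBatcher is not part of the client library) that models only the element-count and byte thresholds, to show how message size changes which threshold triggers a flush. The threshold values are illustrative assumptions, not the library's defaults.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of size-based batching; NOT the google-cloud-pubsub implementation.
public class ToyBatcher {
    private final long elementCountThreshold;
    private final long byteThreshold;
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private int batchesSent = 0;

    public ToyBatcher(long elementCountThreshold, long byteThreshold) {
        this.elementCountThreshold = elementCountThreshold;
        this.byteThreshold = byteThreshold;
    }

    // Buffers a message and flushes as soon as either threshold is reached.
    public synchronized void add(byte[] message) {
        pending.add(message);
        pendingBytes += message.length;
        if (pending.size() >= elementCountThreshold || pendingBytes >= byteThreshold) {
            flush();
        }
    }

    // Stands in for one publish request carrying all pending messages.
    public synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        batchesSent++;
        pending.clear();
        pendingBytes = 0;
    }

    public synchronized int batchesSent() {
        return batchesSent;
    }

    public static void main(String[] args) {
        ToyBatcher smallMessages = new ToyBatcher(100, 1_000_000);
        ToyBatcher bigMessages = new ToyBatcher(100, 1_000_000);
        for (int i = 0; i < 1_000; i++) {
            smallMessages.add(new byte[100]);    // the count threshold is reached first
            bigMessages.add(new byte[30_000]);   // the byte threshold is reached first (34 messages)
        }
        // Flush any remainder, similar in spirit to what Publisher.shutdown() does
        smallMessages.flush();
        bigMessages.flush();
        System.out.println("100-byte messages: " + smallMessages.batchesSent() + " batches");   // 10
        System.out.println("30,000-byte messages: " + bigMessages.batchesSent() + " batches"); // 30
    }
}
```

With messages around 30 KB, as in the repro program below, the byte threshold rather than the element count decides how many messages go into each request in this toy model.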

My Kafka Consumer:

    public class MyKafkaListenerForPubSub {

        private final Publisher publisher;

        public MyKafkaListenerForPubSub(Publisher publisher) {
            this.publisher = publisher;
        }

        @KafkaListener(topics = "topicName", containerFactory = "kafkaListenerContainerFactory")
        public void processMessage(String content) throws ExecutionException, InterruptedException {
            ByteString messageData = ByteString.copyFromUtf8(content);
            PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(messageData).build();
            ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        }
    }
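Since publish() only enqueues the message and returns an ApiFuture, the listener does not need to wait on it. If the message ID is needed (for logging or error handling), the usual approach with the real client is to register a callback, for example with ApiFutures.addCallback(future, callback, executor), instead of calling get(). The sketch below shows the same pattern in a form that runs without the Pub/Sub dependency: CompletableFuture stands in for ApiFuture, and fakePublish is a hypothetical method that simulates the server round trip with an assumed 200 ms delay.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallbackListenerSketch {
    // Hypothetical stand-in for the Publisher's background threads (daemon, so the JVM can exit).
    private static final ScheduledExecutorService BACKGROUND =
            Executors.newScheduledThreadPool(2, r -> {
                Thread t = new Thread(r, "fake-publisher");
                t.setDaemon(true);
                return t;
            });
    private static final AtomicInteger NEXT_ID = new AtomicInteger();

    // Hypothetical publish: returns immediately and completes ~200 ms later with a fake message ID.
    static CompletableFuture<String> fakePublish(String content) {
        CompletableFuture<String> future = new CompletableFuture<>();
        BACKGROUND.schedule(() -> future.complete("id-" + NEXT_ID.incrementAndGet()),
                200, TimeUnit.MILLISECONDS);
        return future;
    }

    // Listener body: publish, attach a callback, and return without waiting.
    // The future is returned only so the demo can wait; a real listener would ignore it.
    static CompletableFuture<String> processMessage(String content) {
        return fakePublish(content).whenComplete((messageId, error) -> {
            if (error != null) {
                System.err.println("Publish failed for " + content + ": " + error);
            } else {
                System.out.println("Published " + content + " as " + messageId);
            }
        });
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            inFlight.add(processMessage("msg-" + i));
        }
        System.out.println("listener calls returned");
        inFlight.forEach(CompletableFuture::join); // demo only: wait so the callback output appears
    }
}
```

The callback runs on one of the background threads, so anything it does should be cheap; with the real client, the executor passed to ApiFutures.addCallback decides where it runs.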

I also tried with my own executor service, but the behavior was the same.

I am using the Google Pub/Sub library below in a non-Spring Boot application that uses Spring Core.

<dependency>
   <groupId>com.google.cloud</groupId>
   <artifactId>google-cloud-pubsub</artifactId>
   <version>1.116.3</version>
</dependency>

Here is a simple program that reproduces the issue. When the Publisher is used from tasks submitted to an executor service, it is not able to push the messages.


import com.google.api.core.ApiFuture;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

import java.io.ByteArrayInputStream;
import java.util.Base64;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class GCPPublishExample {
  private static String b64Val = "GCP CREDS BASE 64 value";
  private static String bigMessage = "Some Big Message of size 30 KB";
  private static String projectId = "gcp project Id";
  private static String topicId = "some-topic";

  private static void addShutdownHook(){
    Runtime.getRuntime().addShutdownHook(new Thread()
    {
      public void run()
      {
        System.out.println("Program Completed at: " + new Date());
      }
    });
  }

  public static void main(String[] args) throws Exception {
    addShutdownHook();
    System.out.println("Program Started at: " + new Date());

    final boolean parallel = true;  // false to test without threads
    final int totalRecordsToSend = 100_000; // increase for a bigger load test
    final int executorSize = 4; // as we have 4 concurrent Kafka consumers

    System.out.println("parallel: " + parallel + " totalRecordsToSend: " + totalRecordsToSend + " executorSize: " + executorSize);

    GoogleCredentials creds = GoogleCredentials.fromStream(new ByteArrayInputStream(Base64.getDecoder().decode(b64Val)));

    // Using default batchingSettings and default executorProvider
    final Publisher publisher = Publisher.newBuilder(TopicName.of(projectId, topicId))
            .setCredentialsProvider(FixedCredentialsProvider.create(creds))
            .build();

    if (parallel) {
      System.out.println("= PUBLISH IN PARALLEL =");
      ExecutorService executorService = Executors.newFixedThreadPool(executorSize);

      IntStream.range(0, totalRecordsToSend)
              .forEach(i -> executorService.execute(new MyGCPPublishTask(publisher, bigMessage)));

      // Stop accepting new tasks and wait for the submitted ones to finish
      executorService.shutdown();
      executorService.awaitTermination(1, TimeUnit.HOURS);
    }
    else {
      System.out.println("= RUNNING IN SEQUENCE =");
      IntStream.range(0, totalRecordsToSend)
              .forEach(i -> publishMessage(publisher, bigMessage));
    }

    // Send any messages still buffered in the Publisher before main returns
    publisher.shutdown();
    publisher.awaitTermination(1, TimeUnit.MINUTES);
  }

  private static void publishMessage(Publisher publisher, String message) {
    try {
      ByteString messageData = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(messageData).build();
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      // Note: messageIdFuture.get() blocks this thread until Pub/Sub responds
      //System.out.println("Message sent successfully, messageId: " + messageIdFuture.get() + " Thread: " + Thread.currentThread().getId());
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

class MyGCPPublishTask implements Runnable {
  private final Publisher publisher;
  private final String message;

  public MyGCPPublishTask(Publisher publisher, String message) {
    this.publisher = publisher;
    this.message = message;
  }

  @Override
  public void run() {
    try {
      ByteString messageData = ByteString.copyFromUtf8(this.message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(messageData).build();
      ApiFuture<String> messageIdFuture = this.publisher.publish(pubsubMessage);
      //System.out.println("Message sent successfully, messageId: " + messageIdFuture.get());
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
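In the repro, messageIdFuture is discarded, so main never learns whether the messages were actually accepted. One way to check without blocking on each message is to keep the futures and wait once at the end; with the real client, ApiFutures.allAsList(futures).get() does this. The runnable sketch below uses CompletableFuture.allOf as a stand-in, and fakePublish is a hypothetical placeholder for publisher.publish(...). Keeping every future in memory is only reasonable for a bounded test run like this one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WaitAtEndSketch {
    // Hypothetical stand-in for publisher.publish(...): completes asynchronously on a pool.
    static CompletableFuture<String> fakePublish(ExecutorService background, int i) {
        return CompletableFuture.supplyAsync(() -> "id-" + i, background);
    }

    // Publishes `count` messages without waiting on each one, then waits once for all of them.
    static List<String> publishAll(int count) {
        ExecutorService background = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                futures.add(fakePublish(background, i));
            }
            // Single wait point, comparable to ApiFutures.allAsList(futures).get();
            // join() throws CompletionException if any publish failed.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
            List<String> messageIds = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                messageIds.add(future.join()); // already complete, so this returns immediately
            }
            return messageIds;
        } finally {
            // Lets the pool threads exit; already-submitted tasks still run to completion
            background.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> messageIds = publishAll(1_000);
        System.out.println("confirmed " + messageIds.size() + " messages"); // confirmed 1000 messages
    }
}
```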


Solution 1:[1]

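I had a debug statement that logged the messageId returned by GCP, and that caused the performance problem: messageIdFuture.get() blocks the calling thread until Pub/Sub responds, so each Kafka consumer thread waited for every publish to complete before processing the next record. After I commented it out, it worked fine. Note that Java evaluates messageIdFuture.get() before LOGGER.debug is even called, so the statement blocks even when debug logging is disabled.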

LOGGER.debug("Message sent successfully, messageId: {}", messageIdFuture.get());
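The effect described in this answer can be reproduced without Pub/Sub. In the sketch below, fakePublish is a hypothetical stand-in that completes after an assumed 20 ms round trip, and four threads play the role of the four Kafka consumers. When each thread calls join() (the CompletableFuture equivalent of messageIdFuture.get()) after every publish, each consumer can publish at most one message per round trip; when the futures are only collected and awaited once at the end, the round trips overlap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class BlockingGetSketch {
    static final long SIMULATED_LATENCY_MS = 20; // assumed publish round trip, not a measured value

    // Hypothetical stand-in for the Publisher's background threads (daemon, so the JVM can exit).
    static final ScheduledExecutorService BACKGROUND = Executors.newScheduledThreadPool(2, r -> {
        Thread t = new Thread(r, "fake-publisher");
        t.setDaemon(true);
        return t;
    });

    // Hypothetical stand-in for publisher.publish(): returns at once, completes after the latency.
    static CompletableFuture<String> fakePublish(int i) {
        CompletableFuture<String> future = new CompletableFuture<>();
        BACKGROUND.schedule(() -> future.complete("id-" + i),
                SIMULATED_LATENCY_MS, TimeUnit.MILLISECONDS);
        return future;
    }

    // Runs `consumers` threads that each publish `perConsumer` messages; returns elapsed milliseconds.
    static long run(int consumers, int perConsumer, boolean blockOnEachPublish) {
        ExecutorService consumerPool = Executors.newFixedThreadPool(consumers);
        Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();
        long start = System.nanoTime();
        List<CompletableFuture<Void>> consumerLoops = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            consumerLoops.add(CompletableFuture.runAsync(() -> {
                for (int i = 0; i < perConsumer; i++) {
                    CompletableFuture<String> future = fakePublish(i);
                    if (blockOnEachPublish) {
                        future.join(); // like messageIdFuture.get(): this consumer thread waits here
                    } else {
                        pending.add(future); // keep going; confirmations are checked later
                    }
                }
            }, consumerPool));
        }
        consumerLoops.forEach(CompletableFuture::join);
        pending.forEach(CompletableFuture::join); // non-blocking mode: wait once for all confirmations
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        consumerPool.shutdown();
        return elapsedMs;
    }

    public static void main(String[] args) {
        long nonBlocking = run(4, 25, false);
        long blocking = run(4, 25, true);
        System.out.println("wait once at the end:       " + nonBlocking + " ms");
        System.out.println("blocking get() per message: " + blocking + " ms (at least 25 x 20 ms)");
    }
}
```

Adding more Publisher executor threads does not help the blocking variant in this model, because the limit is the consumer thread waiting, not the number of threads doing the sending.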

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 jana reddy