GCP PubSub Java client's Publisher: concurrency not working in a multi-threaded environment
I am trying to consume messages from Kafka and publish them to Google PubSub. We have 4 concurrent Kafka consumer threads, and I injected the Google PubSub client's Publisher into the consumer. But the Publisher does not publish concurrently and behaves as if it were single-threaded.
With the same Publisher settings, if I run a program outside the Kafka consumer, I am able to publish more than 3000 messages per second.
Here is the Publisher bean injected into the Kafka consumer. I am using the default batchSettings and setting the number of executor threads to 10. I tried different thread counts, but the Kafka thread seems to force the Publisher to use just one thread.
Publisher bean code:
ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(10)
.build();
Publisher publisher = Publisher.newBuilder(TopicName.of(projectId, topicId))
.setExecutorProvider(executorProvider)
.build();
My Kafka Consumer:
public class MyKafkaListenerForPubSub {
private final Publisher publisher;
public MyKafkaListenerForPubSub(Publisher publisher) {
this.publisher = publisher;
}
@KafkaListener(topics = "topicName", containerFactory = "kafkaListenerContainerFactory")
public void processMessage(String content) throws ExecutionException, InterruptedException {
ByteString messageData = ByteString.copyFromUtf8(content);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(messageData).build();
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
}
}
I tried with my own executor service, but it behaved the same way.
I am using the Google PubSub library below in a non-Spring Boot app that uses Spring Core.
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<version>1.116.3</version>
</dependency>
Here is a simple program that reproduces the issue. When the Publisher is used from tasks running in an executor service, it is not able to push the messages.
import com.google.api.core.ApiFuture;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayInputStream;
import java.util.Base64;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
public class GCPPublishExample {
private static String b64Val = "GCP CREDS BASE 64 value";
private static String bigMessage = "Some Big Message of size 30 KB";
private static String projectId = "gcp project Id";
private static String topicId = "some-topic";
private static void addShutdownHook(){
Runtime.getRuntime().addShutdownHook(new Thread()
{
public void run()
{
System.out.println("Program Completed at: " + new Date());
}
});
}
public static void main(String[] args) throws Exception {
addShutdownHook();
System.out.println("Program Started at: " + new Date());
final boolean parallel = true; //false to test without threads
final int totalRecordsToSend = 100_000; // Increase this number if needed
final int executorSize = 4; //As we have 4 kafka concurrent consumers
System.out.println("parallel: " + parallel + " totalRecordsToSend: " + totalRecordsToSend + " executorSize: " + executorSize);
GoogleCredentials creds = GoogleCredentials.fromStream(new ByteArrayInputStream(Base64.getDecoder().decode(b64Val)));
//Using Default batchingSettings and Default executorProvider
final Publisher publisher = Publisher.newBuilder(TopicName.of(projectId, topicId))
.setCredentialsProvider(FixedCredentialsProvider.create(creds))
.build();
if(parallel) {
System.out.println("= PUBLISH IN PARALLEL =");
ExecutorService executorService = Executors.newFixedThreadPool(executorSize);
IntStream.range(0, totalRecordsToSend).
forEach(i -> {executorService.execute(new MyGCPPublishTask(publisher, bigMessage));});
executorService.shutdown();
}
else {
System.out.println("= RUNNING IN SEQUENCE =");
IntStream.range(0, totalRecordsToSend).
forEach(i -> {publishMessage(publisher, bigMessage);});
}
}
private static void publishMessage(Publisher publisher, String message) {
try {
ByteString messageData = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(messageData).build();
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
//System.out.println("Message sent successfully, messageId: {}" + messageIdFuture.get() + " Thread: " + Thread.currentThread().getId());
} catch (Exception e) {
e.printStackTrace();
}
}
}
class MyGCPPublishTask implements Runnable {
private final Publisher publisher;
private final String message;
public MyGCPPublishTask(Publisher publisher, String message) {
this.publisher = publisher;
this.message = message;
}
@Override
public void run() {
try {
ByteString messageData = ByteString.copyFromUtf8(this.message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(messageData).build();
ApiFuture<String> messageIdFuture = this.publisher.publish(pubsubMessage);
//System.out.println("Message sent successfully, messageId: " + messageIdFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
}
}
Solution 1:[1]
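I had a debug statement that logged the messageId returned by GCP. It caused the performance problem because messageIdFuture.get() blocks the calling thread until GCP responds, so each thread could only publish one message at a time. The argument is evaluated even when debug logging is disabled. After I commented out the statement below, it worked fine.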
LOGGER.debug("Message sent successfully, messageId: {}", messageIdFuture.get());
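The effect described in this answer can be illustrated without a GCP account. The following is a minimal sketch, not the PubSub client itself: it assumes java.util.concurrent.CompletableFuture as a stand-in for ApiFuture, and fakePublish, publishBlocking, and publishAsync are hypothetical names invented for the illustration. It compares calling get() after every publish with attaching a callback and waiting once at the end. In the real client, the analogous non-blocking approach would likely be ApiFutures.addCallback on the returned ApiFuture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class BlockingVsAsyncDemo {

    // Hypothetical stand-in for Publisher.publish(): the returned future
    // completes with a fake message id after a simulated network round trip.
    static CompletableFuture<String> fakePublish(ScheduledExecutorService scheduler, int i, long latencyMs) {
        CompletableFuture<String> future = new CompletableFuture<>();
        scheduler.schedule(() -> { future.complete("id-" + i); }, latencyMs, TimeUnit.MILLISECONDS);
        return future;
    }

    // Mirrors the problematic debug statement: get() is called after every
    // publish, so the caller waits one full round trip per message.
    static long publishBlocking(int count, long latencyMs) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        long start = System.nanoTime();
        try {
            for (int i = 0; i < count; i++) {
                fakePublish(scheduler, i, latencyMs).get();
            }
        } finally {
            scheduler.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Non-blocking variant: a callback handles each result, and the caller
    // waits only once, after all messages have been handed off.
    static long publishAsync(int count, long latencyMs) throws Exception {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        long start = System.nanoTime();
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                CompletableFuture<String> future = fakePublish(scheduler, i, latencyMs);
                future.whenComplete((messageId, error) -> {
                    if (error != null) {
                        error.printStackTrace();
                    }
                });
                futures.add(future);
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        } finally {
            scheduler.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("blocking elapsed ms: " + publishBlocking(10, 20));
        System.out.println("async elapsed ms: " + publishAsync(10, 20));
    }
}
```

In the blocking variant the elapsed time grows with count times latency, while the callback variant overlaps the waits. The same reasoning suggests why removing messageIdFuture.get() from the listener restored throughput.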
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | jana reddy |