Send 35000 JMS messages per minute
We have a Spring Boot application that load-tests another component. We need to send up to 35000 JMS messages per minute, so I am using a scheduler that runs a task once a minute.
When I keep the intensity low, it manages to send the messages within the specified time interval (one minute). When the intensity is high, it takes more than one minute to send the chunk of messages. Any suggestions on the implementation below?
Scheduler class
@Component
public class MessageScheduler {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(16);
    private final static int TIME_PERIOD = ConfigFactory.getConfig().getInt("messages.period").orElse(60000);
    @Autowired
    JmsSender sender;
    public void startScheduler() {
        Runnable runnableTask = sender::sendMessagesChunk;
        executorService.scheduleAtFixedRate(runnableTask, 0, TIME_PERIOD,
                TimeUnit.MILLISECONDS);
    }
}
Class for sending the messages
@Component
public class JmsSender {
    @Autowired
    TrackingManager manager;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final static int TOTAL_MESSAGES = ConfigFactory.getConfig().getInt("total.tracking.messages").orElse(10);
    private final static int TIME_PERIOD = ConfigFactory.getConfig().getInt("messages.period").orElse(60000);
    private static int failedPerPeriod=0;
    private static int totalFailed=0;
    private static int totalMessageCounter=0;
    public void sendMessagesChunk() {
        log.info("Started at: {}", Instant.now());
        log.info("Sending messages with intensity {} messages/minute", TOTAL_MESSAGES);
        for (int i=0; i<TOTAL_MESSAGES; i++) {
            try {
                long start = System.currentTimeMillis();
                MessageDTO msg = manager.createMessage();
                send(msg);
                long stop = System.currentTimeMillis();
                if (timeOfDelay(stop-start)>=0L) {
                    Thread.sleep(timeOfDelay(stop-start));
                }
            } catch (Exception e) {
                log.info("Error : " + e.getMessage());
                failedPerPeriod++;
            }
        }
        totalMessageCounter += TOTAL_MESSAGES;
        totalFailed += failedPerPeriod;
        log.info("Finished at: {}", Instant.now());
        log.info("Success rate(of last minute): {} %, Succeeded: {}, Failed: {}, Success rate(in total): {} %, Succeeded: {}, Failed: {}"
                ,getSuccessRatePerPeriod(), getSuccededPerPeriod(), failedPerPeriod,
                getTotalSuccessRate(), getTotalSucceded(), totalFailed);
        failedPerPeriod =0;
    }
    private long timeOfDelay(Long elapsedTime){
        return (TIME_PERIOD / TOTAL_MESSAGES) - elapsedTime;
    }
    private int getSuccededPerPeriod(){
        return TOTAL_MESSAGES - failedPerPeriod;
    }
    private int getTotalSucceded(){
        return totalMessageCounter - totalFailed;
    }
    private double getSuccessRatePerPeriod(){
        return getSuccededPerPeriod()*100D / TOTAL_MESSAGES;
    }
    private double getTotalSuccessRate(){
        return getTotalSucceded()*100D / totalMessageCounter;
    }
    private void send(MessageDTO messageDTO) throws Exception {
        requestContextInitializator();
        JmsClient client = JmsClientBuilder.newClient(UriScheme.JmsType.AMQ);
        client.target(new URI("activemq:queue:" + messageDTO.getDestination()))
                .msgTypeVersion(messageDTO.getMsgType(), messageDTO.getVersion())
                .header(Header.MSG_VERSION, messageDTO.getVersion())
                .header(Header.MSG_TYPE, messageDTO.getMsgType())
                .header(Header.TRACKING_ID, UUID.randomUUID().toString())
                .header(Header.CLIENT_ID, "TrackingJmsClient")
                .post(messageDTO.getPayload());
    }
}
Solution 1:[1]
You need to solve two problems:
- The total time of the send operations must stay under the maximum time.
- Messages should not be sent as fast as possible; they should be spread uniformly over the available time.
Obviously, if your send method is too slow, the maximum time will be exceeded.
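To make "too slow" concrete, the per-message time budget can be worked out from the numbers in the question. The following is only an illustrative sketch: the names `SendBudget`, `budgetMillisPerMessage`, and `fitsInPeriod` are made up for this example, and the average send times passed in are assumed values, not measurements from the question.

```java
/** Illustrative helper (hypothetical names) for checking whether one thread can keep up. */
final class SendBudget {

    /** Time available per message, in milliseconds, keeping the fractional part. */
    static double budgetMillisPerMessage(int totalMessages, long periodMillis) {
        return (double) periodMillis / totalMessages;
    }

    /** True if sending sequentially at the given average send time fits in the period. */
    static boolean fitsInPeriod(double avgSendMillis, int totalMessages, long periodMillis) {
        return avgSendMillis * totalMessages <= periodMillis;
    }

    public static void main(String[] args) {
        int total = 35_000;
        long period = 60_000L;
        System.out.println("budget per message (ms): " + budgetMillisPerMessage(total, period));
        // Assumed average send times, for illustration only.
        System.out.println("1.0 ms per send fits: " + fitsInPeriod(1.0, total, period));
        System.out.println("2.0 ms per send fits: " + fitsInPeriod(2.0, total, period));
        // Integer division, as used in the question's timeOfDelay, drops the fraction.
        System.out.println("integer budget (ms): " + (60_000 / 35_000));
    }
}
```

At 35000 messages per minute the budget is about 1.71 ms per message, so a send path that averages 2 ms cannot finish in time on a single thread, however the sleeps are arranged.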
The fastest way to send messages is some sort of bulk operation. Whether or not your MQ API supports bulk operations, though, you cannot use them here because of the second requirement ("uniformly").
You can send messages asynchronously, but if your MQ API creates threads for that instead of using non-blocking I/O, you could run into memory problems.
With JMS 2.0 you can send asynchronously by passing a CompletionListener to javax.jms.MessageProducer.send, but if the provider starts a new thread for each message, this costs a lot of memory and threads.
Another speedup is to create the JMS client only once and reuse it, instead of building a new one on every call to your send method.
To meet the second requirement, you should fix your timeOfDelay function, which is wrong: TIME_PERIOD / TOTAL_MESSAGES is an integer division (with 35000 messages, 60000 / 35000 gives 1, not 1.71), and a send that overruns its slot is never compensated for. Ideally you would take the probability distribution of the send time into account to estimate the proper value, but you can simply do:
long chunkStart = System.currentTimeMillis();
for (int i = 0; i < TOTAL_MESSAGES; i++) {
    try {
        MessageDTO msg = manager.createMessage();
        send(msg);
        // Time used so far in this period, including earlier sleeps.
        long elapsed = System.currentTimeMillis() - chunkStart;
        int remaining = TOTAL_MESSAGES - i - 1;
        // Spread the time that is left evenly over the messages still to send.
        if (remaining > 0 && elapsed < TIME_PERIOD)
            Thread.sleep((TIME_PERIOD - elapsed) / remaining);
    } catch (Exception e) {
        log.info("Error : " + e.getMessage());
        failedPerPeriod++;
    }
}
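Another way to spread the messages, offered here as an assumption rather than as part of the original answer, is to give message i an absolute due time of start + i * period / total and to sleep only until that time. Because every due time is computed from the start of the chunk, a slow send shortens the following sleeps instead of pushing the whole schedule back. The names `PacedLoop`, `targetOffsetNanos`, and `run` are hypothetical, and the `Runnable` stands in for the call to `send(msg)`.

```java
import java.util.concurrent.TimeUnit;

/** Sketch of deadline-based pacing; all names here are illustrative. */
final class PacedLoop {

    /** Offset from the chunk start at which message i (0-based) is due. */
    static long targetOffsetNanos(int i, int totalMessages, long periodNanos) {
        // Multiply before dividing so the fractional part of period / total is not lost.
        return (long) i * periodNanos / totalMessages;
    }

    /**
     * Runs sendAction totalMessages times, spread evenly over periodNanos.
     * Returns the number of actions executed (fewer than requested only if interrupted).
     */
    static int run(int totalMessages, long periodNanos, Runnable sendAction) {
        long start = System.nanoTime();
        int sent = 0;
        for (int i = 0; i < totalMessages; i++) {
            long dueAt = start + targetOffsetNanos(i, totalMessages, periodNanos);
            long waitNanos = dueAt - System.nanoTime();
            if (waitNanos > 0) {
                // Ahead of schedule: wait for this message's slot.
                try {
                    TimeUnit.NANOSECONDS.sleep(waitNanos);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return sent;
                }
            }
            // On time or behind schedule: send without further delay.
            sendAction.run();
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // 20 dummy "sends" spread over 200 ms.
        int sent = run(20, TimeUnit.MILLISECONDS.toNanos(200), () -> { });
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("sent " + sent + " in " + elapsedMs + " ms");
    }
}
```

This keeps the loop on schedule only while the average send time stays within the per-message budget; if sends are slower than that, no pacing scheme on a single thread can finish within the period.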
Solution 2:[2]
35000 msg/min is a notch below 600 msg/sec. That is not considered "a lot" and should be a relatively easy goal to clear. The primary idea is to reuse all heavyweight JMS objects: connection, session, and destination. A single thread should be enough.
ConnectionFactory connFactory = .... // initialize connection factory
// @Cleanup is Lombok's annotation; it closes the resource at the end of the scope.
@Cleanup Connection conn = connFactory.createConnection();
@Cleanup Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
Queue q = session.createQueue("example_destination");
@Cleanup MessageProducer producer = session.createProducer(q);
for (String payload : messagesToSend) {
    TextMessage message = session.createTextMessage(payload);
    producer.send(message);
    session.commit();
}
Additional speedups are possible by:
- committing every n-th message
- using faster acknowledge modes
- using non-persistent messages
- using a destination object created outside the session
- sending messages asynchronously
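The first item, committing every n-th message, could look roughly like the following. To keep the sketch runnable without a broker, it uses a hypothetical `TransactedSink` interface in place of a transacted JMS `Session` and its `MessageProducer`; in real code `send` would map to `producer.send(message)` and `commit` to `session.commit()`. The class name `BatchCommitSender` and the batch size are likewise assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a transacted JMS session plus its producer. */
interface TransactedSink {
    void send(String payload);
    void commit();
}

final class BatchCommitSender {

    /** Sends all payloads, committing after every batchSize messages; returns the commit count. */
    static int sendAll(List<String> payloads, int batchSize, TransactedSink sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int commits = 0;
        int uncommitted = 0;
        for (String payload : payloads) {
            sink.send(payload);
            uncommitted++;
            if (uncommitted == batchSize) {
                sink.commit();
                commits++;
                uncommitted = 0;
            }
        }
        if (uncommitted > 0) {
            sink.commit(); // commit the final partial batch
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        TransactedSink logging = new TransactedSink() {
            @Override public void send(String payload) { System.out.println("send " + payload); }
            @Override public void commit() { System.out.println("commit"); }
        };
        int commits = sendAll(Arrays.asList("a", "b", "c", "d", "e"), 2, logging);
        System.out.println("commits: " + commits);
    }
}
```

A larger batch means fewer commit round trips to the broker, but more messages are rolled back and have to be resent if a commit fails.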
Example of NON_PERSISTENT, DUPS_OK_ACKNOWLEDGE, asynchronous delivery:
@Cleanup Connection conn = connFactory.createConnection();
@Cleanup Session session = conn.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue q = session.createQueue("example_destination");
@Cleanup MessageProducer producer = session.createProducer(q);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// ExampleSendListener is a user-written javax.jms.CompletionListener (requires a JMS 2.0 provider).
CompletionListener listener = new ExampleSendListener();
for (String payload : messagesToSend) {
    TextMessage message = session.createTextMessage(payload);
    // Asynchronous send: returns without waiting for the broker to confirm the message.
    producer.send(message, listener);
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | josejuan |
| Solution 2 | |
