'How to improve message processing faster in spring boot and MQ app?

Got a small spring-boot messaging app that receives a message from the queue and inserts/updates a row in DB2 table. Noticed this week it received lot of messages but consumption was very slow that msgs filling the disk (infra complained about it). How could we improve the reading of the messages from the queue faster?

JMS Config

...
    ...
    @Bean
    public MQXAConnectionFactory mqxaQueueConnectionFactory() {
        MQXAConnectionFactory mqxaConnectionFactory = new MQXAConnectionFactory();
        log.info("Host: {}", host);
        log.info("Port: {}", port);
        log.info("Channel: {}", channel);
        log.info("Timeout: {}", receiveTimeout);
        try {
            mqxaConnectionFactory.setHostName(host);
            mqxaConnectionFactory.setPort(port);
            mqxaConnectionFactory.setQueueManager(queueManager);
            if (channel != null && !channel.trim().isEmpty()) {
                mqxaConnectionFactory.setChannel(channel);
            }
            mqxaConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
        return mqxaConnectionFactory;
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory(MQXAConnectionFactory mqxaConnectionFactory) {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setTargetConnectionFactory(mqxaConnectionFactory);
        cachingConnectionFactory.setSessionCacheSize(this.sessionCacheSize);
        cachingConnectionFactory.setCacheConsumers(this.cacheConsumers);
        cachingConnectionFactory.setReconnectOnException(true);
        return cachingConnectionFactory;
    }

    @Bean
    @Primary
    public SingleConnectionFactory singleConnectionFactory(MQXAConnectionFactory mqxaConnectionFactory) {
        SingleConnectionFactory singleConnectionFactory = new SingleConnectionFactory(mqxaConnectionFactory);
        singleConnectionFactory.setTargetConnectionFactory(mqxaConnectionFactory);
        singleConnectionFactory.setReconnectOnException(true);
        return singleConnectionFactory;
    }

    @Bean
    public PlatformTransactionManager platformTransactionManager(CachingConnectionFactory cachingConnectionFactory) {
        return new JmsTransactionManager(cachingConnectionFactory);
    }

    @Bean
    public JmsOperations jmsOperations(CachingConnectionFactory cachingConnectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        jmsTemplate.setReceiveTimeout(receiveTimeout);
        return jmsTemplate;
    }

    @Bean
    public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory(PlatformTransactionManager transactionManager,
              @Qualifier("singleConnectionFactory") SingleConnectionFactory singleConnectionFactory) {
        EnhancedJmsListenerContainerFactory factory = new EnhancedJmsListenerContainerFactory();
        factory.setConnectionFactory(singleConnectionFactory);
        factory.setTransactionManager(transactionManager);
        factory.setConcurrency(concurrency);
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setSessionTransacted(true);
        factory.setMaxMessagesPerTask(this.messagesPerTask);
        factory.setIdleTaskExecutionLimit(this.idleTaskExecutionLimit);
        return factory;
    }
    ...
    ...

We moved from CachingConnectionFactory to SingleConnectionFactory since it opened too many queue connections.

Queue Listener

@Transactional
@Slf4j
@Component
@Profile("!thdtest")
public class QueueListener {
    private CatalogStatusDataProcessor catalogStatusDataProcessor;
    private HostToDestDataAckProcessor hostToDestDataAckProcessor;

    @Autowired
    public QueueListener(CatalogStatusDataProcessor catalogStatusDataProcessor, HostToDestDataAckProcessor hostToDestDataAckProcessor) {
        this.catalogStatusDataProcessor = catalogStatusDataProcessor;
        this.hostToDestDataAckProcessor = hostToDestDataAckProcessor;
    }

    @JmsListener(destination = "${project.mq.queue}", containerFactory = "defaultJmsListenerContainerFactory")
    public void onMessage(MAOMessage message) throws Exception {

        String messageString = message.getStringMessagePayload();
        try {
            if (log.isDebugEnabled())
                log.debug("Message received = "+messageString);
            StopWatch sw = new StopWatch("Received message");
            sw.start();
            Object obj= XMLGenerator.generateTOfromXML(messageString);
            if(obj instanceof ResponseTO){
                catalogStatusDataProcessor.processCatalogStatusInfo((ResponseTO)obj);
            }
            else if(obj instanceof HostToStoreDataAckWrapper){
                hostToDestDataAckProcessor.processHostToDestDataACK(messageString);
            }
            sw.stop();
            log.info("Message is processed in = "+sw.getTotalTimeSeconds() +" seconds");
        } catch (Exception e) {
            log.error("Exception in processing message: {}", e);
            throw e;
        }
    }
}

Tried changing the concurrency settings from 2-4 to 4-6 and it didn't improve much. Using spring-boot 1.5.4.RELEASE, JdK8, javax.jms 2.0.1, MQ allclient 9.0



Solution 1:[1]

How do you know this is an MQ problem, In my experience (MQ performance) most performance problems were due to the other processing for example MQGET(fast) database update(slow).. commit.

I would put some code around the various components, and time them. For example

Get time1 MQGET Get time2

calculate time2 - time 1 if delta > 10 ms report this ( or add it to a global counter)

do database work get time 3 calculate time 3 - time 2

If delta > 10 ms report this (or add it to a global counter).

If this does not report any problems, drop the time from 10ms to 2 ms.

If this does not report any problems, try adding additional instances of your program, as it could be that work is coming in faster than one thread can process it.

I've also seen reducing threads helped! When they did a database insert/update this caused contention and threads were all waiting on the thread holding the lock- you would see this from the long database times.

As a first step you could turn on some MQ traces to report how long the MQ calls take.... but adding the code I mentioned is good practice - especially if they problems is else where.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 colin paice