'"Consumer Closed" when acknowledging message with JmsTemplate receive method

I have an activemq queue where messages are being sent to and the application uses Spring JmsTemplate receiveSelected(selector) to receive the messages synchronously. Message is processed before it is acknowledged. If the broker or the application shuts down before acknowledging the message while it is being processed the message needs to be resent or redelivered without getting lost. My understanding is with client_acknowledgement messages gets resent if not acknowledged also.

Configuration

<bean id="jmsConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="broker" value="tcp://localhost:61616" />
    <property name="redeliveryPolicy" ref="redeliveryPolicy" />
</bean>

Redelivery Policy:

<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="initialRedeliveryDelay" value="1000" />
    <property name="redeliveryDelay" value="10000" />
    <property name="maximumRedeliveries" value="2" />
    <property name="useExponentialBackOff" value="true" />
    <property name="backOffMultiplier" value="5" />
</bean>

JmsTemplate:

<bean id="jmstemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
        <bean class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="jmsConnection" />
        </bean>
    </property>
    <property name="receiveTimeout" value="5000"/>
    <property name="defaultDestinationName" value="messageQueue"/>
    <property name="explicitQosEnabled" value="true"/>
    <property name="sessionAcknowledgeMode" value="2"/>
    <property name="sessionTransacted" value="false"/>
</bean>

Session call back to receive message:

Object getProcessedMessageObject {
   return jmsTemplate.execute(session -> {
      @Override
      public Object doInJms (Session session) throws JMSException {
         Object tmp = null;
         MapMessage msg = (MapMessage) jmsTemplate.receiveSelected(selector);
         try {
            if (msg != null) {
               MapMessage receivedMsg = msg;
               tmp = processMsg(receivedMsg) if (tmp != null) {
                  msg.acknowledge();
               }
            }
         } catch (JMSException) {
            throw new RuntimeException();
         } return tmp;
      }
   });
}

I am getting "Consumer is closed" when msg.acknowledge() is called. When I stop and restart my application the messages are not redelivered as they are not acknowledged. I'm trying to understand what I am missing and how to make it work.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source