23 Replies Latest reply on Feb 10, 2012 3:15 AM by ataylor

    Messages redelivered after committed transaction

    martin-gbg

      Hello,

       

      Wondering if anyone has seen the following problem.

       

      We are using HornetQ version 2.2.5.Final. We have a pool of MDBs in a WebLogic app server listening on a queue. After a load test we see that although the MDBs consume the messages and commit the transaction, the messages are still marked for redelivery.

       

      All help is appreciated.

        • 1. Re: Messages redelivered after committed transaction
          ataylor

          What do you mean by "marked for redelivery"? Is the tx set as rollback-only, or is the message put back on the queue? Also, how are you configuring the resource adapter so that it can locate the WebLogic transaction manager?

          • 2. Re: Messages redelivered after committed transaction
            mrpi

            Our MDB is configured with the following annotation:

            @MessageDriven(activationConfig = {
                    @ActivationConfigProperty(propertyName = "destination", propertyValue = "jms/TheQ"),
                    @ActivationConfigProperty(propertyName = "destination-jndi-name", propertyValue = "jms/TheQ"),
                    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
                    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                    @ActivationConfigProperty(propertyName = "connection-factory-jndi-name", propertyValue = "jms/HornetQConnectionFactory") }, mappedName = "jms/TheQ")
            @TransactionManagement(TransactionManagementType.BEAN)
            

             

            To get a hold of the transaction we're using this:

             

            @Resource
            private UserTransaction utx;
            

             

            In the onMessage method we do the following (pseudo code):

             

            public void onMessage(final Message message) {
                try {
                    utx.begin();
                    processMessage(message);
                    utx.commit();
                } catch (Exception e) {
                    utx.rollback();
                }
            }
            

             

            To get the MDB to connect to the HornetQ queue we use a JCA adapter configured with the following:

             

            weblogic-ra.xml:

             

            <weblogic-connector xmlns="http://www.bea.com/ns/weblogic/weblogic-connector" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.bea.com/ns/weblogic/weblogic-connector http://www.bea.com/ns/weblogic/weblogic-connector/1.0/weblogic-connector.xsd">
              <native-libdir></native-libdir>
              <jndi-name>jms/ResourceAdapter</jndi-name>
              <enable-access-outside-app>true</enable-access-outside-app>
              <enable-global-access-to-classes>true</enable-global-access-to-classes>
              <outbound-resource-adapter>
                <connection-definition-group>
                  <connection-factory-interface>org.hornetq.ra.HornetQRAConnectionFactory</connection-factory-interface>
                  <connection-instance>
                    <jndi-name>jms/HornetQConnectionFactory</jndi-name>
                  </connection-instance>
                </connection-definition-group>
              </outbound-resource-adapter>
            </weblogic-connector>
            

             

            ra.xml:

             

            <?xml version="1.0" encoding="UTF-8"?>
            
                <!-- $Id: ra.xml 76819 2008-08-08 11:04:20Z jesper.pedersen $ -->
            
            <connector xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee
                       http://java.sun.com/xml/ns/j2ee/connector_1_5.xsd"
                version="1.5">
            
                <description>HornetQ 2.0 Resource Adapter</description>
                <display-name>HornetQ 2.0 Resource Adapter</display-name>
            
                <vendor-name>Red Hat Middleware LLC</vendor-name>
                <eis-type>JMS 1.1 Server</eis-type>
                <resourceadapter-version>1.0</resourceadapter-version>
            
                <license>
                    <description>
                    Copyright 2009 Red Hat, Inc.
                     Red Hat licenses this file to you under the Apache License, version 2.0 (the "License"); you may not use this file except in compliance
                     with the License.  You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
                     applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
                     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing
                     permissions and limitations under the License.  
                  </description>
                    <license-required>true</license-required>
                </license>
            
                <resourceadapter>
                    <resourceadapter-class>org.hornetq.ra.HornetQResourceAdapter</resourceadapter-class>
                    <config-property>
                        <description>
                        The transport type. Multiple connectors can be configured by using a comma separated list,
                        i.e. org.hornetq.core.remoting.impl.invm.InVMConnectorFactory,org.hornetq.core.remoting.impl.invm.InVMConnectorFactory.
                        </description>
                        <config-property-name>ConnectorClassName</config-property-name>
                        <config-property-type>java.lang.String</config-property-type>
                        <config-property-value>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</config-property-value>
                    </config-property>
            
            
                    <config-property>
                        <config-property-name>ConnectionParameters</config-property-name>
                        <config-property-type>java.lang.String</config-property-type>
                        <config-property-value>host=localhost;port=5445</config-property-value>
                    </config-property>
            
                    <config-property>
                        <description>The load balancing policy class name</description>
                        <config-property-name>LoadBalancingPolicyClassName</config-property-name>
                        <config-property-type>java.lang.String</config-property-type>
                        <config-property-value>org.hornetq.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy</config-property-value>
                    </config-property>
            
                    <outbound-resourceadapter>
                        <connection-definition>
                            <managedconnectionfactory-class>org.hornetq.ra.HornetQRAManagedConnectionFactory</managedconnectionfactory-class>
            
                            <config-property>
                                <description>The default session type</description>
                                <config-property-name>SessionDefaultType</config-property-name>
                                <config-property-type>java.lang.String</config-property-type>
                                <config-property-value>javax.jms.Queue</config-property-value>
                            </config-property>
                            <config-property>
                                <description>Try to obtain a lock within specified number of seconds; less than or equal to 0 disable this functionality</description>
                                <config-property-name>UseTryLock</config-property-name>
                                <config-property-type>java.lang.Integer</config-property-type>
                                <config-property-value>0</config-property-value>
                            </config-property>
            
                            <connectionfactory-interface>org.hornetq.ra.HornetQRAConnectionFactory</connectionfactory-interface>
                            <connectionfactory-impl-class>org.hornetq.ra.HornetQRAConnectionFactoryImpl</connectionfactory-impl-class>
                            <connection-interface>javax.jms.Session</connection-interface>
                            <connection-impl-class>org.hornetq.ra.HornetQRASession</connection-impl-class>
                        </connection-definition>
                        <transaction-support>XATransaction</transaction-support>
                        <authentication-mechanism>
                            <authentication-mechanism-type>BasicPassword</authentication-mechanism-type>
                            <credential-interface>javax.resource.spi.security.PasswordCredential</credential-interface>
                        </authentication-mechanism>
                        <reauthentication-support>false</reauthentication-support>
                    </outbound-resourceadapter>
            
                    <inbound-resourceadapter>
                        <messageadapter>
                            <messagelistener>
                                <messagelistener-type>javax.jms.MessageListener</messagelistener-type>
                                <activationspec>
                                    <activationspec-class>org.hornetq.ra.inflow.HornetQActivationSpec</activationspec-class>
                                    <required-config-property>
                                        <config-property-name>destination</config-property-name>
                                    </required-config-property>
                                </activationspec>
                            </messagelistener>
                        </messageadapter>
                    </inbound-resourceadapter>
            
                </resourceadapter>
            </connector>
            

             

            As you can see, our MDB is deployed in a WebLogic Application Server.

            The queue itself (named TheQ in this example) is configured to have a redelivery delay of 60000 (60 seconds = 1 minute) with "max delivery attempts" set to 10. Our messages will be put on the queue and processed by 15 consumers (15 instances of our MDB).

             

            When a message is put on the queue the following happens:

            1. Message is put on the TheQ-queue

            2. MDB will be called and processes the message

            3. MDB will commit the user transaction

            4. The message will still be on TheQ-queue

            5. After 1 minute the MDB will be called again with the same message

            6. Repeat 2-5 10 times.
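For reference, the redelivery schedule implied by those queue settings can be worked out with a few lines of Java. This is only an arithmetic sketch: the class and method names are made up for illustration, and it assumes each redelivery starts exactly one redelivery delay after the previous attempt (real processing time would add to that).

```java
import java.util.ArrayList;
import java.util.List;

public class RedeliverySchedule {

    /** Offsets in milliseconds (from the first delivery) at which each attempt would start. */
    static List<Long> attemptOffsets(long redeliveryDelayMs, int maxDeliveryAttempts) {
        List<Long> offsets = new ArrayList<>();
        for (int attempt = 0; attempt < maxDeliveryAttempts; attempt++) {
            offsets.add(attempt * redeliveryDelayMs);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Values taken from the queue settings described above.
        List<Long> offsets = attemptOffsets(60_000L, 10);
        for (int i = 0; i < offsets.size(); i++) {
            System.out.println("attempt " + (i + 1) + " at +" + (offsets.get(i) / 1000) + " s");
        }
    }
}
```

Under these assumptions the tenth and last attempt would start about nine minutes (540 s) after the first delivery.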

             

            Any thoughts?

            • 3. Re: Messages redelivered after committed transaction
              ataylor

              It sounds to me as if, although the Resource Adapter is using XA, the HornetQ session is not being enlisted with the WebLogic transaction manager; this should be done by WebLogic. Any chance you can debug this to see? It's not really something I'm set up to do. The code you need to look at is setup() on HornetQMessageHandler, where you will see this:

               

              MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
              useLocalTx = !activation.isDeliveryTransacted() && activation.getActivationSpec().isUseLocalTx();
              transacted = activation.isDeliveryTransacted();

               

              Also bear in mind that if the resource adapter cannot access the Transaction Manager, the tx and the RA's XAResource may end up with different timeouts. You configure the locator via setTransactionManagerLocatorClass and setTransactionManagerLocatorMethod on the RA itself.

               

              Also debug the XA methods on ClientSessionImpl (start, end, commit, etc.) to see if they are actually called.

               

              Let me know how you get on

              • 4. Re: Messages redelivered after committed transaction
                mrpi

                Thanks for the fast response! That helped us a bit on the way.

                 

                When I looked at the code more closely I saw that we are using a method called "rollbackTransaction()" that is used like this:

                 

                public void handleMessage(final Message msg) {
                    final String textMessage = getMessage(msg);
                    try {
                        beginTransaction();
                        process(textMessage);
                        commitTransaction();
                    } catch (final Exception e) {
                        notifyException("Error while processing.", e);
                        rollbackTransaction();
                    }
                }
                

                 

                The method "rollbackTransaction()" actually looked like this:

                 

                public void rollbackTransaction() {
                    try {
                        utx.rollback();
                    } catch (final IllegalStateException e) {
                        throw new RuntimeException("IllegalStateException exception occured while "
                                + "rolling back transaction, JMS message was not received!", e);
                    } catch (final SecurityException e) {
                        throw new RuntimeException("SecurityException exception occured while "
                                + "rolling back transaction, JMS message was not received!", e);
                    } catch (final SystemException e) {
                        throw new RuntimeException("SystemException exception occured while "
                                + "rolling back transaction, JMS message was not received!", e);
                    }
                    throw new RuntimeException("Force redelivery of message according to Delivery Failure settings.");
                }
                

                 

                This means we get a lot of strange behavior. I commented the last line out and that changed everything!

                When the rollbackTransaction() method no longer throws an exception, all messages are acknowledged and removed from the queue, whether or not the transaction was rolled back.

                 

                I downloaded the HornetQ source and put a couple of breakpoints in HornetQMessageHandler and ClientSessionImpl, and this is what I found:

                • At container startup the following happens:
                  • setup() is called on HornetQMessageHandler a bunch of times (I guess one time for every MDB that is set up?)
                  • start() is called on ClientSessionImpl a bunch of times (same as above?)
                  • HornetQMessageHandler will get both "useLocalTx" and "transacted" set to false
                • When a message is put on the queue the following happens:
                  • onMessage() on HornetQMessageHandler is called
                  • acknowledge() on ClientSessionImpl is called regardless of whether the transaction was rolled back or not
                • When the container shuts down the following happens:
                  • teardown() on HornetQMessageHandler is called a bunch of times
                  • stop() on ClientSessionImpl is called a bunch of times

                 

                Nothing ever happens in ClientSessionImpl's commit() or rollback()-methods.

                 

                What do you recommend? Should we change our configuration somehow to make useLocalTx and/or transacted to be set to true? How do we do this?

                • 5. Re: Messages redelivered after committed transaction
                  ataylor

                  Hmmm, so you are using user transactions instead of container-managed transactions? The proper way to do this is to set the tx to rollback-only rather than throw an exception.

                   

                  "setup() is called on HornetQMessageHandler a bunch of times (I guess one time for every MDB that is set up?)"

                  Once for each session actually, 15 by default I think (which matches JBoss's MDB pool size).

                  • 6. Re: Messages redelivered after committed transaction
                    ataylor

                    "What do you recommend? Should we change our configuration somehow to make useLocalTx and/or transacted to be set to true? How do we do this?"

                    if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx())
                    {
                       endpoint = endpointFactory.createEndpoint(session);
                       useXA = true;
                    }

                     

                    This condition must be true if you want the session included in the transaction.
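To make the combinations easier to follow, here is a small truth-table sketch built only from the two conditions quoted in this thread. The class, enum, and method names are invented for illustration and are not HornetQ code. What the resource adapter actually does when neither condition holds is not shown in the thread, so that case is just labelled NEITHER.

```java
public class DeliveryModes {

    enum Mode { XA, LOCAL_TX, NEITHER }

    /**
     * Mirrors the two conditions quoted in the thread:
     * XA needs deliveryTransacted && !useLocalTx, and a local
     * transaction needs !deliveryTransacted && useLocalTx.
     */
    static Mode resolve(boolean deliveryTransacted, boolean useLocalTx) {
        if (deliveryTransacted && !useLocalTx) {
            return Mode.XA;
        }
        if (!deliveryTransacted && useLocalTx) {
            return Mode.LOCAL_TX;
        }
        // Neither condition holds; the thread does not show what happens here.
        return Mode.NEITHER;
    }

    public static void main(String[] args) {
        boolean[] values = { false, true };
        for (boolean transacted : values) {
            for (boolean localTx : values) {
                System.out.println("deliveryTransacted=" + transacted
                        + ", useLocalTx=" + localTx
                        + " -> " + resolve(transacted, localTx));
            }
        }
    }
}
```

The situation reported earlier (both flags false at startup) falls into the NEITHER row, which is consistent with commit() and rollback() never being reached on the session.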

                    • 7. Re: Messages redelivered after committed transaction
                      mrpi

                      Okay, and where do I change the values of activation.isDeliveryTransacted() and activation.getActivationSpec().isUseLocalTx()?

                      In ra.xml?

                      • 8. Re: Messages redelivered after committed transaction
                        ataylor

                        activation.isDeliveryTransacted() is controlled by the MDB layer via the following, I think:

                         

                        @TransactionManagement(value = TransactionManagementType.CONTAINER)

                        @TransactionAttribute(value = TransactionAttributeType.REQUIRED)

                         

                        Although I am not sure whether setting it to TransactionManagementType.BEAN will do.

                         

                        activation.getActivationSpec().isUseLocalTx() is set on the RA or on the MDB:

                         

                        @ActivationConfigProperty(propertyName = "useLocalTx", propertyValue = "true")

                         

                        This is false by default.

                        • 9. Re: Messages redelivered after committed transaction
                          mrpi

                          Hi again!

                           

                          More results!

                           

                          I tried to get activation.isDeliveryTransacted() to return true. It appears that this value comes directly from WebLogic, via isDeliveryTransacted() on the MessageEndpointFactory. WebLogic always returns false if TransactionManagementType is set to BEAN for the MDB.

                          So I guess this means that I cannot use transactions at all if I use bean-managed transactions? This seems very strange to me. It should be up to me to choose whether I want to use bean- or container-managed transactions, not up to the container implementation, right?

                           

                          If I look at the code in HornetQMessageHandler, rollback calls are only made in the catch block of the onMessage() method. So I really have to throw an exception if something bad happens in my own onMessage() method? In my opinion utx.rollback() should be sufficient if I use bean-managed transactions.

                          • 10. Re: Messages redelivered after committed transaction
                            ataylor

                            Actually, let me clarify; I think I've confused the issue here. If you use bean-managed transactions then the consumption of the message will be outside the scope of the transaction. This means that as long as you don't throw an exception in your MDB, the message is consumed. However, if you throw an exception from onMessage, the message will not be acked and the session will be rolled back. Could you confirm this?
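The behaviour described here can be pictured with a toy model. This is not HornetQ code: the class and method names are hypothetical, a plain Consumer stands in for the MDB's onMessage(), and "acknowledged" simply means the message is not handed back for redelivery. It only illustrates the rule that, with bean-managed transactions, an escaping exception is what decides redelivery, not what the bean did with its own UserTransaction.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class BmtDeliveryModel {

    /** Delivers every queued message once; returns the messages that would be redelivered. */
    static List<String> deliverAll(Deque<String> queue, Consumer<String> listener) {
        List<String> redeliver = new ArrayList<>();
        while (!queue.isEmpty()) {
            String message = queue.poll();
            try {
                listener.accept(message); // stands in for the MDB's onMessage()
                // No exception: the message counts as consumed (acknowledged),
                // whatever the bean did with its own UserTransaction.
            } catch (RuntimeException e) {
                // Exception escaped onMessage(): not acknowledged, so it is redelivered.
                redeliver.add(message);
            }
        }
        return redeliver;
    }

    public static void main(String[] args) {
        Deque<String> queue =
                new ArrayDeque<>(Arrays.asList("ok", "rolled-back-quietly", "throws"));
        List<String> redeliver = deliverAll(queue, message -> {
            if (message.equals("throws")) {
                throw new RuntimeException("Force redelivery");
            }
            // "rolled-back-quietly" models utx.rollback() without rethrowing.
        });
        System.out.println("redelivered: " + redeliver); // prints "redelivered: [throws]"
    }
}
```

In this model the message whose handler rolled back quietly is still consumed, which matches the observation earlier in the thread that messages were acknowledged whether or not the user transaction was rolled back.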

                            • 11. Re: Messages redelivered after committed transaction
                              mrpi

                              I found another thing. When the resource adapter starts up I get the following in the log file (or actually to System.out, since HornetQ uses Java Logging Framework by default for some strange reason):

                               

                              2012-feb-08 10:44:01 org.hornetq.core.logging.impl.JULLogDelegate warn
                              WARNING: It wasn't possible to lookup for a Transaction Manager through the configured properties TransactionManagerLocatorClass and TransactionManagerLocatorMethod
                              2012-feb-08 10:44:08 org.hornetq.core.logging.impl.JULLogDelegate warn
                              WARNING: HornetQ Resource Adapter won't be able to set and verify transaction timeouts in certain cases.
                              2012-feb-08 10:44:13 org.hornetq.core.logging.impl.JULLogDelegate info
                              INFO: HornetQ resource adaptor started
                              

                               

                              After further investigation I found the reason for this. Since we do not configure a TransactionManagerLocatorClass in our ra.xml, the HornetQResourceAdapter class uses the following string as the default:

                               

                              org.hornetq.integration.jboss.tm.JBoss5TransactionManagerLocator;org.hornetq.integration.jboss.tm.JBoss4TransactionManagerLocator
                              

                               

                              Since we are running WebLogic, neither of these two classes is found. This means the HornetQResourceAdapter instance will not have any TransactionManager associated with it.

                              In turn, this means the HornetQMessageHandler will also not have a TransactionManager associated with it and will never perform a rollback on the transaction if it fails.
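A rough sketch of why nothing is found: if the candidates in a semicolon-separated list are tried one after another and none can be loaded, the result is simply "no locator". The code below is an illustration of that idea only; the class and method names are made up, and trying the candidates in order is an assumption, not HornetQ's actual lookup code.

```java
import java.util.Optional;

public class LocatorResolution {

    /** Returns the first class in a semicolon-separated list that can be loaded, if any. */
    static Optional<Class<?>> firstLoadable(String classNames) {
        for (String name : classNames.split(";")) {
            try {
                Class<?> candidate = Class.forName(name.trim());
                return Optional.<Class<?>>of(candidate);
            } catch (ClassNotFoundException | LinkageError e) {
                // Not loadable on this classpath: try the next candidate.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String defaults = "org.hornetq.integration.jboss.tm.JBoss5TransactionManagerLocator;"
                + "org.hornetq.integration.jboss.tm.JBoss4TransactionManagerLocator";
        // The outcome depends on what is on the classpath when this runs.
        System.out.println("locator found: " + firstLoadable(defaults).isPresent());
    }
}
```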

                               

                              We really need your advice here. Please answer the following questions:

                              1. How do we get the resource adapter to fetch the transaction manager in Weblogic? (By creating our own TransactionManagerLocator for Weblogic?)
                              2. Why is it so difficult to get this to work? I have debugged HornetQ code and decompiled Weblogic to see what really happens. I don't understand why MDB transactions should be this hard to configure with HornetQ!
                              • 12. Re: Messages redelivered after committed transaction
                                mrpi

                                I have tried all different kinds of combinations, both container managed and bean managed, throw an exception and/or rollback the transaction. I have NEVER gotten a call to ClientSessionImpl's commit() or rollback()-methods.

                                There must be something missing between the EJB UserTransaction and the actual HornetQ session. My guess is that it is the missing lookup of the TransactionManager in WebLogic. Could this be true?

                                • 13. Re: Messages redelivered after committed transaction
                                  ataylor

                                  If you use BMT then the consumption of the message will not be in the transaction; if you use CMT then it will.

                                   

                                  Regarding the TransactionManagerLocator class etc., this is needed for two things: (1) to set the tx timeout so that it is the same as HornetQ's, and (2) to get a handle on the transaction if onMessage throws an exception, so we can mark it as rollback-only (CMT only).

                                   

                                  So if you are throwing an exception from an MDB and you want the message redelivered, you will have to configure the TransactionManagerLocator class/method. However, the message should still be committed or rolled back at some point by WebLogic.
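As a starting point, a custom locator for WebLogic might look something like the sketch below. Everything in it is an assumption rather than a confirmed recipe: the class name is hypothetical, the JNDI name "javax.transaction.TransactionManager" is assumed to be where WebLogic binds its transaction manager, and the method name getTm is only an example that would have to match whatever is configured as the locator method. The return type is Object purely so the sketch compiles without the JTA API jar.

```java
import java.util.function.Function;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/** Hypothetical locator class; not part of HornetQ or WebLogic. */
public class WebLogicTransactionManagerLocator {

    // Assumption: the JNDI name under which WebLogic binds its transaction manager.
    static final String TM_JNDI_NAME = "javax.transaction.TransactionManager";

    private final Function<String, Object> lookup;

    /** No-arg constructor, so the class can be instantiated reflectively. */
    public WebLogicTransactionManagerLocator() {
        this(WebLogicTransactionManagerLocator::jndiLookup);
    }

    /** Lets a test supply its own lookup instead of a live JNDI tree. */
    WebLogicTransactionManagerLocator(Function<String, Object> lookup) {
        this.lookup = lookup;
    }

    /**
     * Returns whatever is bound under TM_JNDI_NAME, or null if the lookup fails.
     * In a real deployment the return type would be
     * javax.transaction.TransactionManager rather than Object.
     */
    public Object getTm() {
        return lookup.apply(TM_JNDI_NAME);
    }

    private static Object jndiLookup(String name) {
        try {
            return new InitialContext().lookup(name);
        } catch (NamingException e) {
            return null; // no transaction manager reachable from here
        }
    }
}
```

The class and method would then be named in the TransactionManagerLocatorClass and TransactionManagerLocatorMethod properties mentioned in the startup warning, and the class would need to be on the resource adapter's classpath.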

                                   

                                  Solution: use JBoss.

                                  • 14. Re: Messages redelivered after committed transaction
                                    ataylor

                                    If you look at the public void onMessage(final ClientMessage message) method in HornetQMessageHandler, you will see how we deal with this.
