33 Replies Latest reply on Jul 5, 2013 5:05 AM by ataylor
      • 16. Re: Could double-acking be due to HornetQ resending a message?
        mlange

        Thanks Andy, I will try setting BlockOnAcknowledge to true in the connection factory.

         

        Btw, when using XA semantics for receiving, could this problem be avoided completely?

        • 17. Re: Could double-acking be due to HornetQ resending a message?
          ataylor

          Btw, when using XA semantics for receiving, could this problem be avoided completely?

          Yes, that's exactly what XA is for. You could also use a JMS transaction, with slightly weaker guarantees (i.e. if the commit call fails, you can't be sure of the outcome).

          • 18. Re: Could double-acking be due to HornetQ resending a message?
            mlange

            I finally got it running using JMS tx only. Two open issues:

             

            *When paging occurs, the backup node throws thousands of these errors (and many messages seem to be lost afterwards):

             

            WARN  [org.hornetq.core.paging.cursor.impl.PageSubscriptionImpl] (Thread-2 (HornetQ-server-HornetQServerImpl::serverUUID=5dc56443-e24e-11e2-a9d2-a92ebbc3f257-1143284636)) Couldn't locate page transaction 399299, ignoring message on position PagePositionImpl [pageNr=18, messageNr=158, recordID=-1]

             

            When paging does not occur (max-size-bytes set to a high value), this does not happen.

             

            *After the failover to the backup node, some of the queues have no consumers, so messages are not delivered. How can the consumers be forced to fail over as well (assuming that the client does not close them)?

             

            Thanks!

            Marek

            • 19. Re: Could double-acking be due to HornetQ resending a message?
              mlange

              Again stuck with a (trivial?) problem:

               

              When catching the JMSTransactionRolledBackException on session.commit(), failover works fine and no duplicates end up on the consumers. However, it is now impossible to call session.rollback() to trigger a redelivery, because the commit has already been done:

               

              try {

                session.commit();

                messageListener.onMessage(message);

              } catch (RuntimeException re) {

                session.rollback();

              }

               

              To avoid delivering duplicates to non-idempotent services, the commit must be done before the business logic of the consumer.

               

              Any idea how to handle this?

               

              Thanks,

              Marek

               

              P.S. I tried to encapsulate this in an XA transaction; however, the messages seem to be stuck in the queue (deliveringCount=messageCount) after delivery.

              • 20. Re: Could double-acking be due to HornetQ resending a message?
                ataylor

                Marek, sorry for the slow reply.

                 

                You need to use XA and encompass the other work in the same tx.
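To illustrate what "encompass the other work in the same tx" means, here is a minimal sketch of the two-phase decision a transaction manager makes. The `Participant` interface, `TwoPhaseCoordinator`, and `RecordingParticipant` are hypothetical names invented for this sketch; they are not HornetQ or JTA types, and a real transaction manager additionally writes its decision to a durable log so that it can finish the transaction after a crash or failover.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a transactional resource (message ack, database write). */
interface Participant {
    boolean prepare();   // phase 1: vote; true means "I am able to commit"
    void commit();       // phase 2: make the work permanent
    void rollback();     // undo the work
}

/** Hypothetical coordinator showing the decision logic only (no durable log). */
final class TwoPhaseCoordinator {
    /** Returns true if every participant committed, false if all were rolled back. */
    static boolean complete(List<? extends Participant> participants) {
        // Phase 1: ask every participant to vote.
        for (Participant p : participants) {
            if (!p.prepare()) {
                // A single "no" vote aborts the whole transaction.
                for (Participant q : participants) {
                    q.rollback();
                }
                return false;
            }
        }
        // Phase 2: everyone voted yes, so commit everywhere.
        for (Participant p : participants) {
            p.commit();
        }
        return true;
    }
}

/** Remembers its outcome so the demo can print it. */
final class RecordingParticipant implements Participant {
    final String name;
    final boolean vote;
    String outcome = "pending";

    RecordingParticipant(String name, boolean vote) {
        this.name = name;
        this.vote = vote;
    }

    @Override public boolean prepare() { return vote; }
    @Override public void commit() { outcome = "committed"; }
    @Override public void rollback() { outcome = "rolled back"; }
}

final class TwoPhaseDemo {
    public static void main(String[] args) {
        RecordingParticipant ack = new RecordingParticipant("message ack", true);
        RecordingParticipant work = new RecordingParticipant("business work", false);
        boolean committed = TwoPhaseCoordinator.complete(Arrays.asList(ack, work));
        System.out.println("committed=" + committed
                + ", " + ack.name + ": " + ack.outcome
                + ", " + work.name + ": " + work.outcome);
    }
}
```

Because the business work votes "no" in the demo, the message acknowledgement is rolled back too, so the message stays eligible for redelivery instead of being lost or processed twice.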

                • 21. Re: Could double-acking be due to HornetQ resending a message?
                  mlange

                  No problem Andy, thanks for your help here!

                   

                  I have already tried to encapsulate the call in an XA transaction (based on the XAReceiveExample; is that right?), but I could not get it to run.

                   

                  As soon as I use the XAConnection and XASession interfaces, the messages sent to the queue are consumed but stay in the queue. MessageCount is always equal to DeliveringCount. I am not sure what is going on in this case. It looks like they are somehow not acknowledged. I am using a resource adaptor and thus HornetQRAManagedConnection and HornetQRASession, which always seem to use AUTO_ACKNOWLEDGE:

                   

                  HornetQRAManagedConnection:

                   

                  private void setup() throws ResourceException {

                     try

                     {

                        boolean transacted = cri.isTransacted();

                        int acknowledgeMode =  Session.AUTO_ACKNOWLEDGE;

                   

                  The problem is similar to these:

                  https://community.jboss.org/message/811289

                  https://community.jboss.org/thread/223356

                   

                  I haven't found a solution for this so far. Maybe I should also add that we do not close the consumers as long as the server hosting the JMS client is up (as is sometimes suggested in the posts). Once the message client is initialized, it opens several threads for consuming, and these connections/sessions are kept until they are explicitly closed (in most cases when the server is shut down). Could that be related to the problem?

                   

                  Thanks,
                  Marek

                  • 22. Re: Could double-acking be due to HornetQ resending a message?
                    ataylor

                    Marek,

                     

                    I don't really understand what you are doing. Could you provide code, configuration, etc.?

                    • 23. Re: Could double-acking be due to HornetQ resending a message?
                      ataylor

                      Just a couple of notes:

                       

                      The XAReceive example is correct, but it's just showing the use of XA in HornetQ; in reality these calls would be coordinated and made by a transaction manager. A typical example is an incoming message to an MDB that then writes to a database.

                       

                      Also, the ack mode is ignored when you are using XA.
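For readers wondering which calls a transaction manager would make, here is a minimal sketch of the XA call sequence for a single resource, using only the standard `javax.transaction.xa` types from the JDK. `DemoXid`, `XaSequence.runInXa`, and `recordingResource` are hypothetical helpers invented for this sketch, and the recording stub stands in for the `XAResource` you would get from an XA session. If nothing ever calls `start`, `end`, and `commit`, consumed messages would be expected to stay in the delivering state.

```java
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

/** Minimal Xid for illustration only; a real transaction manager generates these. */
final class DemoXid implements Xid {
    private final byte[] globalId;

    DemoXid(String id) {
        this.globalId = id.getBytes(StandardCharsets.UTF_8);
    }

    @Override public int getFormatId() { return 1; }
    @Override public byte[] getGlobalTransactionId() { return globalId.clone(); }
    @Override public byte[] getBranchQualifier() { return new byte[] {1}; }
}

final class XaSequence {
    /** Hypothetical helper: drives one unit of work through the XA calls. */
    static void runInXa(XAResource res, Xid xid, Runnable work) throws XAException {
        res.start(xid, XAResource.TMNOFLAGS);        // associate the following work with the tx
        try {
            work.run();                               // e.g. receive the message, run business logic
        } catch (RuntimeException e) {
            res.end(xid, XAResource.TMFAIL);
            res.rollback(xid);                        // the work is undone; the message can be redelivered
            throw e;
        }
        res.end(xid, XAResource.TMSUCCESS);
        if (res.prepare(xid) == XAResource.XA_OK) {   // phase 1: the resource votes
            res.commit(xid, false);                   // phase 2: the work becomes permanent
        }
        // A vote of XA_RDONLY means there is nothing to commit for this resource.
    }

    /** Stub XAResource that records the names of the methods called on it. */
    static XAResource recordingResource(List<String> calls) {
        return (XAResource) Proxy.newProxyInstance(
                XaSequence.class.getClassLoader(),
                new Class<?>[] {XAResource.class},
                (proxy, method, args) -> {
                    calls.add(method.getName());
                    Class<?> type = method.getReturnType();
                    if (type == int.class) {
                        return XAResource.XA_OK;
                    }
                    if (type == boolean.class) {
                        return false;
                    }
                    return null;
                });
    }

    public static void main(String[] args) throws XAException {
        List<String> calls = new ArrayList<>();
        runInXa(recordingResource(calls), new DemoXid("tx-1"),
                () -> System.out.println("processing message"));
        System.out.println(calls);
    }
}
```

With the stub, the recorded sequence is `[start, end, prepare, commit]`. In a managed environment the application server's transaction manager makes these calls around each MDB `onMessage` invocation, across every enlisted resource.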

                      • 24. Re: Could double-acking be due to HornetQ resending a message?
                        mlange

                        Sorry for being unclear!

                         

                        The consumer retrieves the connection via JNDI from a connection factory which is based on the RA, with XA configured:

                         

                        <connection-factories>

                          <tx-connection-factory>

                            <jndi-name>HornetqConsumerPooledConnectionFactory</jndi-name>

                            <xa-transaction/>

                            <rar-name>hornetq-ra.rar</rar-name>

                         

                        The code provides an API method which is used to register consumers created by the API user:

                         

                        public void registerListener(....) {

                         

                          // lookup connectionfactory from JNDI

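                          XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext.lookup("HornetqConsumerPooledConnectionFactory");

                          // create the connection used below (this line was missing from the posted snippet)
                          XAConnection consumerConnection = connectionFactory.createXAConnection();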

                         

                          // start threads for the consumers

                          for (int count = 0; count < 20; count++) {

                            XASession consumerSession = consumerConnection.createXASession();

                            Session session = consumerSession.getSession();

                            MessageConsumer messageConsumer = session.createConsumer(queue);

                            consumerConnection.start();

                         

                            MessageConsumerRunnable consumer = new MessageConsumerRunnable(messageConsumer, consumerSession);

                            Thread queueReceiverThread = new Thread(consumer);

                            queueReceiverThread.setDaemon(true);

                            queueReceiverThread.setName("Receiver" + count + "-" + queue + " (" + consumerSession + ")");

                            queueReceiverThread.start();

                          }

                        }

                         

                        // receiver logic

                        private final class MessageConsumerRunnable implements Runnable {

                          private final MessageConsumer queueReceiver;

                          private final XASession session;

                         

                          private MessageConsumerRunnable(MessageConsumer queueReceiver, XASession session) {

                            this.queueReceiver = queueReceiver;

                            this.session = session;

                          }

                         

                          @Override

                          public void run() {

                             while (true) {

                                TextMessage message = null;

                                XAResource xaRes = null;

                         

                                try {

                                    message = (TextMessage) queueReceiver.receive(2000L);

                                    if (message != null) {

                                       System.out.printf("Received message: %s(redelivered?: %s, Delivery Count: %s)\n", message.getText(), message.getJMSRedelivered(), message.getIntProperty("JMSXDeliveryCount"));

                                       // execute business logic here....

                                    }

                                } catch (TransactionRolledBackException e) {

                                      System.out.println("TransactionRolledBackException due to failover for " + message.getText());

                                }

                             }

                          }

                        }

                         

                        The consumer is closed once the client calls another method, unregisterListener(...). It is not closed inside the processing logic (as it is in most of the examples).

                         

                        This leaves the messages in the queue after they are consumed. But why? IIRC there is no need to call message.acknowledge() or session.commit() (HornetQ throws an exception, I think). Or is this not supposed to work when an XA-capable connection factory is used without explicit XA transactions?

                         

                        As the API provider, I do not know which logic is executed in the consumer. It would not be enough to encapsulate this in the XA tx started from the HornetQ XASession.getXAResource(), right?

                         

                        Thanks,

                        Marek

                        • 25. Re: Could double-acking be due to HornetQ resending a message?
                          ataylor

                          How are you executing your consumer code? The tx manager only starts and ends transactions within MDB and other JEE component boundaries (e.g. onMessage on an MDB).

                          • 26. Re: Could double-acking be due to HornetQ resending a message?
                            mlange

                            This is not an MDB; it is a simple JMS MessageListener (which runs inside the appserver, btw). The code is started when the user calls the registerListener() method, usually during the startup of the server. I know that the tx is not started automatically when the code is not executed in the onMessage() of an MDB. But is it then a problem to use XA connections?

                            • 27. Re: Could double-acking be due to HornetQ resending a message?
                              ataylor

                              The point of using the pooled XA connection factory is to let the transaction manager in the app server coordinate things. This is done when the app server starts a thread for a servlet or, say, an MDB: the tx manager will start, end, and either roll back or commit the tx during the life cycle of the thread. What you are doing bypasses all of that; it's actually an anti-pattern. All thread handling should be done by the app server for this exact reason, so that things like transaction management, security, etc. can be controlled. You should probably use an MDB; is there any reason you aren't?

                              • 28. Re: Could double-acking be due to HornetQ resending a message?
                                mlange

                                We considered supporting MDBs but decided against it. First, the JMS client is provided for a huge software platform; it should free the developer from using the JMS API (and, on top of that, support specific requirements of the internal infrastructure). We previously used HornetQ as embedded servers in the appservers but wanted to get rid of the local queues in /data. So we went for a central HA installation with the journals on the storage. The clients connect remotely to these instances (using the netty connector). Using MDBs would require also using the RA in the MDB (right?) and connecting remotely. That might still be OK. And, above all, it requires the client library to start/stop MDBs for consuming. Thus the client would always have to run in the appserver (no unmanaged/standalone usage, which we also support now). It would then be complicated to embed the client into the software (the lib without MDBs requires just a Maven dependency). I hope that explains why we cannot use MDBs (although I see the advantages!).

                                 

                                The thing is, everything runs as expected with this non-MDB setup except this failover case, which produces the message duplicates. It is unacceptable to provide a client library for our developers which is not 100% reliable with regard to message duplication; imagine use cases where business logic is executed twice, e.g. in a payment use case. It is hard to believe that HornetQ is only used in production environments which do not have this requirement. That does not sound "reliable" to us.

                                • 29. Re: Could double-acking be due to HornetQ resending a message?
                                  ataylor

                                  Using MDBs would require also using the RA in the MDB (right?) and connecting remotely. That might still be OK.

                                  Yes, you can configure MDBs to connect remotely and also to support failover, and what you get for free is XA and transaction coordination.

                                  The thing is, everything runs as expected with this non-MDB setup except this failover case, which produces the message duplicates. It is unacceptable to provide a client library for our developers which is not 100% reliable with regard to message duplication; imagine use cases where business logic is executed twice, e.g. in a payment use case. It is hard to believe that HornetQ is only used in production environments which do not have this requirement. That does not sound "reliable" to us.

                                  I can only reiterate what I said before: two-phase commit (XA) is the only way to know for sure after failover whether messages have been rolled back or need committing. The transaction manager keeps a record of each tx and knows its state at any moment in time, so after failover it will deal with the tx as needed.

                                   

                                  That said, one possible solution is to implement your own duplicate detection on the consumer side: just persist a cache of the IDs of messages received and dealt with, and ignore those messages on redelivery. This is similar to how we deal with duplicates on producer sends, and it avoids the overhead of XA.
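A minimal sketch of such a consumer-side duplicate filter follows, assuming message IDs are unique strings without line breaks (as JMS message IDs are). `ProcessedIdStore` and `DuplicateFilterDemo` are hypothetical names for this sketch; the file-backed store is only one possible way to persist the cache, and a production version would also need to expire old IDs and force writes to disk.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical duplicate filter backed by an append-only file of processed message IDs. */
final class ProcessedIdStore {
    private final Path file;
    private final Set<String> seen = new HashSet<>();

    ProcessedIdStore(Path file) throws IOException {
        this.file = file;
        if (Files.exists(file)) {
            // Reload the IDs recorded before a restart.
            seen.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
        }
    }

    /** True if this ID was already recorded, i.e. the delivery is a duplicate. */
    synchronized boolean isDuplicate(String messageId) {
        return seen.contains(messageId);
    }

    /** Records the ID in memory and in the file; call after the business logic succeeded. */
    synchronized void markProcessed(String messageId) throws IOException {
        if (seen.add(messageId)) {
            Files.write(file, Collections.singletonList(messageId), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        }
    }
}

final class DuplicateFilterDemo {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("processed-ids", ".txt");
        ProcessedIdStore store = new ProcessedIdStore(file);
        // The third delivery simulates a resend of an already processed message after failover.
        String[] deliveries = {"ID:1", "ID:2", "ID:1"};
        for (String id : deliveries) {
            if (store.isDuplicate(id)) {
                System.out.println("skipping duplicate " + id);
                continue;
            }
            System.out.println("processing " + id);   // business logic would run here
            store.markProcessed(id);
        }
    }
}
```

Note that a crash between the business logic and `markProcessed` still leaves a window for a duplicate. Closing it requires recording the ID in the same local transaction as the business effect (for example, in the same database), which is essentially what XA does across resources.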

                                   

                                  One last point on your current approach: it's not my place to tell you how to develop your applications, and I understand that sometimes there is no choice. However, spawning your own threads from within an app server is a bad thing. You have already seen issues such as no transaction management, but you may also have security issues and problems with scalability if the load on your application grows. Remember, this is why the app server (and JEE) is there: to do these things for you and to provide the tooling, configuration, pooling, etc. that allow you to configure the server to best handle your application.