9 Replies Latest reply on Feb 12, 2015 2:13 PM by clebert.suconic

    JMS consumer performance regression in HornetQ 2.4 during consumer acknowledge() call

    benspiller

      We're looking at upgrading the JMS provider we recommend from HornetQ 2.3.0 to 2.4.0, but are seeing a significant (~50%) regression in the number of messages received per second from a queue. For now we're focusing on NON_PERSISTENT messages; we use CLIENT_ACK mode (as recommended by the HornetQ docs) and batch up many consumer.receive() calls before calling acknowledge() to reduce the cost of the acknowledgements.
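      For reference, the receive/ack pattern we use looks roughly like this (a simplified sketch, not our actual code; the batch size and surrounding setup are illustrative):

        // Simplified sketch of the batched CLIENT_ACKNOWLEDGE pattern (illustrative only)
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(queue);

        int batchSize = 100; // illustrative; we vary this in our measurements
        long received = 0;
        Message last;
        while ((last = consumer.receive(5000)) != null) {
           received++;
           if (received % batchSize == 0) {
              // in CLIENT_ACKNOWLEDGE mode, acknowledging one message acks every
              // message the session has delivered so far
              last.acknowledge();
           }
        }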

       

      We did some profiling, and it looks as though the change in 2.4 is that vastly more time is spent in the acknowledge() call, so I was wondering whether any performance (or functionality) changes were made recently in this area that could be related?

       

      I also had a go at varying the number of receive() calls we make before calling acknowledge() and found that:

      i) it makes a huge difference to throughput (much more than I expected actually)

      ii) batch sizes >= 100 receives/ack actually make performance worse rather than better - surprising, as I'd have thought going back to the server less often for the ack would help performance. I wonder what data structure is behind this that's causing it to drop off so fast

      iii) there is a big difference in the impact of this batch size from 2.3 to 2.4 - instead of performance improving and then staying relatively constant as batch size increases (as in 2.3), in 2.4 we see higher throughput for very low batch sizes, but beyond ~20 msgs/batch it drops off very quickly to become much worse than 2.3 performance, and in fact continues to deteriorate as batch size is increased:

      [attachment: throughput_vs_batch_size.png]

      We also had a go at measuring the average time each acknowledge() call takes, and again found not only that it's worse for high batch sizes in 2.4, but also that the shape of the graph is quite different, with performance falling off very rapidly at higher batch sizes:

      [attachment: time_per_ack.png]

      I'd be very interested to hear thoughts from the dev team on these observations. If possible, it'd be great to get a fix that restores behaviour similar to the last release (but I wanted to get your initial thoughts before filing a bug).

       

      I also wonder whether it would be a good idea to have a section in the performance-tuning part of the HornetQ docs about the effect of batch size on performance, and what you recommend. With most other JMS providers I've found that performing about 500 receives per acknowledgement gives the best performance, and HornetQ (at least for now) seems to behave differently, so it'd be useful to document what you expect/recommend.

       

      Thanks!

        • 1. Re: JMS consumer performance regression in HornetQ 2.4 during consumer acknowledge() call
          clebert.suconic

          I just looked around and I don't see any changes that I can think of right now.

           

           

          2.3.x has had several fixes since its first release... One thing I remember is that if you use a different priority you would have acks performing individual acknowledgements, and that could be a bit slower. There are some boundaries where you would also have a TX started on the journal to avoid certain issues, but I don't see that happening here.
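          For example (an illustrative sketch only, not from your test), the individual-ack path would be hit by a producer sending with a non-default priority, something like:

            // Illustrative sketch only: sending with a non-default JMSPriority
            // (the JMS default priority is 4)
            void sendHighPriority(Session session, Queue queue) throws JMSException {
               MessageProducer producer = session.createProducer(queue);
               producer.setPriority(9); // non-default priority
               producer.send(session.createTextMessage("high-priority message"));
            }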

           

           

          I would need a running test so we could make some better comparisons. Any way you could share your tests?

          • 2. Re: JMS consumer performance regression in HornetQ 2.4 during consumer acknowledge() call
            clebert.suconic

            a simple test I mean

            • 3. Re: Re: JMS consumer performance regression in HornetQ 2.4 during consumer acknowledge() call
              benspiller

              It's not easy to disentangle the main throughput test from the rest of our codebase, but I can more easily share the test that shows the change in ack time that is causing it:

               

               package test;

               import java.util.Properties;
               import java.util.concurrent.atomic.AtomicBoolean;

               import javax.jms.Connection;
               import javax.jms.ConnectionFactory;
               import javax.jms.DeliveryMode;
               import javax.jms.Message;
               import javax.jms.MessageConsumer;
               import javax.jms.MessageProducer;
               import javax.jms.Queue;
               import javax.jms.Session;
               import javax.naming.InitialContext;

               public class TestAckTimeVariation {
                  private static final java.util.logging.Logger LOGGER = java.util.logging.Logger.getLogger(TestAckTimeVariation.class.getName());
                  private static final long MAX_BATCH_SIZE = Long.getLong("MAX_BATCH_SIZE", 2000);
                  public static ConnectionFactory connectionFactory;
                  public static AtomicBoolean receiverDone = new AtomicBoolean(false);

                  public static void main(String[] args) throws Exception {
                     int port = Integer.getInteger("HORNETQ_TEST_PORT", 1099);
                     String queueName = System.getProperty("HORNETQ_TEST_QUEUE", "test-queue-01");
                     long numberOfSamples = Long.getLong("HORNETQ_TEST_SAMPLES", 1000);

                     Properties jndiEnvironment = new Properties();
                     jndiEnvironment.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
                     jndiEnvironment.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
                     jndiEnvironment.setProperty("java.naming.provider.url", "jnp://localhost:" + port);

                     InitialContext jndiContext = new InitialContext(jndiEnvironment);
                     connectionFactory = (ConnectionFactory) jndiContext.lookup("ConnectionFactory");

                     MessageReceiver receiver = new MessageReceiver(queueName, numberOfSamples);
                     receiver.start();
                     MessageSender sender = new MessageSender(queueName);
                     sender.start();

                     receiver.join();
                     receiverDone.set(true);
                     sender.join();
                  }

                  private static class MessageReceiver extends Thread {
                     private final String qName;
                     private final long numberOfSamples;

                     public MessageReceiver(String qname, long numberOfSamples) throws Exception {
                        this.qName = qname;
                        this.numberOfSamples = numberOfSamples;
                     }

                     @Override
                     public void run() {
                        try {
                           LOGGER.info("Receiver: Connecting");
                           Connection c = connectionFactory.createConnection();

                           int mode = Session.CLIENT_ACKNOWLEDGE;
                           LOGGER.info("Receiver: Using CLIENT_ACK mode");

                           Session s = c.createSession(false, mode);
                           Queue q = s.createQueue(this.qName);
                           MessageConsumer consumer = s.createConsumer(q, null, false);

                           c.start();

                           Message m = null;

                           // loop until we exhaust all the batch sizes we want to test
                           long currentBatchSize = 1;
                           while (currentBatchSize <= MAX_BATCH_SIZE) {
                              // number of unacked messages since the last acknowledgement
                              long unackedMsgs = 0;
                              // total time spent in acknowledge(), in nanoseconds
                              long ackTime = 0;
                              // number of times acknowledge() was called
                              int samples = 0;

                              // take the specified number of samples
                              while (samples < numberOfSamples) {
                                 m = consumer.receive(5000);
                                 if (m != null) {
                                    unackedMsgs++;

                                    // once a full batch has been received, ack all unacked messages
                                    if (unackedMsgs == currentBatchSize) {
                                       long st = System.nanoTime();
                                       m.acknowledge();
                                       ackTime += System.nanoTime() - st;
                                       // reset the unacked message count
                                       unackedMsgs = 0;
                                       // increase the count of samples taken
                                       samples++;
                                    }
                                 }
                              }
                              System.out.println(String.format("%4d - %10d", currentBatchSize, ackTime / samples));

                              // now try with a different batch size
                              currentBatchSize = nextBatchSize(currentBatchSize);
                           }

                           c.close();
                        } catch (Exception e) {
                           e.printStackTrace();
                        }
                     }

                     private long nextBatchSize(long prevBatchSize) {
                        long newSize;
                        if (prevBatchSize < 10) {
                           newSize = prevBatchSize + 1;
                        } else if (prevBatchSize < 100) {
                           newSize = prevBatchSize + 10;
                        } else {
                           newSize = prevBatchSize + 100;
                        }
                        return newSize;
                     }
                  }

                  private static class MessageSender extends Thread {
                     final String qName;

                     public MessageSender(String qname) throws Exception {
                        this.qName = qname;
                     }

                     @Override
                     public void run() {
                        try {
                           LOGGER.info("Sender: Connecting");
                           Connection c = connectionFactory.createConnection();
                           Session s = c.createSession(true, Session.SESSION_TRANSACTED);
                           LOGGER.info("Sender: Using TRANSACTED session");

                           Queue q = s.createQueue(this.qName);
                           MessageProducer producer = s.createProducer(null);
                           producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                           c.start();
                           long sent = 0L;
                           while (!receiverDone.get()) {
                              sent++;
                              producer.send(q, s.createTextMessage("Message_" + sent));
                              if (sent % MAX_BATCH_SIZE == 0) {
                                 s.commit();
                              }
                           }

                           // any messages sent since the last commit are discarded on close
                           c.close();
                        } catch (Exception e) {
                           if (e instanceof InterruptedException) {
                              LOGGER.info("Sender done.");
                           } else {
                              e.printStackTrace();
                           }
                        }
                     }
                  }
               }

              
              • 4. Re: JMS consumer performance regression in HornetQ 2.4 during consumer acknowledge() call
                clebert.suconic

                 The only thing I can think of right now is the Netty version change; we are only using NIO in the newest version, since Netty is deprecating OIO.

                 

                 

                 I will need some time.. give me till tomorrow, 15th of Jan/2015, to look into this.. I have already copied & pasted your test into a new test (an attachment would have been easier for me, but it's ok now)

                 

                 

                give me at least one day...

                • 5. Re: JMS consumer performance regression in HornetQ 2.4 during consumer acknowledge() call
                  clebert.suconic

                   It seems related to the Netty 4 update and how we allocate buffers. It's becoming a major task for me now.. I will need another week, but I'm focusing on this now. This affects 2.4+ for sure.. 2.3 is fine.

                  • 6. Re: JMS consumer performance regression in HornetQ 2.4 during consumer acknowledge() call
                    clebert.suconic

                     The issue is with commits. The CLIENT_ACK case sends a commit, and once we moved to NIO the thread model changed.

                     

                     We used to always handle the commit through a different executor, which wasn't an issue with OIO. Now that we are using NIO, that's an issue if you don't have anything to commit (that is, no IO to do).

                     

                     

                     You are seeing this since you are using non-persistent messages.
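                     Roughly, the idea is something like this (just a sketch of the thread model, NOT the actual HornetQ code; the names are made up for illustration):

                       import java.util.concurrent.Executor;
                       import java.util.concurrent.Executors;

                       // Sketch only: complete the commit inline when there is no IO to do,
                       // instead of always hopping through a separate executor.
                       public class CommitDispatchSketch {
                          private final Executor journalExecutor = Executors.newSingleThreadExecutor();

                          // 'hasPendingIO' and 'completeCommit' are hypothetical names
                          void onCommit(boolean hasPendingIO, Runnable completeCommit) {
                             if (hasPendingIO) {
                                // persistent messages: hand off to the journal executor as before
                                journalExecutor.execute(completeCommit);
                             } else {
                                // non-persistent case: nothing to write, so the extra executor hop
                                // under the new NIO thread model only adds latency; run it inline
                                completeCommit.run();
                             }
                          }
                       }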

                     

                     

                     I already have a fix, but I'm still polishing it. I should commit a fix soon on both 2.4.x and activemq-6.

                    • 7. Re: JMS consumer performance regression in HornetQ 2.4 during consumer acknowledge() call
                      benspiller

                      Awesome, thanks Clebert!

                      • 8. Re: JMS consumer performance regression in HornetQ 2.4 during consumer acknowledge() call
                        clebert.suconic

                         The avg in nanoseconds is misleading in my tests.. the throughput is much higher after the fix, but the avg time is bigger.. perhaps because I'm doing a lot more calls now?
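                         To compare, I'm also measuring whole-run throughput rather than the per-call average; roughly something like this (illustrative sketch only, assuming a 'consumer' and 'batchSize' set up as in your test):

                           // Illustrative sketch: whole-run throughput instead of per-ack average
                           long start = System.nanoTime();
                           long count = 0;
                           Message m;
                           while ((m = consumer.receive(5000)) != null) {
                              count++;
                              if (count % batchSize == 0) {
                                 m.acknowledge();
                              }
                           }
                           double seconds = (System.nanoTime() - start) / 1e9;
                           System.out.printf("throughput: %.0f msgs/sec%n", count / seconds);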

                         

                         

                        Would you be able to try my dev branch? I'm about to send a PR (just finishing some tests):

                         

                         

                        git clone https://github.com/clebertsuconic/hornetq.git

                        cd hornetq

                        git checkout 240

                         

                        mvn -Prelease package

                         

                         

                         

                        If you are interested in running the tests I wrote:

                         

                        if you create a script perf-tests.sh:

                        mvn -DskipPerformanceTests=false -Dtest=$1 test

                         

                         

                        you could then run perf-tests.sh under tests/performance-tests:

                        ./perf-tests.sh MeasureCommitPerfTest

                         

                        # This is your test

                        ./perf-tests.sh ACKTimeTest

                         

                         

                         

                        If you would like, I also have the branch from before upgrading to Netty (which is what caused the regression):

                         

                        # on the same clone

                        git checkout before-netty

                         

                         

                         

                        This message here is transient, as my git repo will probably change over time, but the branch should live for some time, at least till I do the PR.

                        • 9. Re: JMS consumer performance regression in HornetQ 2.4 during consumer acknowledge() call
                          clebert.suconic

                          I have committed the fix on the 2.4.x, master and activemq6 branches.

                           

                           

                          Can you try it out?