9 Replies Latest reply on May 12, 2014 11:19 AM by jbertram

    Sending and Receiving Messages Concurrently

    yairogen

      I'm using the core API on the latest version, 2.4.1.

      I am testing a use case where users will use producers and consumers concurrently.

      I've set up a test that creates a fixed-size thread pool with N+10 threads.

      I then loop N times and submit two Runnables in each iteration. One Runnable sends a message; the other uses the blocking receive API.

      Obviously this is not effective, but it illustrates a real-life scenario (truth be told, we usually use MessageHandlers).

      Oddly enough, I see in JConsole that all messages are written, but none are being read. I can't explain why. It seems all the threads are blocked and receive is stuck, although messages exist in the queue.

       

      Any ideas what's going on?
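
      For reference, the shape of this test can be sketched with the JDK alone. This is a minimal sketch under stated assumptions, not the HornetQ code: a `LinkedBlockingQueue` stands in for the HornetQ queue, and the class and method names (`ConcurrentSendReceiveSketch`, `run`) are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentSendReceiveSketch {

    // Submits numIter send tasks and numIter blocking receive tasks to a pool
    // of numIter + 10 threads. Returns the number of messages received, or -1
    // if the receivers did not all finish within the timeout.
    static int run(int numIter) throws InterruptedException {
        // In-memory stand-in for the queue (assumption: no broker, no sessions).
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(numIter + 10);
        CountDownLatch received = new CountDownLatch(numIter);
        AtomicInteger count = new AtomicInteger();

        for (int i = 0; i < numIter; i++) {
            // Sender task: enqueue one message.
            pool.execute(() -> queue.add("hi there"));

            // Receiver task: block until a message is available.
            pool.execute(() -> {
                try {
                    queue.take();
                    count.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    received.countDown();
                }
            });
        }

        boolean finished = received.await(10, TimeUnit.SECONDS);
        pool.shutdownNow();
        return finished ? count.get() : -1;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("received: " + run(100));
    }
}
```

      At most N receive tasks can be blocked at once, so a pool of N+10 threads always leaves at least 10 threads free to run senders; the pool size by itself should not starve the test. An in-memory queue has no sessions or client-side state, so this sketch says nothing about why the HornetQ version stalls.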

        • 1. Re: Sending and Receiving Messages Concurrently
          ataylor

          Without seeing any code it's impossible to say.

          • 2. Re: Re: Sending and Receiving Messages Concurrently
            yairogen

            Here is the code:

            int NUM_ITER = 100;
            ExecutorService threadPool = Executors.newFixedThreadPool(NUM_ITER + 10);
            final CountDownLatch countDownLatch = new CountDownLatch(NUM_ITER);
            long start = System.currentTimeMillis();
            final LongAdder longAdder = new LongAdder();

            for (int i = 0; i < NUM_ITER; i++) {
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        ClientMessage clientMessage = session.createMessage(true);
                        clientMessage.getBodyBuffer().writeString("hi there");
                    }
                });

                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        ClientMessage clientMessage = consumer.receive();
                        clientMessage.acknowledge();
                        countDownLatch.countDown();
                        longAdder.add(1);
                        long sum = longAdder.sum();
                        if (sum % 500 == 0) {
                            System.out.println("read/write index: " + sum);
                        }
                    }
                });
            }
            countDownLatch.await();

            long end = System.currentTimeMillis();
            double total = (end - start) / 1000D;
            double tps = NUM_ITER / total;
            System.out.println("tps read and write: " + tps);
            threadPool.shutdown();
            
            • 3. Re: Re: Sending and Receiving Messages Concurrently
              ataylor

              Your code formatting seems to have removed some of the code, so I can't see it properly. However, make sure you are:

              a) starting the session for the consumer

              b) using a separate session for each thread, as sessions are not thread-safe

              • 4. Re: Re: Sending and Receiving Messages Concurrently
                yairogen

                I don't know what's going on with the formatting. Either way, I have a wrapper over the core API that uses a ThreadLocal to ensure each thread has only one session, consumer and producer instance.
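
                The thread-local pattern described here can be illustrated with the JDK alone. This is a minimal sketch, assuming a hypothetical `StubSession` class as a stand-in for HornetQ's `ClientSession`; it is not the wrapper from this thread, and `PerThreadSessionSketch` and `sessionsSeenBy` are made-up names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PerThreadSessionSketch {

    // Hypothetical stand-in for a HornetQ ClientSession; not a real HornetQ class.
    static final class StubSession {
        private volatile boolean started;

        void start() {
            started = true;
        }

        boolean isStarted() {
            return started;
        }
    }

    // Each thread lazily creates and starts its own session on first access.
    private static final ThreadLocal<StubSession> SESSION =
            ThreadLocal.withInitial(() -> {
                StubSession session = new StubSession();
                session.start();
                return session;
            });

    static StubSession getSession() {
        return SESSION.get();
    }

    // Runs one task per pool thread and returns the distinct sessions observed.
    static Set<StubSession> sessionsSeenBy(int threads) throws InterruptedException {
        Set<StubSession> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch allRunning = new CountDownLatch(threads);
        CountDownLatch done = new CountDownLatch(threads);

        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    // Hold every task until all have started, so that no
                    // pool thread can run two of them.
                    allRunning.countDown();
                    allRunning.await();
                    seen.add(getSession());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }

        done.await();
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("distinct sessions: " + sessionsSeenBy(4).size());
    }
}
```

                One consequence of this design is worth keeping in mind: pool threads are reused, so a thread that ran a receive task keeps its session (and anything created from it) when it later picks up a send task.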

                • 5. Re: Re: Sending and Receiving Messages Concurrently
                  ataylor

                  So you are starting the session?

                  • 6. Re: Re: Sending and Receiving Messages Concurrently
                    yairogen

                    Yes. If I need a new session because none exists in the ThreadLocal, I create a new session and start it.

                    • 7. Re: Re: Sending and Receiving Messages Concurrently
                      jbertram

                      I'd need a test-case to reproduce the issue to investigate further.

                      • 8. Re: Re: Re: Sending and Receiving Messages Concurrently
                        yairogen

                        See attached. Notice that when the shutdown hook runs some messages do get consumed. I can't understand what I'm doing wrong.

                         

                        /*
                         * Copyright 2014 Cisco Systems, Inc.
                         *
                         *  Licensed under the Apache License, Version 2.0 (the "License");
                         *  you may not use this file except in compliance with the License.
                         *  You may obtain a copy of the License at
                         *
                         *  http://www.apache.org/licenses/LICENSE-2.0
                         *
                         *  Unless required by applicable law or agreed to in writing, software
                         *  distributed under the License is distributed on an "AS IS" BASIS,
                         *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
                         *  See the License for the specific language governing permissions and
                         *  limitations under the License.
                         */

                        import com.twitter.jsr166e.LongAdder;
                        import org.hornetq.api.core.HornetQException;
                        import org.hornetq.api.core.TransportConfiguration;
                        import org.hornetq.api.core.client.*;
                        import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
                        import org.junit.Test;
                        import org.slf4j.Logger;
                        import org.slf4j.LoggerFactory;

                        import java.util.HashSet;
                        import java.util.Set;
                        import java.util.concurrent.CountDownLatch;
                        import java.util.concurrent.ExecutorService;
                        import java.util.concurrent.Executors;

                        /**
                         * Created by Yair Ogen on 04/05/2014.
                         */
                        public class ParallelReadWriteTest {

                            private static final ThreadLocal<ClientProducer> producer = new ThreadLocal<ClientProducer>();
                            public static ThreadLocal<ClientSession> sessionThreadLocal = new ThreadLocal<ClientSession>();
                            private static ClientSessionFactory nettyFactory = null;
                            private static final ThreadLocal<ClientConsumer> consumer = new ThreadLocal<ClientConsumer>();
                            private static Logger LOGGER = LoggerFactory.getLogger("hornetq-test");
                            private static final Set<ClientProducer> producers = new HashSet<ClientProducer>();
                            private static final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();

                            static {
                                init();
                            }

                            private static void init() {
                                try {
                                    nettyFactory = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName())).createSessionFactory();
                                } catch (Exception e) {
                                    LOGGER.error("can't create hornetq session: {}", e, e);
                                    throw new RuntimeException(e);
                                }

                                Runtime.getRuntime().addShutdownHook(new Thread() {
                                    @Override
                                    public void run() {
                                        try {
                                            for (ClientProducer clientProducer : producers) {
                                                clientProducer.close();
                                            }
                                            for (ClientConsumer clientConsumer : consumers) {
                                                clientConsumer.close();
                                            }
                                        } catch (HornetQException e) {
                                            LOGGER.error("can't close producer, error: {}", e, e);
                                        }
                                    }
                                });
                            }

                            private ClientProducer getProducer(String queueName) {
                                try {
                                    if (producer.get() == null) {
                                        ClientProducer clientProducer = getSession().createProducer(queueName);
                                        producers.add(clientProducer);
                                        producer.set(clientProducer);
                                    }
                                    return producer.get();
                                } catch (Exception e) {
                                    LOGGER.error("can't create queue consumer: {}", e, e);
                                    throw new RuntimeException(e);
                                }
                            }

                            private ClientConsumer getConsumer(String queueName) {
                                try {
                                    if (consumer.get() == null) {
                                        ClientConsumer clientConsumer = getSession().createConsumer(queueName);
                                        consumers.add(clientConsumer);
                                        consumer.set(clientConsumer);
                                    }
                                    return consumer.get();
                                } catch (Exception e) {
                                    LOGGER.error("can't create queue consumer: {}", e, e);
                                    throw new RuntimeException(e);
                                }
                            }

                            public static ClientSession getSession() {
                                if (sessionThreadLocal.get() == null) {
                                    try {
                                        ClientSession hornetQSession = nettyFactory.createSession(true, true);
                                        hornetQSession.start();
                                        sessionThreadLocal.set(hornetQSession);
                                    } catch (Exception e) {
                                        LOGGER.error("can't create hornetq session: {}", e, e);
                                        throw new RuntimeException(e);
                                    }
                                }
                                return sessionThreadLocal.get();
                            }

                            @Test
                            public void testParralelSendAndSyncRecieveUsingCore() throws Exception {

                                int NUM_ITER = 100;
                                ExecutorService threadPool = Executors.newFixedThreadPool(NUM_ITER + 10);
                                final CountDownLatch countDownLatch = new CountDownLatch(NUM_ITER);

                                long start = System.currentTimeMillis();

                                final LongAdder longAdder = new LongAdder();

                                for (int i = 0; i < NUM_ITER; i++) {

                                    threadPool.execute(new Runnable() {
                                        @Override
                                        public void run() {
                                            ClientMessage clientMessage = getSession().createMessage(true);
                                            clientMessage.getBodyBuffer().writeString("hi there");
                                            try {
                                                getProducer("myExampleDirect").send(clientMessage);
                                            } catch (HornetQException e) {
                                                e.printStackTrace();
                                            }
                                        }
                                    });

                                    threadPool.execute(new Runnable() {
                                        @Override
                                        public void run() {
                                            try {
                                                ClientMessage clientMessage = getConsumer("myExampleDirect").receive();
                                                clientMessage.acknowledge();
                                            } catch (HornetQException e) {
                                                e.printStackTrace();
                                            }
                                            countDownLatch.countDown();
                                            longAdder.add(1);
                                            long sum = longAdder.sum();
                                            if (sum % 500 == 0) {
                                                System.out.println("read/write index: " + sum);
                                            }
                                        }
                                    });
                                }

                                countDownLatch.await();

                                long end = System.currentTimeMillis();

                                double total = (end - start) / 1000D;

                                double tps = NUM_ITER / total;

                                System.out.println("tps read and write: " + tps);

                                threadPool.shutdown();
                            }
                        }
                        
                        • 9. Re: Re: Re: Sending and Receiving Messages Concurrently
                          jbertram

                          I was looking for something I could actually run with minimal set-up.  Could you modify one of the existing examples we ship to reproduce the problem you're seeing and attach the whole thing to the thread?  That way I can just grab it and run it.