9 Replies Latest reply on May 12, 2014 11:19 AM by Justin Bertram

    Sending and Receiving Messages Concurrently

    Yair Ogen Expert

      I'm using core API on latest version - 2.4.1.

       

      I am testing a use case where users will use producers and consumers concurrently.

       

      I've setup a test where I setup a FixedSizedThreadpool with N+10 threads.

       

      I then in a loop run N times and submit 2 Runnables within each loop iteration. One Runnable will send a message, the other will use the receive (blocking) API.

       

      Obviously this is not effective, but can illustrate a real-life scenario (truth being told we usually use MessageHnadlers).

       

      Oddly enough, I see in Jconsole that all messages are written, but non are being read. I can't explain why. seems all threads are blocked and recieve is stuck although messages exist in the queue.

       

      Any ideas what's going on?

        • 1. Re: Sending and Receiving Messages Concurrently
          Andy Taylor Master

          without seeing any code its impossible to say

          • 2. Re: Re: Sending and Receiving Messages Concurrently
            Yair Ogen Expert

            Here is the code:

             

            
            
            
            
            
            int NUM_ITER = 100;
            
            
            
            
            
                    ExecutorService threadPool = Executors.newFixedThreadPool(NUM_ITER + 10);
                    final CountDownLatch countDownLatch = new CountDownLatch(NUM_ITER);
                    long start = System.currentTimeMillis();
                    final LongAdder longAdder = new LongAdder();
            
            
            
            
            
            
            
            
            
                    for (int i = 0; i < NUM_ITER; i++) {
                        threadPool.execute(new Runnable() {
                            @Override
                            public void run() {
            
            
            
            
            
            
            
            
            ClientMessage clientMessage = session.createMessage(true);
            
            
            
            
            
            
            
            clientMessage.getBodyBuffer().writeString("hi there");                    
            
            
            
                            }
                        });
            
            
                        threadPool.execute(new Runnable() {
                            @Override
                            public void run() {
            
            
            
            
            
            
            
            
            ClientMessage clientMessage = consumer.receive();
            
            
            
            
            
            
            
            clientMessage.acknowledge();                   
            
            
            
                                countDownLatch.countDown();
                                longAdder.add(1);
                                long sum = longAdder.sum();
                                if (sum % 500 == 0) {
                                    System.out.println("read/write index: " + sum);
                                }
                            }
                        });
                    }
                    countDownLatch.await();
            
            
                    long end = System.currentTimeMillis();
                    double total = (end - start) / 1000D;
                    double tps = NUM_ITER / total;
                    System.out.println("tps read and write: " + tps);
                    threadPool.shutdown();
            
            • 3. Re: Re: Sending and Receiving Messages Concurrently
              Andy Taylor Master

              your code formatting seems to have removed some of the code so i cant see it properly, however make sure you are

               

              a) starting the session for the consumer

              b) make sure you use a separate session for each thread as sessions are not thread safe

              • 4. Re: Re: Sending and Receiving Messages Concurrently
                Yair Ogen Expert

                I don't know what's going on in the formatting. Either way I have a wrapper over the core API and it uses thread local to verify each thread has only one session, consumer and producer instance.

                • 6. Re: Re: Sending and Receiving Messages Concurrently
                  Yair Ogen Expert

                  yes. if I see I need a new session as it doesn't exist in ThreadLocal - I create a new Session and start it.

                  • 7. Re: Re: Sending and Receiving Messages Concurrently
                    Justin Bertram Master

                    I'd need a test-case to reproduce the issue to investigate further.

                    • 8. Re: Re: Re: Sending and Receiving Messages Concurrently
                      Yair Ogen Expert

                      See attached. Notice that when the shutdown hook runs some messages do get consumed. I can't understand what I'm doing wrong.

                       

                      /*
                       * Copyright 2014 Cisco Systems, Inc.
                       *
                       *  Licensed under the Apache License, Version 2.0 (the "License");
                       *  you may not use this file except in compliance with the License.
                       *  You may obtain a copy of the License at
                       *
                       *  http://www.apache.org/licenses/LICENSE-2.0
                       *
                       *  Unless required by applicable law or agreed to in writing, software
                       *  distributed under the License is distributed on an "AS IS" BASIS,
                       *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
                       *  See the License for the specific language governing permissions and
                       *  limitations under the License.
                       */
                      
                      
                      import com.twitter.jsr166e.LongAdder;
                      import org.hornetq.api.core.HornetQException;
                      import org.hornetq.api.core.TransportConfiguration;
                      import org.hornetq.api.core.client.*;
                      import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
                      import org.junit.Test;
                      import org.slf4j.Logger;
                      import org.slf4j.LoggerFactory;
                      
                      
                      import java.util.HashSet;
                      import java.util.Set;
                      import java.util.concurrent.CountDownLatch;
                      import java.util.concurrent.ExecutorService;
                      import java.util.concurrent.Executors;
                      
                      
                      /**
                       * Created by Yair Ogen on 04/05/2014.
                       */
                      public class ParallelReadWriteTest {
                      
                      
                          private static final ThreadLocal<ClientProducer> producer = new ThreadLocal<ClientProducer>();
                          public static ThreadLocal<ClientSession> sessionThreadLocal = new ThreadLocal<ClientSession>();
                          private static ClientSessionFactory nettyFactory = null;
                          private static final ThreadLocal<ClientConsumer> consumer = new ThreadLocal<ClientConsumer>();
                          private static Logger LOGGER = LoggerFactory.getLogger("hornetq-test");
                          private static final Set<ClientProducer> producers = new HashSet<ClientProducer>();
                          private static final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
                      
                      
                      
                      
                          static{
                              init();
                          }
                      
                      
                          private static void init() {
                              try {
                                  nettyFactory = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName())).createSessionFactory();
                      
                      
                              } catch (Exception e) {
                                  LOGGER.error("can't create hornetq session: {}", e, e);
                                  throw new RuntimeException(e);
                              }
                      
                      
                              Runtime.getRuntime().addShutdownHook(new Thread(){
                                  @Override
                                  public void run() {
                                      try {
                                          for (ClientProducer clientProducer : producers) {
                                              clientProducer.close();
                                          }
                                          for (ClientConsumer clientConsumer : consumers) {
                                              clientConsumer.close();
                                          }
                                      } catch (HornetQException e) {
                                          LOGGER.error("can't close producer, error: {}", e, e);
                                      }
                                  }
                              });
                      
                      
                          }
                      
                      
                          private ClientProducer getProducer(String queueName) {
                              try {
                                  if (producer.get() == null) {
                                      ClientProducer clientProducer = getSession().createProducer(queueName);
                                      producers.add(clientProducer);
                                      producer.set(clientProducer);
                                  }
                                  return producer.get();
                              } catch (Exception e) {
                                  LOGGER.error("can't create queue consumer: {}", e, e);
                                  throw new RuntimeException(e);
                              }
                          }
                      
                      
                          private ClientConsumer getConsumer(String queueName) {
                              try {
                                  if (consumer.get() == null) {
                                      ClientConsumer clientConsumer = getSession().createConsumer(queueName);
                                      consumers.add(clientConsumer);
                                      consumer.set(clientConsumer);
                                  }
                                  return consumer.get();
                              } catch (Exception e) {
                                  LOGGER.error("can't create queue consumer: {}", e, e);
                                  throw new RuntimeException(e);
                              }
                          }
                      
                      
                          public static ClientSession getSession() {
                              if (sessionThreadLocal.get() == null) {
                                  try {
                                      ClientSession hornetQSession = nettyFactory.createSession(true, true);
                                      hornetQSession.start();
                                      sessionThreadLocal.set(hornetQSession);
                                  } catch (Exception e) {
                                      LOGGER.error("can't create hornetq session: {}", e, e);
                                      throw new RuntimeException(e);
                                  }
                              }
                              return sessionThreadLocal.get();
                          }
                      
                      
                      
                      
                          @Test
                          public void testParralelSendAndSyncRecieveUsingCore() throws Exception {
                      
                      
                      
                      
                              int NUM_ITER = 100;
                              ExecutorService threadPool = Executors.newFixedThreadPool(NUM_ITER + 10);
                              final CountDownLatch countDownLatch = new CountDownLatch(NUM_ITER);
                      
                      
                      
                      
                              long start = System.currentTimeMillis();
                      
                      
                              final LongAdder longAdder = new LongAdder();
                      
                      
                              for (int i = 0; i < NUM_ITER; i++) {
                      
                      
                                  threadPool.execute(new Runnable() {
                                      @Override
                                      public void run() {
                                          ClientMessage clientMessage = getSession().createMessage(true);
                                          clientMessage.getBodyBuffer().writeString("hi there");
                                          try {
                                              getProducer("myExampleDirect").send(clientMessage);
                                          } catch (HornetQException e) {
                                              e.printStackTrace();
                                          }
                                      }
                                  });
                      
                      
                                  threadPool.execute(new Runnable() {
                                      @Override
                                      public void run() {
                                          try {
                                              ClientMessage clientMessage = getConsumer("myExampleDirect").receive();
                                              clientMessage.acknowledge();
                                          } catch (HornetQException e) {
                                              e.printStackTrace();
                                          }
                                          countDownLatch.countDown();
                                          longAdder.add(1);
                                          long sum = longAdder.sum();
                                          if (sum % 500 == 0) {
                                              System.out.println("read/write index: " + sum);
                                          }
                      
                      
                                      }
                                  });
                      
                      
                              }
                      
                      
                              countDownLatch.await();
                      
                      
                              long end = System.currentTimeMillis();
                      
                      
                              double total = (end - start) / 1000D;
                      
                      
                              double tps = NUM_ITER / total;
                      
                      
                              System.out.println("tps read and write: " + tps);
                      
                      
                              threadPool.shutdown();
                          }
                      }
                      
                      • 9. Re: Re: Re: Sending and Receiving Messages Concurrently
                        Justin Bertram Master

                        I was looking for something I could actually run with minimal set-up.  Could you modify one of the existing examples we ship to reproduce the problem you're seeing and attach the whole thing to the thread?  That way I can just grab it and run it.