7 Replies Latest reply on May 5, 2014 9:16 AM by Yair Ogen

    Understanding Clusters from client point of view

    Yair Ogen Expert

      I've read the section about clusters and would like to try it out.

       

      I am using the core API, and there is one thing I don't understand.

       

      Assume I have a cluster with static connectors defined in the configuration file:

       

      <cluster-connection name="my-cluster">
         ...
         <static-connectors>
            <connector-ref>server0-connector</connector-ref>
            <connector-ref>server1-connector</connector-ref>
         </static-connectors>
      </cluster-connection>
      
      
      

       

      And in the code I do this:

       

      HashMap<String, Object> map = new HashMap<String, Object>();
      map.put("host", "myhost");
      map.put("port", "5445");
      TransportConfiguration server1 = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
      HashMap<String, Object> map2 = new HashMap<String, Object>();
      map2.put("host", "myhost2");
      map2.put("port", "5446");
      TransportConfiguration server2 = new TransportConfiguration(NettyConnectorFactory.class.getName(), map2);
      
      
      ServerLocator locator = HornetQClient.createServerLocatorWithHA(server1, server2);
      ClientSessionFactory factory = locator.createSessionFactory();
      ClientSession session = factory.createSession();
      
      
      

       

      Now, if I want client-side load balancing as well as server-side load balancing (which I get for free once the XML section above is defined), do I need to re-create the session, producers and consumers, or can I use one session, one producer and one consumer, and the core library will know how to round-robin between the servers?

        • 1. Re: Understanding Clusters from client point of view
          Yong Hao Gao Master

          Once the session is created it stays with one connection, which means the session is connected to only one node of the two-node cluster. The round-robin happens when you create multiple sessions. It's easy to verify with your test.

          • 2. Re: Understanding Clusters from client point of view
            Yair Ogen Expert

            Thanks. That's what I thought. However, I create 2 sessions using the same connection factory in 2 different threads, and they result in 2 consumers on the same node instead of one consumer on each node as I expected.

             

            What am I missing?

            • 4. Re: Re: Understanding Clusters from client point of view
              Yair Ogen Expert

              Here it is:

               

                  private static final ThreadLocal<ClientProducer> producer = new ThreadLocal<ClientProducer>();
                  private static final ThreadLocal<ClientConsumer> consumer = new ThreadLocal<ClientConsumer>();
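                  // Registered from several threads, so the sets are wrapped to make add() thread-safe.
                  private static final Set<ClientProducer> producers = Collections.synchronizedSet(new HashSet<ClientProducer>());
                  private static final Set<ClientConsumer> consumers = Collections.synchronizedSet(new HashSet<ClientConsumer>());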
              
              
                  public static ThreadLocal<ClientSession> sessionThreadLocal = new ThreadLocal<ClientSession>();
              
              
                  private static ClientSessionFactory nettyFactory = null;
                  private static Logger LOGGER = LoggerFactory.getLogger("hornetq-test");
              
              
                  static {
                      init();
                  }
              
              
                  private static void init() {
                      try {
              //            nettyFactory = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName())).createSessionFactory();
                          Map<String, Object> map = new HashMap<String, Object>();
                          map.put("host", "localhost");
                          map.put("port", "5445");
                          TransportConfiguration server1 = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
              
              
                          HashMap<String, Object> map2 = new HashMap<String, Object>();
                          map2.put("host", "localhost");
                          map2.put("port", "6445");
                          TransportConfiguration server2 = new TransportConfiguration(NettyConnectorFactory.class.getName(), map2);
              
              
                          nettyFactory = HornetQClient.createServerLocatorWithHA(server1, server2).createSessionFactory();
                      } catch (Exception e) {
                          LOGGER.error("can't create hornetq session: {}", e, e);
                          throw new RuntimeException(e);
                      }
              
              
                      Runtime.getRuntime().addShutdownHook(new Thread() {
                          @Override
                          public void run() {
                              try {
                                  for (ClientProducer clientProducer : producers) {
                                      clientProducer.close();
                                  }
                                  for (ClientConsumer clientConsumer : consumers) {
                                      clientConsumer.close();
                                  }
                              } catch (HornetQException e) {
                                  LOGGER.error("can't close producer, error: {}", e, e);
                              }
                          }
                      });
              
              
                  }
              
              
                  public static ClientSession getSession() {
                      if (sessionThreadLocal.get() == null) {
                          try {
                              ClientSession hornetQSession = nettyFactory.createSession(true, true);
                              hornetQSession.start();
                              sessionThreadLocal.set(hornetQSession);
                          } catch (Exception e) {
                              LOGGER.error("can't create hornetq session: {}", e, e);
                              throw new RuntimeException(e);
                          }
                      }
                      return sessionThreadLocal.get();
                  }
              
              
                  private ClientProducer getProducer(String queueName) {
                      try {
                          if (producer.get() == null) {
                              ClientProducer clientProducer = getSession().createProducer(queueName);
                              producers.add(clientProducer);
                              producer.set(clientProducer);
                          }
                          return producer.get();
                      } catch (Exception e) {
                          LOGGER.error("can't create queue producer: {}", e, e);
                          throw new RuntimeException(e);
                      }
                  }
              
              
                  private ClientConsumer getConsumer(String queueName) {
                      try {
                          if (consumer.get() == null) {
                              ClientConsumer clientConsumer = getSession().createConsumer(queueName);
                              consumers.add(clientConsumer);
                              consumer.set(clientConsumer);
                          }
                          return consumer.get();
                      } catch (Exception e) {
                          LOGGER.error("can't create queue consumer: {}", e, e);
                          throw new RuntimeException(e);
                      }
                  }
              
              
                  @Test
                  public void parallelReadAndWrite() throws Exception{
              
              
              
              
                      int NUM_ITER = 5000;
                      int numberOfConsumers = 2;
              
              
                      ExecutorService threadPool = Executors.newFixedThreadPool(10);
              
              
                      final CountDownLatch countDownLatch = new CountDownLatch(NUM_ITER);
              
              
                      long start = System.currentTimeMillis();
              
              
                      for (int i = 0; i < numberOfConsumers; i++) {
                          new Thread(new Runnable() {
                              @Override
                              public void run() {
                                  try {
                                      getConsumer("foundation.myExampleDirect").setMessageHandler(new MessageHandler() {
              
              
                                          @Override
                                          public void onMessage(ClientMessage message) {
                                              countDownLatch.countDown();
                                              long count = countDownLatch.getCount();
                                              if (count % 1000 == 0) {
                                                  System.out.println("[" + Thread.currentThread().getName() + "]. consumer: " + count);
                                              }
                                          }
                                      });
                                  } catch (HornetQException e) {
                                      e.printStackTrace();
                                  }
                              }
                          }).start();
                      }
              
              
              //                        }
              //                    });
              
              
              
              
                      for (int i = 0; i < NUM_ITER; i++) {
                          final int counter = i;
                          threadPool.execute(new Runnable() {
                              @Override
                              public void run() {
                                  ClientMessage clientMessage = getSession().createMessage(true);
                                  clientMessage.getBodyBuffer().writeString("hi there: " + counter);
                                  try {
                                      getProducer("foundation.myExampleDirect").send(clientMessage);
                                  } catch (HornetQException e) {
                                      e.printStackTrace();
                                  }
                                  if (counter % 1000 == 0) {
                                      System.out.println("[" + Thread.currentThread().getName() + "] read/write: index: " + counter);
              
              
                                  }
                              }
                          });
              
              
                      }
              
              
                      countDownLatch.await();
              
              
              //        Thread.sleep(5000);
              
              
                      long end = System.currentTimeMillis();
              
              
                      double total = (end - start) / 1000D;
              
              
                      double tps = NUM_ITER / total;
              
              
                      System.out.println("tps read and write: " + tps);
                  }
              
              • 5. Re: Re: Understanding Clusters from client point of view
                Yair Ogen Expert

                Does anyone have an idea why opening more than one session still connects to the same node?

                • 6. Re: Re: Understanding Clusters from client point of view
                  Yong Hao Gao Master

                  I think the problem lies here:

                   

                  nettyFactory = HornetQClient.createServerLocatorWithHA(server1, server2).createSessionFactory();

                   

                  Each time it is called a new server locator is created. Each locator has its own load-balance policy, and its load-balancing behaviour is independent of every other locator.
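
                  To make the point about independent policies concrete, here is a minimal toy model in plain Java. It does not use the HornetQ API: `ToyLocator`, `pickNode` and the node names are hypothetical stand-ins, and the rotation is assumed to start at index 0 only so that the output is deterministic (the real policy may pick its starting node differently). It only illustrates that a rotation kept inside one locator alternates between nodes, while a fresh locator per request starts its rotation over every time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LocatorRoundRobinSketch {

    /** Toy stand-in for a server locator: it owns its own round-robin position. */
    static final class ToyLocator {
        private final List<String> nodes;
        private int next = 0; // assumption: start at index 0 to keep the output deterministic

        ToyLocator(String... nodes) {
            this.nodes = Arrays.asList(nodes);
        }

        /** Hypothetical stand-in for choosing a node when a new connection is requested. */
        synchronized String pickNode() {
            String node = nodes.get(next);
            next = (next + 1) % nodes.size();
            return node;
        }
    }

    /** One shared locator: successive picks alternate between the nodes. */
    static List<String> picksFromSharedLocator(int count) {
        ToyLocator shared = new ToyLocator("node-0", "node-1");
        List<String> picks = new ArrayList<String>();
        for (int i = 0; i < count; i++) {
            picks.add(shared.pickNode());
        }
        return picks;
    }

    /** A fresh locator for every pick: each one restarts the rotation. */
    static List<String> picksFromFreshLocators(int count) {
        List<String> picks = new ArrayList<String>();
        for (int i = 0; i < count; i++) {
            picks.add(new ToyLocator("node-0", "node-1").pickNode());
        }
        return picks;
    }

    public static void main(String[] args) {
        System.out.println("shared locator: " + picksFromSharedLocator(4));
        System.out.println("fresh locators: " + picksFromFreshLocators(4));
    }
}
```

                  In this model the shared locator yields node-0, node-1, node-0, node-1, while the fresh locators yield node-0 four times. The practical reading for the real client would be to keep a single locator for the whole application and request new connections from it, rather than building a new locator for each one.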

                   

                  Howard

                  • 7. Re: Re: Understanding Clusters from client point of view
                    Yair Ogen Expert

                    I've changed the init code to only create the locator, and I create a unique session factory and session for each new thread.

                     

                    I now see both nodes got 5000 messages and none were read. Why is that? I expected to see 1 consumer on each node and messages to be distributed between the nodes.