-
1. Re: Understanding Clusters from client point of view
gaohoward May 4, 2014 7:36 AM (in response to yairogen) Once a session is created it stays with one connection, which means the session is connected to only one node of the two-node cluster. The round-robin happens when you create multiple sessions. It's easy to verify with your test.
-
2. Re: Understanding Clusters from client point of view
yairogen May 4, 2014 7:41 AM (in response to gaohoward) Thanks. That's what I thought. However, I create 2 sessions from the same connection factory in 2 different threads, and both consumers end up on the same node instead of one consumer on each node, as I expected.
What am I missing?
-
3. Re: Understanding Clusters from client point of view
gaohoward May 4, 2014 8:11 AM (in response to yairogen) Can you upload your test?
-
4. Re: Re: Understanding Clusters from client point of view
yairogen May 4, 2014 8:30 AM (in response to gaohoward) Here it is:
private static final ThreadLocal<ClientProducer> producer = new ThreadLocal<ClientProducer>();
private static final ThreadLocal<ClientConsumer> consumer = new ThreadLocal<ClientConsumer>();
private static final Set<ClientProducer> producers = new HashSet<ClientProducer>();
private static final Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
public static ThreadLocal<ClientSession> sessionThreadLocal = new ThreadLocal<ClientSession>();
private static ClientSessionFactory nettyFactory = null;
private static Logger LOGGER = LoggerFactory.getLogger("hornetq-test");

static {
    init();
}

private static void init() {
    try {
        // nettyFactory = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName())).createSessionFactory();
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("host", "localhost");
        map.put("port", "5445");
        TransportConfiguration server1 = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
        HashMap<String, Object> map2 = new HashMap<String, Object>();
        map2.put("host", "localhost");
        map2.put("port", "6445");
        TransportConfiguration server2 = new TransportConfiguration(NettyConnectorFactory.class.getName(), map2);
        nettyFactory = HornetQClient.createServerLocatorWithHA(server1, server2).createSessionFactory();
    } catch (Exception e) {
        LOGGER.error("can't create hornetq session: {}", e, e);
        throw new RuntimeException(e);
    }
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            try {
                for (ClientProducer clientProducer : producers) {
                    clientProducer.close();
                }
                for (ClientConsumer clientConsumer : consumers) {
                    clientConsumer.close();
                }
            } catch (HornetQException e) {
                LOGGER.error("can't close producer, error: {}", e, e);
            }
        }
    });
}

public static ClientSession getSession() {
    if (sessionThreadLocal.get() == null) {
        try {
            ClientSession hornetQSession = nettyFactory.createSession(true, true);
            hornetQSession.start();
            sessionThreadLocal.set(hornetQSession);
        } catch (Exception e) {
            LOGGER.error("can't create hornetq session: {}", e, e);
            throw new RuntimeException(e);
        }
    }
    return sessionThreadLocal.get();
}

private ClientProducer getProducer(String queueName) {
    try {
        if (producer.get() == null) {
            ClientProducer clientProducer = getSession().createProducer(queueName);
            producers.add(clientProducer);
            producer.set(clientProducer);
        }
        return producer.get();
    } catch (Exception e) {
        LOGGER.error("can't create queue consumer: {}", e, e);
        throw new RuntimeException(e);
    }
}

private ClientConsumer getConsumer(String queueName) {
    try {
        if (consumer.get() == null) {
            ClientConsumer clientConsumer = getSession().createConsumer(queueName);
            consumers.add(clientConsumer);
            consumer.set(clientConsumer);
        }
        return consumer.get();
    } catch (Exception e) {
        LOGGER.error("can't create queue consumer: {}", e, e);
        throw new RuntimeException(e);
    }
}

@Test
public void parralelReadAndWrite() throws Exception {
    int NUM_ITER = 5000;
    int numberOfConsumers = 2;
    ExecutorService threadPool = Executors.newFixedThreadPool(10);
    final CountDownLatch countDownLatch = new CountDownLatch(NUM_ITER);
    long start = System.currentTimeMillis();
    for (int i = 0; i < numberOfConsumers; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    getConsumer("foundation.myExampleDirect").setMessageHandler(new MessageHandler() {
                        @Override
                        public void onMessage(ClientMessage message) {
                            countDownLatch.countDown();
                            long count = countDownLatch.getCount();
                            if (count % 1000 == 0) {
                                System.out.println("[" + Thread.currentThread().getName() + "]. consumer: " + count);
                            }
                        }
                    });
                } catch (HornetQException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    // }
    // });
    for (int i = 0; i < NUM_ITER; i++) {
        final int counter = i;
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                ClientMessage clientMessage = getSession().createMessage(true);
                clientMessage.getBodyBuffer().writeString("hi there: " + counter);
                try {
                    getProducer("foundation.myExampleDirect").send(clientMessage);
                } catch (HornetQException e) {
                    e.printStackTrace();
                }
                if (counter % 1000 == 0) {
                    System.out.println("[" + Thread.currentThread().getName() + "] read/write: index: " + counter);
                }
            }
        });
    }
    countDownLatch.await();
    // Thread.sleep(5000);
    long end = System.currentTimeMillis();
    double total = (end - start) / 1000D;
    double tps = NUM_ITER / total;
    System.out.println("tps read and write: " + tps);
}
-
5. Re: Re: Understanding Clusters from client point of view
yairogen May 5, 2014 8:18 AM (in response to yairogen) Does anyone have an idea why opening more than one session still connects to the same node?
-
6. Re: Re: Understanding Clusters from client point of view
gaohoward May 5, 2014 8:39 AM (in response to yairogen) I think the problem lies here:
nettyFactory = HornetQClient.createServerLocatorWithHA(server1, server2).createSessionFactory();
Each time it is called a new server locator is created. Each locator has its own load-balancing policy, and its load-balancing behaviour is independent of every other locator's.
Howard
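To illustrate the point about independent load-balancing policies, here is a toy model in plain Java. It does not use the HornetQ API: `ToyLocator`, the node names, and the fixed start index are all made up for illustration. It is only a sketch of why round-robin needs a shared locator, not how HornetQ implements it. As far as I know the real round-robin policy picks its first node at random, so with a fresh locator per connection the result would be arbitrary rather than always the first node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LocatorBalancingSketch {

    /** Hypothetical stand-in for a locator: it owns its own round-robin cursor. */
    static final class ToyLocator {
        private final List<String> nodes;
        private int next; // index of the node handed out by the next pick

        ToyLocator(int startIndex, String... nodes) {
            this.nodes = Arrays.asList(nodes);
            this.next = startIndex % nodes.length;
        }

        /** Returns the node that the next connection from this locator would use. */
        synchronized String pickNode() {
            String node = nodes.get(next);
            next = (next + 1) % nodes.size();
            return node;
        }
    }

    /** One shared locator: the cursor advances, so picks alternate between nodes. */
    static List<String> picksFromSharedLocator(int count) {
        ToyLocator shared = new ToyLocator(0, "node-5445", "node-6445");
        List<String> picks = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            picks.add(shared.pickNode());
        }
        return picks;
    }

    /** A new locator per pick: every cursor starts over, so nothing is balanced. */
    static List<String> picksFromFreshLocators(int count) {
        List<String> picks = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            picks.add(new ToyLocator(0, "node-5445", "node-6445").pickNode());
        }
        return picks;
    }

    public static void main(String[] args) {
        System.out.println("shared: " + picksFromSharedLocator(4));
        // prints: shared: [node-5445, node-6445, node-5445, node-6445]
        System.out.println("fresh:  " + picksFromFreshLocators(4));
        // prints: fresh:  [node-5445, node-5445, node-5445, node-5445]
    }
}
```

In this model the balancing state lives in the locator, so connections only spread across nodes when they all come from the same locator instance.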
-
7. Re: Re: Understanding Clusters from client point of view
yairogen May 5, 2014 9:16 AM (in response to gaohoward) I've changed the init code to only create the locator, and I create a unique session factory and session for each new thread.
I now see both nodes got 5000 messages and none were read. Why is that? I expected to see 1 consumer on each node and messages to be distributed between the nodes.