/* * Copyright (C) 2015 */ package com.acme.messaging.client.tests.cluster; import java.util.HashMap; import java.util.Map; import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClientConsumer; import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientProducer; import org.hornetq.api.core.client.ClientSession; import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.HornetQClient; import org.hornetq.api.core.client.ServerLocator; import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; public class ClusterClientHornetQCoreTest { public static final int NODE00_PORT = 10445; public static final int NODE01_PORT = 11445; public static final String QUEUE_NAME = "ClusterClientTestQueue"; public static final String DOMAIN_NAME = "TestTenant01"; public static final String USER_NAME = "TestAdmin01"; public static final String USER_PASSWORD = "TestPassword"; public static final boolean DURABLE = true; private ClientSessionFactory sf00; private ClientSessionFactory sf01; @Before public void setUp() throws Exception { sf00 = createClientSessionFactory(NODE00_PORT); sf01 = createClientSessionFactory(NODE01_PORT); createQueue(sf00); } @After public void tearDown() { sf00.close(); sf01.close(); } @Test public void testNode00DirectConn() throws Exception { /* 1. connect to node00 (direct connection) 2. send a message via node00 connection 3. connect to node01 (direct connection) 4. receive the message via connection got on step 3 */ try (ClientSession session = createSession(sf00)) { try (ClientProducer p = session.createProducer(getQueueName())) { ClientMessage m = session.createMessage(DURABLE); m.putStringProperty("p", "v"); p.send(m); System.out.println("message [" + m + "] was sent via " + session); } } System.out.println("Wait a second"); Thread.sleep(1000); try (ClientSession session = createSession(sf01)) { try (ClientConsumer c = session.createConsumer(getQueueName())) { System.out.println("Receiving a message via " + session); ClientMessage mr = c.receive(1000); assertEquals("v", mr.getStringProperty("p")); System.out.println("Received message [" + mr + "] via " + session); } } } private ClientSessionFactory createClientSessionFactory(int port) throws Exception { Map map = new HashMap<>(); map.put("host", "localhost"); map.put("port", port); ServerLocator serverLocator = HornetQClient.createServerLocatorWithHA( new TransportConfiguration(NettyConnectorFactory.class.getName(), map)); return serverLocator.createSessionFactory(); } private void createQueue(ClientSessionFactory sf) throws HornetQException { try (ClientSession coreSession = createSession(sf)) { try { coreSession.createQueue(getQueueName(), getQueueName(), DURABLE); } catch (HornetQException e) { System.out.println(e.getType()+" "+e.getMessage()); } } } private ClientSession createSession(ClientSessionFactory sf) throws HornetQException { ClientSession session = sf.createSession(DOMAIN_NAME+"."+USER_NAME, USER_PASSWORD, false, true, true, false, sf.getServerLocator().getAckBatchSize()); session.start(); return session; } private String getQueueName() { return "jms."+DOMAIN_NAME + "." + QUEUE_NAME; } }