Not all messages are delivered in a symmetric HornetQ cluster
andrey.vorobiev Nov 2, 2011 10:14 AM
I have the following symmetric cluster configuration:
hornetq-configuration.xml:
<!-- Fragment of hornetq-configuration.xml: connectors, cluster connection and address settings. -->
<connectors>
   <!-- Netty connector pointing at 10.0.1.32:1254; referenced by the cluster connection below. -->
   <connector name="netty">
      <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
      <param key="host" value="10.0.1.32"/>
      <param key="port" value="1254"/>
   </connector>
   <!-- Netty connector pointing at 10.0.2.199:1254; listed as the static cluster member below. -->
   <connector name="node2-connector">
      <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
      <param key="host" value="10.0.2.199"/>
      <param key="port" value="1254"/>
   </connector>
</connectors>

<cluster-connections>
   <cluster-connection name="my-cluster">
      <address>jms</address>
      <connector-ref>netty</connector-ref>
      <retry-interval>500</retry-interval>
      <use-duplicate-detection>true</use-duplicate-detection>
      <!-- NOTE(review): per the HornetQ clustering docs, "true" distributes messages round-robin
           to other nodes even when they have no matching consumers; confirm this is intended. -->
      <forward-when-no-consumers>true</forward-when-no-consumers>
      <max-hops>1</max-hops>
      <static-connectors>
         <connector-ref>node2-connector</connector-ref>
      </static-connectors>
   </cluster-connection>
</cluster-connections>

<address-settings>
   <!--default for catch all-->
   <address-setting match="#">
      <dead-letter-address>jms.queue.DLQ</dead-letter-address>
      <expiry-address>jms.queue.ExpiryQueue</expiry-address>
      <redelivery-delay>0</redelivery-delay>
      <redistribution-delay>0</redistribution-delay>
      <max-size-bytes>10485760</max-size-bytes>
      <message-counter-history-day-limit>10</message-counter-history-day-limit>
      <address-full-policy>BLOCK</address-full-policy>
   </address-setting>
</address-settings>
hornetq-jms.xml
<!-- Fragment of hornetq-jms.xml: one connection factory and one queue, both bound in JNDI. -->
<connection-factory name="remote-cf">
   <xa>true</xa>
   <ha>true</ha>
   <!-- Both connectors are listed for this factory. -->
   <connectors>
      <connector-ref connector-name="netty"/>
      <connector-ref connector-name="node2-connector"/>
   </connectors>
   <entries>
      <entry name="jms/remote-cf"/>
   </entries>
</connection-factory>

<queue name="FakeQueue">
   <entry name="jms/fakequeue"/>
</queue>
The second node is configured in the same way.
Message producer code:
/**
 * Test producer: sends ten persistent text messages to the destination bound at
 * {@code jms/fakequeue}, opening a fresh connection for every message and printing
 * the remote address that each connection is attached to.
 */
public class Main {
    /** Random per-run prefix placed in every message text. */
    private static final String prefix = UUID.randomUUID().toString();

    /** Sequence number appended to the message text. */
    private static final AtomicInteger counter = new AtomicInteger();

    /**
     * Creates a JNDI context for the naming service at {@code jnp://10.0.1.32:1299}.
     * The caller is responsible for closing the returned context.
     */
    public static InitialContext getInitialContext() throws Exception {
        Properties properties = new Properties();
        properties.put(INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        properties.put(PROVIDER_URL, "jnp://10.0.1.32:1299");
        properties.put(URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
        return new InitialContext(properties);
    }

    /** Looks up the connection factory bound at {@code jms/remote-cf}. */
    public static ConnectionFactory getConnectionFactory() throws Exception {
        return (ConnectionFactory) lookup("jms/remote-cf");
    }

    /** Looks up the destination bound at {@code jms/fakequeue}. */
    public static Destination getDestination() throws Exception {
        return (Destination) lookup("jms/fakequeue");
    }

    /**
     * Sends ten messages, one per connection, pausing one second between sends.
     *
     * @param args ignored
     * @throws Exception if a JNDI lookup or any JMS operation fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory cf = getConnectionFactory();
        // The destination does not change between iterations, so look it up once.
        Destination destination = getDestination();

        for (int i = 0; i < 10; i++) {
            Connection connection = cf.createConnection();
            try {
                connection.start();
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                String ip = getIpBy(getRemoteAddress(session));

                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);

                TextMessage message =
                        session.createTextMessage("Message " + prefix + "-" + counter.incrementAndGet());
                System.out.println("Sending " + message.getText() + " to " + ip);
                producer.send(message);
            } finally {
                // Close even when sending fails so the connection is not leaked.
                connection.close();
            }
            Thread.sleep(1000);
        }
    }

    /** Returns the remote address of the connection underlying the given HornetQ session. */
    public static String getRemoteAddress(Session session) {
        return ((DelegatingSession) ((HornetQSession) session).getCoreSession())
                .getConnection()
                .getRemoteAddress();
    }

    /**
     * Extracts the host part of an address: everything before the first {@code ':'},
     * with any {@code '/'} removed (for example {@code "/10.0.1.32:1254"} gives
     * {@code "10.0.1.32"}).
     */
    public static String getIpBy(String address) {
        return address.split(":")[0].replace("/", "");
    }

    /** Performs a single JNDI lookup and closes the context used for it. */
    private static Object lookup(String name) throws Exception {
        InitialContext context = getInitialContext();
        try {
            return context.lookup(name);
        } finally {
            context.close();
        }
    }
}
Message consumer code:
public class Subscriber { public static InitialContext getInitialContext() throws Exception { Properties properties = new Properties(); properties.put(INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); properties.put(PROVIDER_URL, "jnp://10.0.1.32:1299"); properties.put(URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); return new InitialContext(properties); } public static ConnectionFactory getConnectionFactory() throws Exception { return (ConnectionFactory) getInitialContext().lookup("jms/remote-cf"); } public static Destination getDestination() throws Exception { return (Destination) getInitialContext().lookup("jms/fakequeue"); } public static void main(String[] args) throws Exception { ConnectionFactory cf = getConnectionFactory(); Connection connection = cf.createConnection(); connection.start(); connection.setExceptionListener(new ExceptionListener() { public void onException(JMSException exception) { exception.printStackTrace(); } }); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); System.out.println(getRemoteAddress(session)); MessageConsumer consumer = session.createConsumer(getDestination()); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { try { TextMessage m = (TextMessage) message; System.out.println("message " + m.getText()); } catch (Exception e) { e.printStackTrace(); } } }); Thread.sleep(5 * 60 * 1000); } public static String getRemoteAddress(Session session) { return ((DelegatingSession) ((HornetQSession) session).getCoreSession()).getConnection().getRemoteAddress(); } }
Expected behavior: all messages sent by the producer are received by the consumer (it should not matter which node the consumer is connected to).
But as a result I get the following:
Sender:
Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-1 to 10.0.2.199 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-2 to 10.0.1.32 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-3 to 10.0.2.199 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-4 to 10.0.1.32 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-5 to 10.0.2.199 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-6 to 10.0.1.32 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-7 to 10.0.2.199 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-8 to 10.0.1.32 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-9 to 10.0.2.199 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-10 to 10.0.1.32
Consumer:
/10.0.1.32:1254 message Message 303b5854-0806-40eb-86a3-e3a92eafdade-2 message Message 303b5854-0806-40eb-86a3-e3a92eafdade-3 message Message 303b5854-0806-40eb-86a3-e3a92eafdade-6 message Message 303b5854-0806-40eb-86a3-e3a92eafdade-7 message Message 303b5854-0806-40eb-86a3-e3a92eafdade-10
So the question is: where are the other five messages? :)