Not all of messages are delivered in symmetric HornetQ cluster
andrey.vorobiev Nov 2, 2011 10:14 AMI have following symmetric cluster configuration:
hornetq-configuration.xml:
<connectors> <connector name="netty"> <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> <param key="host" value="10.0.1.32"/> <param key="port" value="1254"/> </connector> <connector name="node2-connector"> <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> <param key="host" value="10.0.2.199"/> <param key="port" value="1254"/> </connector> <connectors/> <cluster-connections> <cluster-connection name="my-cluster"> <address>jms</address> <connector-ref>netty</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> <forward-when-no-consumers>true</forward-when-no-consumers> <max-hops>1</max-hops> <static-connectors> <connector-ref>node2-connector</connector-ref> </static-connectors> </cluster-connection> </cluster-connections> <address-settings> <!--default for catch all--> <address-setting match="#"> <dead-letter-address>jms.queue.DLQ</dead-letter-address> <expiry-address>jms.queue.ExpiryQueue</expiry-address> <redelivery-delay>0</redelivery-delay> <redistribution-delay>0</redistribution-delay> <max-size-bytes>10485760</max-size-bytes> <message-counter-history-day-limit>10</message-counter-history-day-limit> <address-full-policy>BLOCK</address-full-policy> </address-setting> </address-settings>
hornetq-jms.xml
<connection-factory name="remote-cf"> <xa>true</xa> <ha>true</ha> <connectors> <connector-ref connector-name="netty"/> <connector-ref connector-name="node2-connector"/> </connectors> <entries> <entry name="jms/remote-cf"/> </entries> </connection-factory> <queue name="FakeQueue"> <entry name="jms/fakequeue"/> </queue>
Second node is configured in the same way.
Message producer code:
public class Main
{
private static final String prefix = UUID.randomUUID().toString();
private static AtomicInteger counter = new AtomicInteger();
public static InitialContext getInitialContext() throws Exception
{
Properties properties = new Properties();
properties.put(INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
properties.put(PROVIDER_URL, "jnp://10.0.1.32:1299");
properties.put(URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
return new InitialContext(properties);
}
public static ConnectionFactory getConnectionFactory() throws Exception
{
return (ConnectionFactory) getInitialContext().lookup("jms/remote-cf");
}
public static Destination getDestination() throws Exception
{
return (Destination) getInitialContext().lookup("jms/fakequeue");
}
public static void main(String[] args) throws Exception
{
ConnectionFactory cf = getConnectionFactory();
for (int i = 0; i < 10; i++)
{
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String ip = getIpBy(getRemoteAddress(session));
MessageProducer producer = session.createProducer(getDestination());
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage("Message " + prefix + "-" + counter.incrementAndGet());
System.out.println("Sending " + message.getText() + " to " + ip);
producer.send(message);
connection.close();
Thread.sleep(1000);
}
}
public static String getRemoteAddress(Session session)
{
return ((DelegatingSession) ((HornetQSession) session).getCoreSession()).getConnection().getRemoteAddress();
}
public static String getIpBy(String address)
{
return address.split(":")[0].replace("/", "");
}
}
Message consumer code:
public class Subscriber
{
public static InitialContext getInitialContext() throws Exception
{
Properties properties = new Properties();
properties.put(INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
properties.put(PROVIDER_URL, "jnp://10.0.1.32:1299");
properties.put(URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
return new InitialContext(properties);
}
public static ConnectionFactory getConnectionFactory() throws Exception
{
return (ConnectionFactory) getInitialContext().lookup("jms/remote-cf");
}
public static Destination getDestination() throws Exception
{
return (Destination) getInitialContext().lookup("jms/fakequeue");
}
public static void main(String[] args) throws Exception
{
ConnectionFactory cf = getConnectionFactory();
Connection connection = cf.createConnection();
connection.start();
connection.setExceptionListener(new ExceptionListener()
{
public void onException(JMSException exception)
{
exception.printStackTrace();
}
});
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
System.out.println(getRemoteAddress(session));
MessageConsumer consumer = session.createConsumer(getDestination());
consumer.setMessageListener(new MessageListener()
{
public void onMessage(Message message)
{
try
{
TextMessage m = (TextMessage) message;
System.out.println("message " + m.getText());
}
catch (Exception e)
{
e.printStackTrace();
}
}
});
Thread.sleep(5 * 60 * 1000);
}
public static String getRemoteAddress(Session session)
{
return ((DelegatingSession) ((HornetQSession) session).getCoreSession()).getConnection().getRemoteAddress();
}
}
Expected behavior: all messages sent by producer will be received by consumer (it doesn't have any matter which node consumer is connected to).
But as a result I get the following:
Sender:
Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-1 to 10.0.2.199 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-2 to 10.0.1.32 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-3 to 10.0.2.199 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-4 to 10.0.1.32 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-5 to 10.0.2.199 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-6 to 10.0.1.32 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-7 to 10.0.2.199 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-8 to 10.0.1.32 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-9 to 10.0.2.199 Sending Message 303b5854-0806-40eb-86a3-e3a92eafdade-10 to 10.0.1.32
Consumer:
/10.0.1.32:1254 message Message 303b5854-0806-40eb-86a3-e3a92eafdade-2 message Message 303b5854-0806-40eb-86a3-e3a92eafdade-3 message Message 303b5854-0806-40eb-86a3-e3a92eafdade-6 message Message 303b5854-0806-40eb-86a3-e3a92eafdade-7 message Message 303b5854-0806-40eb-86a3-e3a92eafdade-10
So the question: where are other five messages?:)