One consumer dropping offline causes an entire broadcast system to freeze up
gbdev Sep 1, 2012 6:40 AM
I must have things configured incorrectly somewhere (or maybe it's a bug?). HornetQ has worked wonderfully in all respects except for one. I have researched and debugged the problem but I still can't figure it out. Please help...
Below is a description of the problem, followed by a step-by-step guide to reproducing it, along with configuration details and test code.
Description
A producer is broadcasting data to multiple independent consumers via a topic on a remote HornetQ server. Everything works just as expected for a broadcast system, except that if any consumer loses their network connection or suspends their machine then the entire broadcast is halted for everyone. There is no problem if the consumer closes their connections in an orderly manner or if the process is killed unexpectedly. The problem only arises when the consuming machine suddenly goes offline (i.e. because it is suspended or its network is disconnected).
Once that happens, the other consumers soon stop receiving messages. Shortly afterwards the producer's call to TopicPublisher.publish() blocks for a long time, then the producer's session closes, and in some cases the HornetQ server has to be forcefully shut down (using "kill -9 <pid>") and restarted before the broadcast can resume. All of this is caused by one consumer among many happening to drop offline.
The system freezes up faster when there are larger or more frequent messages, which suggests to me that the server is buffering the messages that are being missed by the absent consumer and when this buffer is full the server blocks the producer from publishing any new data.
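To make this hypothesis concrete, here is a minimal model of it in plain Java (not HornetQ code): every subscriber gets its own buffer bounded in bytes, and the publisher waits, up to a timeout, until every buffer has room. The class name `BlockingFanOut` and the byte limit are hypothetical. Because the limit is in bytes, larger or more frequent messages fill an absent subscriber's buffer sooner, which would match what I observe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical model of the suspected behaviour (not HornetQ code): each subscriber has
 * its own buffer, bounded in bytes, and publish() waits until ALL buffers have room.
 */
class BlockingFanOut {
    private final long maxBytesPerSubscriber;
    private final List<ArrayDeque<byte[]>> buffers = new ArrayList<>();
    private final List<Long> usedBytes = new ArrayList<>();

    BlockingFanOut(long maxBytesPerSubscriber) {
        this.maxBytesPerSubscriber = maxBytesPerSubscriber;
    }

    /** Registers a subscriber and returns its index. */
    synchronized int subscribe() {
        buffers.add(new ArrayDeque<>());
        usedBytes.add(0L);
        return buffers.size() - 1;
    }

    /** Returns false if some buffer is still full when the timeout elapses. */
    synchronized boolean publish(byte[] msg, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!allHaveRoom(msg.length)) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // a single stalled subscriber has blocked everyone
            }
            try {
                wait(Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        for (int i = 0; i < buffers.size(); i++) {
            buffers.get(i).addLast(msg);
            usedBytes.set(i, usedBytes.get(i) + msg.length);
        }
        return true;
    }

    /** Delivers the oldest buffered message to one subscriber, freeing its space. */
    synchronized byte[] take(int subscriber) {
        byte[] msg = buffers.get(subscriber).pollFirst();
        if (msg != null) {
            usedBytes.set(subscriber, usedBytes.get(subscriber) - msg.length);
            notifyAll(); // wake a publisher waiting for room
        }
        return msg;
    }

    private boolean allHaveRoom(int size) {
        for (long used : usedBytes) {
            if (used + size > maxBytesPerSubscriber) {
                return false;
            }
        }
        return true;
    }
}
```

With a 1000-byte limit and 100-byte messages, the offline subscriber's buffer holds ten messages and the eleventh publish() times out, even though the online subscriber drains every message immediately. Once the offline subscriber takes a message, publishing resumes.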
I would like to configure HornetQ so that an absent consumer doesn't disrupt the broadcast; instead, the server should either discard that consumer's old buffered data or simply close its connection.
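The two outcomes I am after can be modelled the same way, again as a plain-Java sketch rather than HornetQ configuration; `NonBlockingFanOut` and `OverflowPolicy` are hypothetical names. Instead of waiting for room, the publisher either discards the oldest message in a full buffer or drops that subscriber altogether, so a full buffer only affects its own subscriber.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical model of the desired behaviour: publish() never waits, and a full
 * buffer only affects its own subscriber.
 */
class NonBlockingFanOut {
    enum OverflowPolicy { DROP_OLDEST, DISCONNECT }

    private final int maxMessagesPerSubscriber;
    private final OverflowPolicy policy;
    private final Map<String, ArrayDeque<String>> buffers = new LinkedHashMap<>();

    NonBlockingFanOut(int maxMessagesPerSubscriber, OverflowPolicy policy) {
        this.maxMessagesPerSubscriber = maxMessagesPerSubscriber;
        this.policy = policy;
    }

    synchronized void subscribe(String id) {
        buffers.put(id, new ArrayDeque<>());
    }

    synchronized boolean isSubscribed(String id) {
        return buffers.containsKey(id);
    }

    synchronized void publish(String msg) {
        Iterator<Map.Entry<String, ArrayDeque<String>>> it = buffers.entrySet().iterator();
        while (it.hasNext()) {
            ArrayDeque<String> buffer = it.next().getValue();
            if (buffer.size() >= maxMessagesPerSubscriber) {
                if (policy == OverflowPolicy.DISCONNECT) {
                    it.remove(); // close the absent subscriber instead of waiting for it
                    continue;
                }
                buffer.pollFirst(); // DROP_OLDEST: discard its oldest undelivered message
            }
            buffer.addLast(msg);
        }
    }

    /** Returns the next message for a subscriber, or null if none is buffered or it was disconnected. */
    synchronized String take(String id) {
        ArrayDeque<String> buffer = buffers.get(id);
        return buffer == null ? null : buffer.pollFirst();
    }
}
```

With a limit of three messages and five publishes, the offline subscriber ends up holding only m2, m3 and m4 under DROP_OLDEST, and is removed on the fourth publish under DISCONNECT; in both cases the online subscriber receives every message.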
Step-by-step
On machineA (192.168.1.100 in my case) install hornetq-2.2.14.Final and set it up so that it
* can handle external connections
* has a topic "exampleTopic"
The necessary changes to the default config files are listed below.
Start the server.
Compile the test code (also listed below).
I used Eclipse to create two runnable jar files called TopicConsumer.jar and TopicProducer.jar.
Also on machineA open a terminal, cd into the directory with the jar files and run (using your own IP):
java -jar TopicProducer.jar 192.168.1.100
This will then start reporting on the messages that it is sending. Note: it could be on a different machine to the server but it makes no difference to the test.
Still on machineA open another terminal, cd into the directory with the jar files and run (using your own IP):
java -jar TopicConsumer.jar 192.168.1.100
This will then start reporting on the messages that it is receiving. Note: this could also be on a different machine to the other processes but it makes no difference to the test.
Now on machineB, copy the TopicConsumer.jar file across, open a terminal, cd into the directory with the jar file and run (using your own IP):
java -jar TopicConsumer.jar 192.168.1.100
This will then start reporting on the messages that it is receiving from the server on machineA.
Now take machineB offline by suspending it or disabling its network.
Watch the reports from the producer and consumer on machineA.
First the consumer will stop receiving messages, but the producer will keep sending new ones.
Then the producer will stop sending messages - it is blocked by the call to TopicPublisher.publish().
Eventually that call returns and a warning is logged by the producer:
Sep 01, 2012 3:37:46 PM org.hornetq.core.logging.impl.JULLogDelegate warn
WARNING: Connection failure has been detected: Did not receive data from server for org.hornetq.core.remoting.impl.netty.NettyConnection@1eb59fd9[local= /192.168.1.100:45866, remote=/192.168.1.100:5445] [code=3]
From this point on, every time that the producer tries to send a message there is an exception:
javax.jms.IllegalStateException: Session is closed
at org.hornetq.jms.client.HornetQSession.checkClosed(HornetQSession.java:1008)
at org.hornetq.jms.client.HornetQSession.createTextMessage(HornetQSession.java:194)
at suspendtest.TopicProducer.sendMessage(TopicProducer.java:32)
at suspendtest.TopicProducer.main(TopicProducer.java:68)
Hence because a single consumer went offline, all consumers stopped receiving data and the producer's session was closed.
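The "Did not receive data from server" warning reads like a time-based liveness check on the producer's connection: once nothing has arrived from the server for some period, the client treats the connection as failed, after which every call on the session fails with "Session is closed". A minimal sketch of such a check, assuming a fixed time-to-live; `LivenessMonitor` and the 30-second value are hypothetical and are not HornetQ's implementation or defaults.

```java
/**
 * Hypothetical time-based liveness check (not HornetQ's implementation): the connection is
 * treated as failed once nothing has arrived from the peer for longer than ttlMillis.
 */
class LivenessMonitor {
    private final long ttlMillis;
    private long lastReceivedMillis;

    LivenessMonitor(long ttlMillis, long nowMillis) {
        this.ttlMillis = ttlMillis;
        this.lastReceivedMillis = nowMillis;
    }

    /** Call whenever any packet, data or ping, arrives from the peer. */
    synchronized void onDataReceived(long nowMillis) {
        lastReceivedMillis = nowMillis;
    }

    /** Called periodically by a checker thread; true means "declare the connection failed". */
    synchronized boolean hasFailed(long nowMillis) {
        return nowMillis - lastReceivedMillis > ttlMillis;
    }

    public static void main(String[] args) {
        LivenessMonitor monitor = new LivenessMonitor(30_000, 0);
        monitor.onDataReceived(10_000); // last ping from the server at t = 10 s
        System.out.println(monitor.hasFailed(40_000)); // false: exactly 30 s of silence
        System.out.println(monitor.hasFailed(40_001)); // true: silence exceeds the TTL
    }
}
```

Passing the clock in explicitly keeps the example deterministic; a real client would run the check on a timer. Since the producer and server share machineA in this test (the logged local and remote addresses are both 192.168.1.100), a check like this firing would suggest that the server itself went quiet, rather than the network.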
If the offline consumer stays offline, then even if I restart the producer and it reconnects, it only sends a small number of messages (87 to be exact) before it freezes up again and eventually loses its session again; this happens repeatedly. The system stays broken until either the offline consumer comes back online or the HornetQ server is restarted. However, while the consumer is still offline the server won't respond to Ctrl+C, ./stop.sh or even "kill <pid>", so I have to use "kill -9 <pid>" to forcefully kill the server and then restart it before the producer can reconnect and the broadcast can resume.
Other sequences of events can also be observed, depending on how long one waits before bringing the consumer back online and whether the producer has already lost its session. If machineB resumes quickly, everything carries on from where it paused (although the entire broadcast was still paused for a while). If the absent consumer comes back online, the system functions again, although I may need to restart the producer (if its session has closed); then all the consumers receive the new data stream, even the one that went offline. Actually, the returning consumer only picks up the new broadcast if its machine was suspended, not if its network was disconnected. In the latter case the client end recognises a disconnection and doesn't try to reuse the connection; when suspended, however, it has no knowledge of the disconnection, so upon waking it reuses the old server connection, which still works.
Config Files
Note: in the following config files, replace the IP [192.168.1.100] with the IP of the machine that your HornetQ server will be running on. I have tried this test code both within a LAN and using a remote VPS, and the same problem arises.
HornetQHome/config/ra.xml
Edit this file, replacing the existing <config-property> entries with the following (using your own IP).
<config-property>
<description>The transport type</description>
<config-property-name>ConnectorClassName</config-property-name>
<config-property-type>java.lang.String</config-property-type>
<config-property-value>org.hornetq.integration.transports.netty.NettyConnectorFactory</config-property-value>
</config-property>
<config-property>
<description>The transport configuration. These values must be in the form of key=val;key=val;</description>
<config-property-name>ConnectionParameters</config-property-name>
<config-property-type>java.lang.String</config-property-type>
<config-property-value>host=192.168.1.100;port=5445</config-property-value>
</config-property>
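The ConnectionParameters value above uses the key=val;key=val; form that its description requires. Purely as an illustration of that format (`ConnectionParams` is a hypothetical helper, not the resource adapter's own parser), the string breaks down like this:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: splits a "key=val;key=val;" string into an ordered map. */
class ConnectionParams {
    static Map<String, String> parse(String spec) {
        Map<String, String> params = new LinkedHashMap<>();
        for (String pair : spec.split(";")) {
            if (pair.trim().isEmpty()) {
                continue; // tolerate a trailing or doubled ';'
            }
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=val but got: " + pair);
            }
            params.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return params;
    }

    public static void main(String[] args) {
        // prints {host=192.168.1.100, port=5445}
        System.out.println(parse("host=192.168.1.100;port=5445"));
    }
}
```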
HornetQHome/config/stand-alone/non-clustered/hornetq-beans.xml
Edit these two lines (using your own IP):
<property name="bindAddress">192.168.1.100</property>
<property name="rmiBindAddress">192.168.1.100</property>
HornetQHome/config/stand-alone/non-clustered/hornetq-configuration.xml
Edit four lines, changing them from:
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
to (using your own IP):
<param key="host" value="192.168.1.100"/>
HornetQHome/config/stand-alone/non-clustered/hornetq-jms.xml
Add:
<topic name="exampleTopic">
<entry name="/topic/exampleTopic"/>
</topic>
Source Code
TopicClient.java (common base class)
package suspendtest;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class TopicClient {
protected Context context = null;
private TopicConnectionFactory topicConnectionFactory;
protected Topic topic;
private TopicConnection topicConnection;
protected TopicSession topicSession;
public TopicClient(String host) {
context = getInitialContext(host);
try {
topicConnectionFactory = (TopicConnectionFactory) context.lookup("ConnectionFactory");
try {
topic = (Topic) context.lookup("topic/exampleTopic");
topicConnection = topicConnectionFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
topicConnection.start();
} catch (JMSException e) {
e.printStackTrace();
}
} catch (NamingException e) {
e.printStackTrace();
}
}
private final Context getInitialContext(String host) {
Properties props = new Properties();
props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
props.setProperty(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
props.setProperty(Context.PROVIDER_URL, "jnp://" + host + ":1099");
Context context;
try {
context = new InitialContext(props);
return context;
} catch (NamingException e) {
e.printStackTrace();
}
return null;
}
protected void close() {
try {
topicConnection.close();
} catch (JMSException e) {
e.printStackTrace();
}
if (context != null) {
try {
context.close();
} catch (NamingException e) {
e.printStackTrace();
}
}
}
}
TopicConsumer.java
package suspendtest;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
public class TopicConsumer extends TopicClient implements MessageListener {
protected volatile boolean isAlive; // set by the JMS listener thread, polled by receiveMessages()
private TopicSubscriber topicSubscriber;
public TopicConsumer(String host) {
super(host);
System.out.println("-------- Created TopicConsumer --------");
try {
topicSubscriber = topicSession.createSubscriber(topic);
topicSubscriber.setMessageListener(this);
isAlive = true;
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println("Received message " + ((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
else {
System.out.println("Received an untyped message as a terminator to a stream of messages.");
isAlive = false;
}
}
@Override
public final void close() {
try {
topicSubscriber.close();
} catch (JMSException e) {
e.printStackTrace();
}
super.close();
System.out.println("TopicConsumer closed");
}
public final boolean isAlive() {
return isAlive;
}
/**
* block and receive messages whilst the consumer isAlive
*/
public final void receiveMessages() {
System.out.println("TopicConsumer ready to receive messages...");
while(isAlive) { // keep receiving messages
try {
Thread.sleep(100);
} catch (InterruptedException e) {
break; // interrupted when the animating thread is closing
}
}
close();
}
public static void main(String[] args) {
if (args.length == 1) {
TopicConsumer consumer = new TopicConsumer(args[0]);
consumer.receiveMessages();
}
else {
System.out.println("Usage: java -jar TopicConsumer.jar host");
}
}
}
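receiveMessages() above keeps the main thread alive by polling isAlive every 100 ms until onMessage() sees the untyped terminator. The same hand-off can be expressed with a java.util.concurrent.CountDownLatch, which blocks until released instead of polling. A minimal sketch, assuming the JMS callbacks are replaced by hypothetical `onText`/`onTerminator` methods so that it runs without a broker:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical consumer skeleton: the listener thread releases the waiting thread when the stream ends. */
class LatchedConsumer {
    private final CountDownLatch finished = new CountDownLatch(1);
    private int received = 0; // guarded by this

    /** Stand-in for onMessage() receiving a TextMessage. */
    synchronized void onText(String text) {
        received++;
        System.out.println("Received message " + text);
    }

    /** Stand-in for onMessage() receiving the untyped terminator. */
    void onTerminator() {
        finished.countDown();
    }

    /** Blocks until the terminator arrives; returns false on timeout or interruption. */
    boolean awaitEnd(long timeout, TimeUnit unit) {
        try {
            return finished.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    synchronized int received() {
        return received;
    }

    public static void main(String[] args) {
        LatchedConsumer consumer = new LatchedConsumer();
        // Simulate the JMS delivery thread
        Thread delivery = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                consumer.onText("(" + i + ") Blah");
            }
            consumer.onTerminator();
        });
        delivery.start();
        boolean ended = consumer.awaitEnd(5, TimeUnit.SECONDS);
        System.out.println("ended=" + ended + ", received=" + consumer.received());
    }
}
```

In TopicConsumer itself, onMessage() would call the equivalent of onTerminator() for the untyped message, and receiveMessages() would await the latch before calling close().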
TopicProducer.java
package suspendtest;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.TopicPublisher;
public class TopicProducer extends TopicClient {
private TopicPublisher topicPublisher;
public TopicProducer(String host) {
super(host);
System.out.println("-------- Created TopicProducer --------");
try {
topicPublisher = topicSession.createPublisher(topic);
topicPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
} catch (JMSException e) {
e.printStackTrace();
}
}
public final void sendMessage(String message) {
if (message != null) {
try {
System.out.println("Sending message " + message);
topicPublisher.publish(topicSession.createTextMessage(message));
System.out.println("Sent message " + message);
} catch (JMSException e) {
e.printStackTrace();
}
}
else { // message == null
try {
// Send an un-typed message indicating end of messages.
System.out.println("Sending an untyped message as a terminator to a stream of messages.");
topicPublisher.publish(topicSession.createMessage());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
@Override
public final void close() {
try {
topicPublisher.close();
} catch (JMSException e) {
e.printStackTrace();
}
super.close();
System.out.println("TopicProducer closed");
}
public static void main(String[] args) {
if (args.length == 1) {
TopicProducer producer = new TopicProducer(args[0]);
for (int i = 0; i < 100000; i++) {
producer.sendMessage("(" + i + ") Blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah!");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
producer.sendMessage(null); // terminate the message stream
producer.close(); // release the publisher, session, connection and JNDI context
}
else {
System.out.println("Usage: java -jar TopicProducer.jar host");
}
}
}
Regards,
John