14 Replies Latest reply on Sep 6, 2012 12:03 AM by clebert.suconic

    One consumer dropping offline causes an entire broadcast system to freeze up

    gbdev

      I must have things configured incorrectly somewhere (or maybe it's a bug?). HornetQ has worked wonderfully in all respects except for one. I have researched and debugged the problem but I still can't figure it out. Please help...

       

      Below is a description of the problem then a step-by-step guide to reproducing it, along with configuration details and test code.

       

      Description

       

      A producer is broadcasting data to multiple independent consumers via a topic on a remote HornetQ server. Everything works just as expected for a broadcast system, except that if any consumer loses their network connection or suspends their machine then the entire broadcast is halted for everyone. There is no problem if the consumer closes their connections in an orderly manner or if the process is killed unexpectedly. The problem only arises when the consuming machine suddenly goes offline (i.e. because it is suspended or its network is disconnected).

       

      Once that happens, the other consumers soon stop receiving messages, and shortly afterwards the producer's call to TopicPublisher.publish() blocks for a long time. Then the producer's session closes, and in some cases the HornetQ server has to be forcefully shut down (using "kill -9 <pid>") and restarted before the broadcast can resume. All of this happens because one consumer amongst many dropped offline.

       

      The system freezes up faster when there are larger or more frequent messages, which suggests to me that the server is buffering the messages that are being missed by the absent consumer and when this buffer is full the server blocks the producer from publishing any new data.
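That hypothesis can be illustrated with a toy model. This is a minimal sketch, assuming that every subscriber has its own bounded buffer and that publish() only completes once the message has been placed in every buffer. BlockingTopicModel is a hypothetical class and says nothing definitive about HornetQ's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model, not HornetQ code: every subscriber has a bounded buffer,
// and publish() only completes once the message is in every buffer.
public class BlockingTopicModel {
    private final List<BlockingQueue<String>> subscribers = new ArrayList<>();
    private final int capacity;

    public BlockingTopicModel(int capacity) {
        this.capacity = capacity;
    }

    public BlockingQueue<String> subscribe() {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(capacity);
        subscribers.add(buffer);
        return buffer;
    }

    // Returns false if some subscriber's buffer stayed full for the whole timeout.
    // A real "block" policy would wait indefinitely; the timeout only makes the stall visible.
    public boolean publish(String msg, long timeoutMillis) {
        try {
            for (BlockingQueue<String> buffer : subscribers) {
                if (!buffer.offer(msg, timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockingTopicModel topic = new BlockingTopicModel(3);
        BlockingQueue<String> online = topic.subscribe();
        topic.subscribe(); // the suspended machine: its buffer is never drained
        for (int i = 0; i < 5; i++) {
            boolean delivered = topic.publish("msg " + i, 50);
            online.poll(); // the online consumer keeps up
            System.out.println("msg " + i + " published: " + delivered);
        }
    }
}
```

Running main should print true for msg 0 to msg 2 and false for msg 3 and msg 4: the online consumer keeps up, yet the producer stalls on the one buffer that nobody drains.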

       

      I would like to configure it so that it doesn't disrupt the broadcast and instead the server either discards the absent consumer's old buffered data or simply closes the connection with the absent consumer.
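The second option (closing the connection with the absent consumer) amounts to a fan-out that never waits on a full buffer. A minimal sketch, assuming per-subscriber buffers as in a simple broker model; EvictingTopicModel and its methods are hypothetical and illustrate the desired behaviour, not an existing HornetQ setting.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of the requested behaviour (not a HornetQ feature): a full
// subscriber buffer never stalls the publisher; that subscriber is evicted instead.
public class EvictingTopicModel {
    private final List<BlockingQueue<String>> subscribers = new ArrayList<>();
    private final int capacity;
    private int evicted = 0;

    public EvictingTopicModel(int capacity) {
        this.capacity = capacity;
    }

    public BlockingQueue<String> subscribe() {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(capacity);
        subscribers.add(buffer);
        return buffer;
    }

    public void publish(String msg) {
        Iterator<BlockingQueue<String>> it = subscribers.iterator();
        while (it.hasNext()) {
            // offer() without a timeout returns false immediately when the buffer is full
            if (!it.next().offer(msg)) {
                it.remove(); // stands in for "close the absent consumer's connection"
                evicted++;
            }
        }
    }

    public int subscriberCount() {
        return subscribers.size();
    }

    public int evictedCount() {
        return evicted;
    }

    public static void main(String[] args) {
        EvictingTopicModel topic = new EvictingTopicModel(3);
        BlockingQueue<String> online = topic.subscribe();
        topic.subscribe(); // never drained
        int received = 0;
        for (int i = 0; i < 10; i++) {
            topic.publish("msg " + i);
            if (online.poll() != null) {
                received++;
            }
        }
        System.out.println("online consumer received " + received + " of 10");
        System.out.println("subscribers left: " + topic.subscriberCount()
                + ", evicted: " + topic.evictedCount());
    }
}
```

main should report that the online consumer received 10 of 10 messages, with one subscriber left and one evicted. A real broker would also have to tell the evicted client that its connection had been closed.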

       

       

      Step-by-step

       

      On machineA (192.168.1.100 in my case) install hornetq-2.2.14.Final and set it up so that it

      * can handle external connections

      * has a topic "exampleTopic"

      The necessary changes to the default config files are listed below.

       

      Start the server.

       

      Compile the test code (also listed below).

      I used Eclipse to create two runnable jar files called TopicConsumer.jar and TopicProducer.jar.

       

      Also on machineA open a terminal, cd into the directory with the jar files and run (using your own IP):

      java -jar TopicProducer.jar 192.168.1.100

       

      This will then start reporting on the messages that it is sending. Note: it could be on a different machine to the server but it makes no difference to the test.

       

      Still on machineA open another terminal, cd into the directory with the jar files and run (using your own IP):

      java -jar TopicConsumer.jar 192.168.1.100

       

      This will then start reporting on the messages that it is receiving. Note: this could also be on a different machine to the other processes but it makes no difference to the test.

       

      Now on machineB, copy the TopicConsumer.jar file across, open a terminal, cd into the directory with the jar file and run (using your own IP):

      java -jar TopicConsumer.jar 192.168.1.100

       

      This will then start reporting on the messages that it is receiving from the server on machineA.

       

      Now take machineB offline by suspending it or disabling its network.

       

      Watch the reports from the producer and consumer on machineA.

       

      First, the consumer will stop receiving messages, but the producer will keep sending new messages.

       

      Then the producer will stop sending messages - it is blocked by the call to TopicPublisher.publish().

       

      Eventually that call returns and a warning is logged by the producer:

       

      Sep 01, 2012 3:37:46 PM org.hornetq.core.logging.impl.JULLogDelegate warn

      WARNING: Connection failure has been detected: Did not receive data from server for org.hornetq.core.remoting.impl.netty.NettyConnection@1eb59fd9[local= /192.168.1.100:45866, remote=/192.168.1.100:5445] [code=3]

       

      From this point on, every time that the producer tries to send a message there is an exception:

       

      javax.jms.IllegalStateException: Session is closed

          at org.hornetq.jms.client.HornetQSession.checkClosed(HornetQSession.java:1008)

          at org.hornetq.jms.client.HornetQSession.createTextMessage(HornetQSession.java:194)

          at suspendtest.TopicProducer.sendMessage(TopicProducer.java:32)

          at suspendtest.TopicProducer.main(TopicProducer.java:68)

       

      Hence because a single consumer went offline, all consumers stopped receiving data and the producer's session was closed.

       

      If the offline consumer stays offline, then even if I restart the producer and it reconnects, it only sends a small number of messages (87 to be exact) before it freezes up again and eventually loses its session again; this happens repeatedly. The system is broken until either the offline consumer comes back online or the HornetQ server is restarted. However, if the consumer is still offline, the server won't respond to Ctrl+C, ./stop.sh or even "kill <pid>", so I have to use "kill -9 <pid>" to forcefully kill the server and then restart it before the producer can reconnect and the broadcast can be restarted.

       

      Other sequences of events can also be observed depending on how long one waits before bringing the consumer back online and whether the producer has lost its session already. If machineB resumes quickly then everything carries on from where it paused (although the entire broadcast was still paused for a while). If the absent consumer comes back online then the system will function again, although I may need to restart the producer (if its session has closed) - then all the consumers receive the new data stream, even the one that went offline. Actually, the returning consumer only picks up the new broadcast if it was suspended, not if its network was disconnected. In the latter case the client end recognises a disconnection and doesn't try to reuse the connection, however when suspended there is no knowledge of the disconnection so upon waking it reuses the old server connection, which still works.
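The difference between a killed process and a suspended or unplugged machine comes down to how the server learns that a peer has gone. When a process is killed, the operating system still closes its TCP sockets, so the server sees the disconnect straight away; a suspended or unplugged machine sends nothing at all, so the silence can only be noticed with a timeout. HornetQ's connection-ttl and client-failure-check-period settings work on this idea of expecting data or pings within a time limit. The sketch below is a hypothetical, stdlib-only illustration of that kind of detection: TtlDetector is not HornetQ code, and the 60-second TTL is just an illustrative value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of TTL-based dead-peer detection; not HornetQ code.
public class TtlDetector {
    private final long ttlMillis;
    private final Map<String, Long> lastSeen = new HashMap<>();

    public TtlDetector(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Record any traffic (data or a ping) received from a connection.
    public void touch(String connectionId, long nowMillis) {
        lastSeen.put(connectionId, nowMillis);
    }

    // Remove and return every connection that has been silent for longer than the TTL.
    public List<String> expire(long nowMillis) {
        List<String> dead = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > ttlMillis) {
                dead.add(entry.getKey());
                it.remove();
            }
        }
        return dead;
    }

    public static void main(String[] args) {
        TtlDetector detector = new TtlDetector(60_000); // illustrative TTL
        detector.touch("machineA-consumer", 0);
        detector.touch("machineB-consumer", 0); // machineB is suspended right after this
        detector.touch("machineA-consumer", 30_000); // machineA keeps pinging
        System.out.println(detector.expire(70_000)); // prints [machineB-consumer]
    }
}
```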

       

       

      Config Files

       

      Note in the following config files replace the IP [192.168.1.100] with the IP of the machine that your HornetQ server will be running on. I have tried this test code both within a LAN and using a remote VPS - the same problem arises.

       

      HornetQHome/config/ra.xml

       

      Edit this file, replacing the existing <config-property> entries with the following (using your own IP).

       

          <config-property>
              <description>The transport type</description>
              <config-property-name>ConnectorClassName</config-property-name>
              <config-property-type>java.lang.String</config-property-type>
              <config-property-value>org.hornetq.integration.transports.netty.NettyConnectorFactory</config-property-value>
          </config-property>
          <config-property>
              <description>The transport configuration. These values must be in the form of key=val;key=val;</description>
              <config-property-name>ConnectionParameters</config-property-name>
              <config-property-type>java.lang.String</config-property-type>
              <config-property-value>host=192.168.1.100;port=5445</config-property-value>
          </config-property>

       

      HornetQHome/config/stand-alone/non-clustered/hornetq-beans.xml

       

      Edit these two lines (using your own IP):

            <property name="bindAddress">192.168.1.100</property>

            <property name="rmiBindAddress">192.168.1.100</property>

       

      HornetQHome/config/stand-alone/non-clustered/hornetq-configuration.xml

       

      Edit four lines, changing them from:

               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>

       

      to (using your own IP):

               <param key="host"  value="192.168.1.100"/>

       

      HornetQHome/config/stand-alone/non-clustered/hornetq-jms.xml

       

      Add:

         <topic name="exampleTopic">
            <entry name="/topic/exampleTopic"/>
         </topic>

       

       

       

      Source Code

       

      TopicClient.java (common base class)

      package suspendtest;

      import java.util.Properties;

      import javax.jms.JMSException;
      import javax.jms.Topic;
      import javax.jms.TopicConnection;
      import javax.jms.TopicConnectionFactory;
      import javax.jms.TopicSession;
      import javax.naming.Context;
      import javax.naming.InitialContext;
      import javax.naming.NamingException;

      public class TopicClient {
          protected Context context = null;
          private TopicConnectionFactory topicConnectionFactory;
          protected Topic topic;
          private TopicConnection topicConnection;
          protected TopicSession topicSession;

          public TopicClient(String host) {
              context = getInitialContext(host);
              try {
                  topicConnectionFactory = (TopicConnectionFactory) context.lookup("ConnectionFactory");
                  try {
                      topic = (Topic) context.lookup("topic/exampleTopic");
                      topicConnection = topicConnectionFactory.createTopicConnection();
                      topicSession = topicConnection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                      topicConnection.start();
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              } catch (NamingException e) {
                  e.printStackTrace();
              }
          }

          private final Context getInitialContext(String host) {
              Properties props = new Properties();
              props.setProperty("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
              props.setProperty("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
              props.setProperty("java.naming.provider.url", "jnp://" + host + ":1099");
              try {
                  return new InitialContext(props);
              } catch (NamingException e) {
                  e.printStackTrace();
              }
              return null;
          }

          protected void close() {
              // Closing the connection also closes its sessions, publishers and subscribers.
              if (topicConnection != null) {
                  try {
                      topicConnection.close();
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              }
              if (context != null) {
                  try {
                      context.close();
                  } catch (NamingException e) {
                      e.printStackTrace();
                  }
              }
          }
      }

       

      TopicConsumer.java

      package suspendtest;

      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.MessageListener;
      import javax.jms.TextMessage;
      import javax.jms.TopicSubscriber;

      public class TopicConsumer extends TopicClient implements MessageListener {
          // volatile: written by the JMS listener thread, read by the waiting loop in receiveMessages()
          protected volatile boolean isAlive;
          private TopicSubscriber topicSubscriber;

          public TopicConsumer(String host) {
              super(host);
              System.out.println("-------- Created TopicConsumer --------");
              try {
                  topicSubscriber = topicSession.createSubscriber(topic);
                  topicSubscriber.setMessageListener(this);
                  isAlive = true;
              } catch (JMSException e) {
                  e.printStackTrace();
              }
          }

          @Override
          public void onMessage(Message message) {
              if (message instanceof TextMessage) {
                  try {
                      System.out.println("Received message " + ((TextMessage) message).getText());
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              } else {
                  System.out.println("Received an untyped message as a terminator to a stream of messages.");
                  isAlive = false;
              }
          }

          @Override
          public final void close() {
              if (topicSubscriber != null) {
                  try {
                      topicSubscriber.close();
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              }
              super.close();
              System.out.println("TopicConsumer closed");
          }

          public final boolean isAlive() {
              return isAlive;
          }

          /**
           * Block and receive messages while the consumer isAlive.
           */
          public final void receiveMessages() {
              System.out.println("TopicConsumer ready to receive messages...");
              while (isAlive) { // messages arrive on the listener thread; this loop only waits
                  try {
                      Thread.sleep(100);
                  } catch (InterruptedException e) {
                      Thread.currentThread().interrupt();
                      break; // interrupted: stop waiting and close
                  }
              }
              close();
          }

          public static void main(String[] args) {
              if (args.length == 1) {
                  TopicConsumer consumer = new TopicConsumer(args[0]);
                  consumer.receiveMessages();
              } else {
                  System.out.println("Usage: java -jar TopicConsumer.jar host");
              }
          }
      }
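receiveMessages() above waits by polling the isAlive flag every 100 ms. One standard alternative is a CountDownLatch: the listener thread counts it down when the terminator arrives, the main thread blocks on it, and the latch also guarantees that the waiting thread sees the update. TerminatorLatch is a hypothetical helper, and the JMS types are left out so the sketch compiles without a JMS jar; wiring it in would mean calling onTerminator() from onMessage() and awaitTerminator() from receiveMessages().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a one-shot "end of stream" signal shared between the JMS
// listener thread and the thread that waits in receiveMessages().
public class TerminatorLatch {
    private final CountDownLatch done = new CountDownLatch(1);

    // Would be called from onMessage() when the untyped terminator message arrives.
    public void onTerminator() {
        done.countDown();
    }

    // Waits for the terminator; returns false if the timeout elapses first.
    public boolean awaitTerminator(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        TerminatorLatch latch = new TerminatorLatch();
        Thread listener = new Thread(latch::onTerminator); // stands in for the JMS delivery thread
        listener.start();
        System.out.println("terminator received: " + latch.awaitTerminator(5, TimeUnit.SECONDS));
    }
}
```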

       

      TopicProducer.java

      package suspendtest;

      import javax.jms.DeliveryMode;
      import javax.jms.JMSException;
      import javax.jms.TopicPublisher;

      public class TopicProducer extends TopicClient {
          private TopicPublisher topicPublisher;

          public TopicProducer(String host) {
              super(host);
              System.out.println("-------- Created TopicProducer --------");
              try {
                  topicPublisher = topicSession.createPublisher(topic);
                  topicPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
              } catch (JMSException e) {
                  e.printStackTrace();
              }
          }

          public final void sendMessage(String message) {
              if (message != null) {
                  try {
                      System.out.println("Sending message " + message);
                      topicPublisher.publish(topicSession.createTextMessage(message));
                      System.out.println("Sent message " + message);
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              } else { // message == null
                  try {
                      // Send an untyped message indicating the end of the stream.
                      System.out.println("Sending an untyped message as a terminator to a stream of messages.");
                      topicPublisher.publish(topicSession.createMessage());
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              }
          }

          @Override
          public final void close() {
              if (topicPublisher != null) {
                  try {
                      topicPublisher.close();
                  } catch (JMSException e) {
                      e.printStackTrace();
                  }
              }
              super.close();
              System.out.println("TopicProducer closed");
          }

          public static void main(String[] args) {
              if (args.length == 1) {
                  TopicProducer producer = new TopicProducer(args[0]);
                  for (int i = 0; i < 100000; i++) {
                      producer.sendMessage("(" + i + ") Blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah blah!");
                      try {
                          Thread.sleep(100);
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
                  producer.sendMessage(null); // terminate the message stream
                  producer.close(); // release the JMS connection and the JNDI context
              } else {
                  System.out.println("Usage: java -jar TopicProducer.jar host");
              }
          }
      }

       

       

      Regards,

      John

        • 1. Re: One consumer dropping offline causes an entire broadcast system to freeze up
          gbdev

          I just converted the above test case to run on an ActiveMQ server, to compare its behaviour with that of the HornetQ server.

           

          This is what I found...

          • ActiveMQ buffered the messages; so if the absent consumer returned fairly quickly it received all of the messages that were sent in its absence.
          • However when this buffer was full ActiveMQ discarded the old data in the buffer, so when the absent consumer returned after a prolonged absence it received only the recent messages that were sent in its absence - hence some data was lost during its absence.
          • The broadcast continued uninterrupted even when the absent consumer remained offline indefinitely.
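The ActiveMQ behaviour in the second point resembles a per-subscriber buffer that discards its oldest pending message when full. A minimal sketch, assuming one bounded buffer per subscriber; DropOldestBuffer is hypothetical. In ActiveMQ itself this appears to correspond to a pending message limit strategy for slow topic consumers; check the ActiveMQ documentation for the actual configuration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical per-subscriber buffer that discards the oldest pending message when full.
public class DropOldestBuffer {
    private final Deque<String> pending = new ArrayDeque<>();
    private final int limit;
    private int discarded = 0;

    public DropOldestBuffer(int limit) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be at least 1");
        }
        this.limit = limit;
    }

    // Never blocks the publisher: when full, the oldest pending message is thrown away.
    public void offer(String msg) {
        if (pending.size() == limit) {
            pending.pollFirst();
            discarded++;
        }
        pending.addLast(msg);
    }

    public String poll() {
        return pending.pollFirst();
    }

    public List<String> pendingSnapshot() {
        return new ArrayList<>(pending);
    }

    public int discardedCount() {
        return discarded;
    }

    public static void main(String[] args) {
        DropOldestBuffer absent = new DropOldestBuffer(3);
        for (int i = 0; i < 10; i++) {
            absent.offer("msg " + i); // published while the subscriber is offline
        }
        // On return, the subscriber sees only the most recent messages.
        System.out.println(absent.pendingSnapshot() + ", discarded " + absent.discardedCount());
    }
}
```

main should print [msg 7, msg 8, msg 9], discarded 7: a subscriber returning after a long absence gets only the latest messages, and the publisher never waits.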

           

          It would be good if HornetQ did something similar to this instead of blocking the producer from sending new messages.

           

          Regards,

          John

          • 2. Re: One consumer dropping offline causes an entire broadcast system to freeze up
            clebert.suconic

            You should configure the system to page instead of blocking.
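For context, paging means that once an address exceeds its memory limit, further messages are written to page files on disk instead of blocking the producer, and are read back as consumers catch up. In HornetQ this is chosen per address in the address-settings of hornetq-configuration.xml, with address-full-policy (for example PAGE, DROP or BLOCK) together with max-size-bytes. The toy model below covers a single subscription and uses an in-memory deque as a stand-in for the page files; PagingQueueModel is hypothetical and ignores persistence entirely.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical paging model for ONE subscription; the pageStore deque stands in for page files on disk.
public class PagingQueueModel {
    private final Deque<String> memory = new ArrayDeque<>();
    private final Deque<String> pageStore = new ArrayDeque<>();
    private final int maxInMemory;

    public PagingQueueModel(int maxInMemory) {
        if (maxInMemory < 1) {
            throw new IllegalArgumentException("maxInMemory must be at least 1");
        }
        this.maxInMemory = maxInMemory;
    }

    // Never blocks: once paging has started, new messages go to the page store so order is kept.
    public void add(String msg) {
        if (pageStore.isEmpty() && memory.size() < maxInMemory) {
            memory.addLast(msg);
        } else {
            pageStore.addLast(msg);
        }
    }

    // Returns the next message (or null), then "depages" to refill memory in order.
    public String poll() {
        String next = memory.pollFirst();
        while (memory.size() < maxInMemory && !pageStore.isEmpty()) {
            memory.addLast(pageStore.pollFirst());
        }
        return next;
    }

    public int inMemory() {
        return memory.size();
    }

    public int paged() {
        return pageStore.size();
    }

    public static void main(String[] args) {
        PagingQueueModel queue = new PagingQueueModel(3);
        for (int i = 0; i < 10; i++) {
            queue.add("msg " + i); // the producer never waits
        }
        System.out.println("in memory: " + queue.inMemory() + ", paged: " + queue.paged());
        StringBuilder order = new StringBuilder();
        String next;
        while ((next = queue.poll()) != null) {
            order.append(next).append("; ");
        }
        System.out.println(order);
    }
}
```

Note that for a consumer that never comes back, paging only moves the backlog from memory to disk; it keeps growing until the consumer returns or its subscription is removed.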

            • 3. Re: One consumer dropping offline causes an entire broadcast system to freeze up
              gbdev

              Great, so it's just a weird default setting - you must get sick of people tripping over this issue.

               

              Are you sure that the configuration switch still works?

               

              I just tried changing the config to PAGE and also to DROP (which is what I actually need). Neither change made any difference. Exactly the same errors occurred.

               

              When using the setup described above, the entire broadcast system breaks when a single viewer goes offline, regardless of the address-full-policy that is specified.

               

              Why is that? Is there some other configuration that is required before it will work?

               

              From my perspective it seems that HornetQ comes out-of-the-box broken and I need to be some kind of expert to fix it. I just want a no hassles publish/subscribe broadcasting solution.

               

              Regards,

              John

              • 4. Re: One consumer dropping offline causes an entire broadcast system to freeze up
                clebert.suconic

                You are breaking the connection between the producer and the server? I don't understand how you expect the producer to reconnect?

                 

                You should enable reconnect-attempts if you are re-enabling the connection at some point.

                "From my perspective it seems that HornetQ comes out-of-the-box broken and I need to be some kind of expert to fix it. I just want a no hassles publish/subscribe broadcasting solution."

                I don't think it's broken. Users are supposed to consume their messages, and in most cases (the default cases) you just need to consume all the messages.

                 

                 

                I had the wrong understanding of your use-case.

                 

                 

                You should probably attach a runnable example if you still have an issue (use the advanced editor and attach the files).
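The reconnect-attempts setting mentioned above is normally paired with a retry interval that can grow between attempts. The HornetQ manual's chapter on client reconnection describes these as retry-interval, retry-interval-multiplier, max-retry-interval and reconnect-attempts (where -1 means retry forever). The sketch below is only a stdlib model of such a schedule under those assumptions: it does not call the HornetQ client API, HornetQ's exact arithmetic may differ, it only models a finite number of attempts, and ReconnectSchedule is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a retry schedule in the spirit of retry-interval,
// retry-interval-multiplier, max-retry-interval and reconnect-attempts.
public class ReconnectSchedule {

    // Delay before each retry: starts at retryInterval, is multiplied after every
    // attempt and capped at maxRetryInterval. Only finite attempt counts are modelled.
    public static List<Long> delays(long retryInterval, double multiplier,
                                    long maxRetryInterval, int attempts) {
        List<Long> result = new ArrayList<>();
        double next = retryInterval;
        for (int i = 0; i < attempts; i++) {
            result.add((long) Math.min(next, maxRetryInterval));
            next *= multiplier;
        }
        return result;
    }

    // Tries once immediately, then once after each delay; returns true on the first success.
    public static boolean reconnect(BooleanSupplier tryConnect, List<Long> delays) {
        if (tryConnect.getAsBoolean()) {
            return true;
        }
        for (long delay : delays) {
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            if (tryConnect.getAsBoolean()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(delays(1000, 2.0, 10000, 6)); // [1000, 2000, 4000, 8000, 10000, 10000]
        int[] calls = {0};
        // Hypothetical connection attempt that only succeeds on the third try.
        boolean ok = reconnect(() -> ++calls[0] >= 3, delays(10, 2.0, 50, 4));
        System.out.println("reconnected: " + ok + " after " + calls[0] + " tries");
    }
}
```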

                • 5. Re: One consumer dropping offline causes an entire broadcast system to freeze up
                  gbdev

                  Thanks for looking into this Clebert,

                   

                  It is such a basic use-case that I am surprised that HornetQ needs special configuration for it and even more surprised that it still seems to be having trouble.

                   

                  Attached is an archive containing an Eclipse project with the sources that were listed above. In the 'deploy' directory there are two runnable jar files. You can use them according to the step-by-step instructions given above to reproduce the problem.

                  "You are breaking the connection between the producer and the server? I don't understand how you expect the producer to reconnect?"

                  No, I am not touching the connection between the producer and the server, but still the server closes the producer's session because of the other problems.


                  I deliberately break the connection between a remote consumer and the server.

                  In a broadcast system there can be thousands of consumers, all on remote user-machines that I have no control over.

                  If any of those consumers happens to suspend their machine or disconnect their network whilst the consuming application is running then the entire broadcast is frozen for everyone until either the absent consumer comes back online or the entire broadcast system is restarted.

                   

                  Imagine if the actions of one television viewer could freeze the broadcast for everyone until either that viewer turned their TV back on or the TV station restarted all of its systems.

                  That is what is happening here. That is why I say that the broadcast system is broken.

                   

                  This type of behaviour makes it impossible to create a robust broadcast system because it would constantly be freezing up and needing to be restarted. If the DROP option worked then that would be great, but choosing that option made no difference - the system still froze up just the same. So too with PAGE - everything just freezes up soon after a consumer drops offline.

                   

                  Furthermore, there are no meaningful error messages to give any indication of what is going wrong. Things just freeze up without explanation.

                   

                  The actions of a remote consumer should not have the capacity to break the entire communication system for all consumers and producers. That is the sort of robustness that I need - can HornetQ provide this? If so, how?

                   

                  Regards,

                  John

                  • 6. Re: One consumer dropping offline causes an entire broadcast system to freeze up
                    clebert.suconic

                    You just need to read the manual.. it shouldn't be so bad. Just read the chapter about reconnection and the chapter about paging.

                    • 7. Re: One consumer dropping offline causes an entire broadcast system to freeze up
                      gbdev

                      So I need to read a massive manual just so that a basic and obvious use-case will function. That is exactly why I said:

                       

                      HornetQ comes out-of-the-box broken and I need to be some kind of expert to fix it. I just want a no hassles publish/subscribe broadcasting solution.

                       

                      I will use ActiveMQ from now on because at least it functions in a sensible manner and its support forum couldn't be any worse than this.

                       

                      This is the second encounter I have had with the JBoss community where people were terse and unhelpful to the point of being obnoxious.

                       

                      I will recommend that people stay away from HornetQ because of its bizarre and counter-intuitive behaviour and its obnoxious support forum.

                      • 8. Re: One consumer dropping offline causes an entire broadcast system to freeze up
                        jbertram

                        Clebert, I remember this kind of thing happening to JBoss Messaging as well. If I recall correctly, when a message arrived on a topic a thread would then deliver that message to each subscriber in a serial fashion (i.e. one by one). However, if the connection to one of the subscribers was shoddy then the thread delivering the messages would get delayed (i.e. blocked) in the remoting layer waiting for the connection to die (or whatever the case may be). This would also cause message publishers to block, since the thread receiving the message from the publisher is the same one that would deliver the message to the subscribers.

                         

                        I know it is possible to get around the stall for the message publisher by setting direct-deliver to "false" on the connector, but it still seems plausible that delivery of the message to all the subscribers might get delayed.

                         

                        What do you think?
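The serial delivery described above can be modelled in a few lines. This is a minimal sketch, assuming delivery is either done directly on the thread that received the message or handed off to a separate executor (roughly what setting direct-deliver to false is described as doing); DeliveryModel is hypothetical, not HornetQ or JBoss Messaging code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of the two delivery strategies discussed above; not HornetQ code.
public class DeliveryModel {

    // Direct delivery: subscriber callbacks run on the thread that received the
    // message, so a subscriber stuck in I/O also stalls the publisher's call.
    public static void deliverDirect(String msg, List<Consumer<String>> subscribers) {
        for (Consumer<String> s : subscribers) {
            s.accept(msg);
        }
    }

    // Hand-off delivery: the receiving thread only queues work and returns at once.
    // With a single delivery thread, a stuck subscriber still delays the ones behind it.
    public static void deliverViaExecutor(String msg, List<Consumer<String>> subscribers,
                                          ExecutorService delivery) {
        for (Consumer<String> s : subscribers) {
            delivery.execute(() -> s.accept(msg));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch networkBack = new CountDownLatch(1);
        Consumer<String> stuck = m -> {
            try {
                networkBack.await(); // simulates a write that hangs until the peer reappears
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Consumer<String> healthy = m -> System.out.println("healthy subscriber got " + m);

        ExecutorService delivery = Executors.newSingleThreadExecutor();
        deliverViaExecutor("msg 1", Arrays.asList(stuck, healthy), delivery);
        System.out.println("publisher call returned"); // printed first: nothing above blocked
        networkBack.countDown(); // only now can the healthy subscriber be served
        delivery.shutdown();
        delivery.awaitTermination(5, TimeUnit.SECONDS);
        // Calling deliverDirect with a subscriber like 'stuck' would block the caller until
        // the latch was released, which is the publisher stall described above.
    }
}
```

main prints "publisher call returned" before "healthy subscriber got msg 1": the publisher is no longer stalled, but the healthy subscriber still waits behind the stuck one on the single delivery thread, which matches the remaining delay suspected above.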

                        • 9. Re: One consumer dropping offline causes an entire broadcast system to freeze up
                          jbertram

                          I think you're being a bit sensitive, and calling others obnoxious isn't exactly helping you get the answers you need.

                           

                          As far as reading the manual, that's pretty standard fare with software.  The manual is not perfect, but it's pretty darn good.  The chapters are clearly marked by topic so you can skip what you don't find relevant.  Have you tried setting direct-deliver to false on the connector your publisher uses?

                           

                          Whether or not HornetQ comes out of the box broken has yet to be established as far as I'm concerned.  More investigation is needed.  Would it be possible for you to gather thread dumps on the server after you have disconnected a client and see the delay in delivering messages to subscribers?  That would help determine what is hung and why.

                           

                          Just to be clear, everyone I know on the forum has a day-job and lends their help with no compensation whatsoever (aside from the warm fuzzies of helping someone out).  What you call terse may simply be time-efficient communication to someone else, but I digress.
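For reference, the usual way to capture a thread dump from a standalone server is "jstack <pid>", or "kill -3 <pid>", which makes a HotSpot JVM print the dump to its standard output without stopping it. If you can run code inside the JVM you want to inspect (for example an embedded server), the standard java.lang.management API gives the same information. ThreadDump below is a minimal sketch of that; its output format is ad hoc and only loosely resembles jstack's.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// In-process thread dump using the standard java.lang.management API.
public class ThreadDump {
    public static String dump() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        // Ask for lock information only where the JVM supports it.
        ThreadInfo[] infos = mx.dumpAllThreads(mx.isObjectMonitorUsageSupported(),
                                               mx.isSynchronizerUsageSupported());
        for (ThreadInfo info : infos) {
            if (info == null) {
                continue;
            }
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState()).append('\n');
            if (info.getLockName() != null) {
                // e.g. a delivery thread blocked on a lock or waiting on a condition
                sb.append("    waiting for ").append(info.getLockName());
                if (info.getLockOwnerName() != null) {
                    sb.append(" held by \"").append(info.getLockOwnerName()).append('"');
                }
                sb.append('\n');
            }
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n'); // print every frame
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(dump());
    }
}
```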

                          • 10. Re: One consumer dropping offline causes an entire broadcast system to freeze up
                            clebert.suconic

                            I just told you to read two small chapters on reconnection and paging. That's all.. I didn't ask you to read the whole thing (although it's always a good thing to read the docs of any product you want to use).

                             

                            I always tend to be nice to users... if ActiveMQ gives you what you want.. go ahead.

                             

                            It seems you bumped into a bug.. if you were a bit more helpful we would be able to work together, but so far you have been aggressive.

                             

                             

                            If you still want our help.. we will be here.. no hard feelings...

                             

                             

                            If you want to go with ActiveMQ.. good luck with that as well.

                            • 11. Re: One consumer dropping offline causes an entire broadcast system to freeze up
                              clebert.suconic

                              Just don't expect to be able to use ActiveMQ out of the box without reading the docs

                               

                               

                              For starters, make sure you read the document about persistence configuration, and configure the proper journal file.

                               

                              Or if you want to, my quick advice on ActiveMQ would be:

                               

                               

                              • Rename activemq-throughput.xml as activemq.xml
                                • That will make it run a bit faster and use the proper persistence
                              • set enableJournalDiskSyncs="true" on KahaDB

                               

                              There are other configurations you may want to look into...

                               

                               

                              - there's no config that fits all on any system.

                              • 12. Re: One consumer dropping offline causes an entire broadcast system to freeze up
                                gbdev

                                Hi Justin,

                                 

                                What was obnoxious about Clebert's responses was not simply their terseness (which is fine on its own), but the fact that his responses only showed that he hadn't even bothered to pay attention to what was said. His advice was useless and only showed that he assumed that I was an idiot who could be brushed off with some irrelevant one liners.

                                 

                                Clebert's advice to "read the manual" is insulting because I have read most of the bloody manual and it doesn't help! I should NOT have to scour every line of some manual looking for hidden "gotchas" and weird incomprehensible configurations for things that any sane person would expect to be default behaviour (and which IS default behaviour in other messaging systems).

                                 

                                Who ever heard of a messaging system that was so fragile by default that just one consumer going offline would break the whole system!!! WTF?

                                And when I try to point this out it is assumed that I am an idiot! WTF!!!

                                 

                                The sections that he referred to were irrelevant to the problem that I was describing and he is just wasting my time because he can't even be bothered to pay attention to what I was saying!!!

                                 

                                I provided more than enough information to re-create the problem in the first posting, but if you lot are too in denial about it, then tough luck.

                                 

                                I have wasted enough time on HornetQ - this stupid bug has set our development schedule back by more than two weeks.

                                 

                                I have all the answers that I need regarding HornetQ - "stay the hell away from it"...

                                 

                                I will be writing about my experiences to warn others and then I will forget all about HornetQ and get on with using a messaging system that is robust and sensible rather than fragile and bizarre. From my research online, I am far from being the only one who has been stung by HornetQ...

                                 

                                goodbye

                                • 13. Re: One consumer dropping offline causes an entire broadcast system to freeze up
                                  gbdev

                                  Clebert Suconic wrote:

                                   

                                  It seems you bumped into a bug.. if you were a bit more helpful we would be able to work together, but so far you have been aggressive.

                                   

                                  Ah, so you finally admit that there is a bug! About time - but too late for me.

                                  I was very willing to help resolve this but it was your denial and brush-offs that made me have to push.

                                  Sorry if you see that merely as 'aggressive'.

                                  For the sake of others I hope you lot eventually fix the bug - but I am getting out of here...

                                  • 14. Re: One consumer dropping offline causes an entire broadcast system to freeze up
                                    clebert.suconic

                                    I don't know if you had bugs.. I was just trying to help you... Any software may have bugs. You were being aggressive since post 2.

                                     

                                    I'm closing this thread now.. Good luck on finding help with ActiveMQ... We are professionals here and we don't have time for this.