12 Replies Latest reply on Sep 26, 2012 9:05 PM by clebert.suconic

    QueueImpl memory leak

    mackerman

      Hi, we have a dual server JBoss 7.1.1.Final system, using hornetq 2.2.14.Final for inter-server communication and event notification.  The servers are standalone servers and the application takes care of broadcasting messages to the same topic running on each server.

       

      After the system has been running and processing messages for a while, a heap dump shows that both servers have accumulated quite a number of org.hornetq.core.server.impl.QueueImpl objects, up to thousands if it has been running for a while, taking up to 500M.  They all appear to be duplicates of the topic. Does anyone have any idea what is going on here?  Could it have anything to do with <connection-ttl>120000</connection-ttl>?

       

      Upon initialization, I observe that one queue (/queue/interServerQueue) is instantated, and 2 topics (/topic/asyncEvents).  The second topic instance is created as a result of a CREATE_QUEUE packet.  Is this expected behavior?  I have been unable thus far to determine how all the other instances are created.

       

      our horneq-jms.xml :

       

      <configuration xmlns="urn:hornetq"

                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                     xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

       

       

                <connection-factory name="ConnectionFactory">

                          <connectors>

                                    <connector-ref connector-name="in-vm" />

                          </connectors>

                          <entries>

                                    <entry name="ConnectionFactory" />

                          </entries>

       

                          <connection-ttl>120000</connection-ttl>

                          <client-failure-check-period>15000</client-failure-check-period>

                </connection-factory>

                    

          <topic name="asyncEvents">

              <entry name="/topic/asyncEvents"/>

          </topic>

         

          <queue name="interServerQueue">

              <entry name="/queue/interServerQueue"/>

              <durable>false</durable>

          </queue>

           

      </configuration>

       

      hornetq-configuration.xml:

       

      <configuration xmlns="urn:hornetq"

                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

       

       

                <persistence-enabled>false</persistence-enabled>

       

       

                <connectors>

                          <connector name="in-vm">

                                    <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory

                                    </factory-class>

                          </connector>

                          <connector name="netty-connector">

                                    <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory

                                    </factory-class>

                                    <param key="port" value="5445" />

                          </connector>

                </connectors>

       

       

                <!-- Acceptors -->

                <acceptors>

                          <acceptor name="in-vm">

                                    <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory

                                    </factory-class>

                          </acceptor>

                          <acceptor name="netty-acceptor">

                                    <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory

                                    </factory-class>

                                    <param key="port" value="5445" />

                                    <param key="host" value="${build.acceptor.address}" />

                          </acceptor>

                          <acceptor name="stomp-acceptor">

                                    <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory

                                    </factory-class>

                                    <param key="protocol" value="stomp" />

                                    <param key="port" value="61613" />

                                    <param key="host" value="${build.acceptor.address}" />

                          </acceptor>

                 </acceptors>

         

                <security-settings>

                          <!--security for example queue -->

                          <security-setting match="#">

                                    <permission type="createDurableQueue" roles="guest" />

                                    <permission type="deleteDurableQueue" roles="guest" />

                                    <permission type="createNonDurableQueue" roles="guest" />

                                    <permission type="deleteNonDurableQueue" roles="guest" />

                                    <permission type="consume" roles="guest" />

                                    <permission type="send" roles="guest" />

                          </security-setting>

                </security-settings>

       

       

      </configuration>

       

       

      Connections, Sessions and Producers are created like this:

       

      HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(getFactoryType(), getTransportConfiguration());

      connection = cf.createConnection();

      connection.start();

      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      destination = HornetQDestination.fromAddress(destinationName);

      producer = session.createProducer(destination);

       

       

      // set time to live for topics, queued messages should all be consumed

      if (JMSFactoryType.TOPIC_CF.equals(getFactoryType()))

                producer.setTimeToLive(30000);

       

       

      producer.setDisableMessageID(true);

      producer.setDisableMessageTimestamp(true);

       

      Messages are sent like this:

       

      TextMessage message = session.createTextMessage();

      message.setStringProperty(org.hornetq.rest.HttpHeaderProperty.CONTENT_TYPE,

                          MediaType.APPLICATION_JSON);

      message.setText(strmsg);

      message.setStringProperty(propName, propValue);

       

       

      try {

                producer.send(message);

                if (session.getTransacted())

                          session.commit();

      } catch (Exception e) {

                log.error(...);

      }

       

      The system runs on CentOS 6.3, JBoss 7.1.1.Final, HornetQ 2.2.14.Final, JRE 1.6.0_30.

       

      Thanks, Mitchell

        • 1. Re: QueueImpl memory leak
          clebert.suconic

          Are you using temporary topics? There's been a fix about TTL not removing temporary topics.

           

           

          HORNETQ-1020 Fix leak: TemporaryTopic must use temporary-queues..

           

          https://github.com/hornetq/hornetq/commit/3339253582217ae67e32b04d693edb1ddfca3622

          • 2. Re: QueueImpl memory leak
            clebert.suconic

            Now, if you are creating many topics anyway.. then it's not a leak, right?

            • 3. Re: QueueImpl memory leak
              mackerman

              We are not using temporary topics.  See above for our full configuration and how we create topics:

               

              HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(getFactoryType(), getTransportConfiguration()); // transport configuration is just a Netty transport configuration: {port=5445, host=10.2.62.71}

              connection = cf.createConnection();

              connection.start();

              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

              destination = HornetQDestination.fromAddress(destinationName);  // where destination name for topic is "jms.topic.asyncEvents"

              producer = session.createProducer(destination);

               

              As I understand it, which of course may not be correct, is that we have a QueueImpl for the InVM connector, and a QueueImpl for the NettyConnector to the (single) other server, so we should only have 2 QueueImpls for the topic.

               

              thanks, Mitchell

              • 4. Re: QueueImpl memory leak
                clebert.suconic

                destination = HornetQDestination.fromAddress(destinationName);  // where destination name for topic is "jms.topic.asyncEvents"

                 

                 

                This doesn't create QueueImpl.. it only creates the HornetQQueue object.

                 

                 

                You are creating queues somewhere else in your code. look for Subscriptions. Each topic subscription will be a QueueImpl.

                • 5. Re: QueueImpl memory leak
                  mackerman

                  I think the problem has to do with the server timing out a connection due to the connection_ttl setting, and the client re-establishing a connection.  We are seeing many errors in the log like

                   

                  WARNING: Connection failure has been detected: Did not receive data from /127.0.0.1:59024. It is likely the client has exited or crashed without closing its connection, or the network between the server and client has failed. You also might have configured connection-ttl and client-failure-check-period incorrectly. Please check user manual for more information. The connection will now be closed. [code=3]

                   

                  This does not explain, however, why the QueueImpl instances are not getting cleaned up.

                   

                  I'm going to implement a keep-alive mechanism, whereby i'm going to publish a message every minute (ttl set at 2 mins) so that the client and server exchange data.  Hopefully that'll be an effective keep-alive approach until we can adopt STOMP 1.1. 

                   

                  I think the same problem occurs with Netty (the above warning was due to a netty client).  It should already have a keep-alive mechanism, but a quick heapdump shows that there are more than expected QueueImpl instances, unless they have just not been cleaned up yet.

                  • 6. Re: QueueImpl memory leak
                    clebert.suconic

                    That's if you were using Temporary Queue or Temporary Topic. That's the only cases where these Queues would be leaking and that's been fixed on later releases.

                    • 7. Re: QueueImpl memory leak
                      clebert.suconic

                      Are you using Stomp?  How are you creating the subscriptions or queues?

                      • 8. Re: QueueImpl memory leak
                        mackerman

                        We don't appear to be leaking QueueImpl instances for our queue, only our topic.  We are not creating a temporary topic, please see above for our configuration. 

                         

                        The topic subscription is being created using a STOMP client, based on Apache.NMS.STOMP, or a test JMS (netty) client.  With either client, it is just started and left running.  The JMS client is as simple receiver, which just prints out the message:

                         

                         

                        import java.util.HashMap;

                         

                        import javax.jms.Connection;

                        import javax.jms.Destination;

                        import javax.jms.JMSException;

                        import javax.jms.Message;

                        import javax.jms.MessageConsumer;

                        import javax.jms.MessageListener;

                        import javax.jms.Session;

                        import javax.jms.TextMessage;

                         

                         

                        import org.apache.log4j.Logger;

                        import org.hornetq.api.core.TransportConfiguration;

                        import org.hornetq.api.jms.HornetQJMSClient;

                        import org.hornetq.api.jms.JMSFactoryType;

                        import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

                        import org.hornetq.jms.client.HornetQConnectionFactory;

                        import org.hornetq.jms.client.HornetQDestination;

                         

                         

                        public class JmsReceive {

                         

                         

                                  private static Logger log = Logger.getLogger(JmsReceive.class);

                         

                         

                                  private Connection conn;

                         

                                  public static void main(String[] args) throws Exception {

                                            String address = null;

                                            String port = null;

                                            if (args.length == 0)

                                                      address = "127.0.0.1";

                                            else {

                                                      address = args[0];

                                            }

                                            if (args.length < 2)

                                                      port = "5445";

                                            else {

                                                      port = args[1];

                                            }

                         

                                            new JmsReceive().init(address, port);

                         

                                            Thread.sleep(Long.MAX_VALUE);

                                  }

                         

                         

                                  public void init(String address, String port) {

                                            try {

                                                      System.out.println(String.format("connecting to %s, port %s", address, port));

                                                      System.out.println("Waiting for messages...");

                                                      HashMap<String, Object> map = new HashMap<String, Object>();

                                                      map.put("host", address);

                                                      map.put("port", port);

                         

                                                      TransportConfiguration serverTransportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);

                                                      HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.TOPIC_CF, serverTransportConfiguration);

                         

                          cf.setReconnectAttempts(-1);//-1 should mean infinite//otherwise change back to ~10,000

                                                      cf.setRetryInterval(10000);//if fails to connect, retries every 10 sec

                          cf.setClientFailureCheckPeriod(10000);//checks for conn failure every 10 sec

                                                      cf.setInitialConnectAttempts(10000);//set initial connect attempts

                         

                         

                                                      conn = cf.createConnection();

                                                      conn.start();

                         

                                                      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

                                                      Destination destination = HornetQDestination.fromAddress("jms.topic.asyncEvents");

                         

                                                      MessageConsumer consumer = session.createConsumer(destination);

                                                      consumer.setMessageListener(new MessageListener() {

                                                                @Override

                                                                public void onMessage(Message message) {

                                                                          try {

                                                                                    System.out.println(((TextMessage)message).getText());

                         

                                                                          } catch (Throwable e) {

                                                                                    log.error(e);

                                                                          }

                                                                }

                                                      });

                                            } catch (Throwable t) {

                                                      log.error(t);

                                                      close();

                                            }

                                  }

                                  public void close() {

                                            try {

                                                      if (conn != null) {

                                                                conn.close();

                                                      }

                                                      System.out.println("closing time");

                                            } catch (JMSException e) {

                                                      log.error(e);

                                            }

                                  }

                         

                         

                        }

                        • 9. Re: QueueImpl memory leak
                          clebert.suconic

                          Each time you create a new consumer on a queue, you are creating a new QueueImpl, since you are creating a new non-durable subscription.

                           

                           

                          I believe that got fixed as well.

                           

                           

                          You could work around the issue by making your subscription durable. (createDurableSubscription).

                           

                           

                          Can you test a latest version from github?

                          • 10. Re: QueueImpl memory leak
                            mackerman

                            Understood about a new Consumer creating a QueueImpl.  Our clients only create the subscription once, which is what makes this confusing.

                             

                            I'll try a durableSubscription.

                             

                            I'll try the latest from github, may take a little while.  Are you referring to the latest 2.2.14?  At this point we can't update to 2.3.0.Alpha.

                            • 11. Re: QueueImpl memory leak
                              clebert.suconic

                              git clone git://github.com/hornetq/hornetq.git

                              cd hornetq

                               

                              git checkout Branch_2_2_AS7

                               

                              ./build.sh

                              (./mac-build.sh if you have a mac)

                               

                               

                              cd build

                               

                               

                              replace jars from your application server

                               

                               

                              Are you using standalone?

                               

                              if that's the case

                               

                               

                              cd build

                              git checkout Branch_2_2_AS7

                              ./build.sh distro

                               

                               

                               

                              If you are afraid of git:

                              downlaod it from here: https://github.com/hornetq/hornetq/zipball/Branch_2_2_AS7

                              And execute the commands above.

                              • 12. Re: QueueImpl memory leak
                                clebert.suconic

                                >> Our clients only create the subscription once

                                 

                                 

                                You will have one QueueImpl per client.