6 Replies Latest reply on Sep 28, 2014 10:27 PM by Greg Kopff

    Pub/sub setup results in leak of ServerMessageImpl instances

    Greg Kopff Newbie

      I am trying to use HornetQ to perform publish/subscribe style messaging.  I am using HornetQ 2.4.1.Final.  I use the native HornetQ APIs, not JMS.

       

      I create an embedded HornetQ server with the following configuration:

       

          final Configuration configuration = new ConfigurationImpl();
          configuration.setPersistenceEnabled(false);
          configuration.setJournalDirectory(System.getProperty("java.io.tmpdir"));
          configuration.setCreateJournalDir(false);
          configuration.setSecurityEnabled(false);
          configuration.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName()));
          configuration.setClusterUser("unused");
          configuration.setClusterPassword("disabled");

          final HornetQServer server = HornetQServers.newHornetQServer(configuration);
          server.start();

       

       

      I create a publisher like so:

       

            final ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(
                new TransportConfiguration(NettyConnectorFactory.class.getName()));
            locator.setConfirmationWindowSize(5 * 1024 * 1024);
            locator.setReconnectAttempts(RETRY_FOREVER);

            final ClientSessionFactory factory = locator.createSessionFactory();
            this.session = factory.createSession();
            this.producer = this.session.createProducer("the_address");

       

      I compose and send messages like this:

       

          final ClientMessage msg = this.session.createMessage(false);
          msg.getBodyBuffer().writeString("the body");
          msg.putLongProperty("an_id", id);

          this.producer.send(msg);

       

      On the other end, I setup a subscriber like this:

       

            final ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(
                new TransportConfiguration(NettyConnectorFactory.class.getName()));
            locator.setConfirmationWindowSize(5 * 1024 * 1024);
            locator.setReconnectAttempts(RETRY_FOREVER);

            final ClientSessionFactory factory = locator.createSessionFactory();
            this.session = factory.createSession();

            final String queueName = UUID.randomUUID().toString();
            this.session.createQueue("the_address", queueName);
            this.consumer = this.session.createConsumer(queueName);

            this.consumer.setMessageHandler(this::receive);

       

      However, when I run this code and inject a lot of messages through it, I find an ever-increasing reachable reference count for org.hornetq.core.server.impl.ServerMessageImpl, eventually resulting in an out-of-memory error.  The same applies to instances of org.hornetq.core.server.impl.MessageReferenceImpl.

       

      What have I misconfigured?  Is my approach incorrect?

        • 1. Re: Pub/sub setup results in leak of ServerMessageImpl instances
          Justin Bertram Master

          Are you actually acknowledging the messages that you receive?  Did you read the JavaDoc for org.hornetq.api.core.client.ClientSessionFactory#createSession()?  Here it is in case you didn't:

           

             /**
              * Creates a <em>non-transacted</em> session.
              * Message sends and acknowledgements are automatically committed by the session. <em>This does not
              * mean that messages are automatically acknowledged</em>, only that when messages are acknowledged,
              * the session will automatically commit the transaction containing the acknowledgements.
              *
              * @return a non-transacted ClientSession
              * @throws HornetQException if an exception occurs while creating the session
              */
             ClientSession createSession() throws HornetQException;

          • 2. Re: Pub/sub setup results in leak of ServerMessageImpl instances
            Greg Kopff Newbie

            Hi Justin.

             

            I'll tackle this one by one ...

            Are you actually acknowledging the messages that you receive?

            No I'm not.  I didn't realise I had to (more on this in a second).  Based on what I've done before with JMS, I would have expected that any acks would be automatically performed prior / just after the message handler callback was kicked.

            Did you read the JavaDoc for org.hornetq.api.core.client.ClientSessionFactory#createSession()?

            Well, I must have read it at least once before, but perhaps I didn't glean the required information from it.  Can you help me to understand in what context 'transaction' is being used?  If it's in the JTA sense, then I'm not interested in anything like that.  And that will have made me skip over those (perhaps important) details.

             

            There's a final "but" to my reply however ...

             

            I'm trying to create a pub/sub mechanism ... and the problem of the increasing references of ServerMessageImpl occurs whether or not I have any subscribers.  Stated another way: I can have zero subscribers and I still leak references.  My expectation is that if there are no subscribers, the messages just disappear off into the ether.  This is what would happen if I used the JMS API and had a topic.

             

            There is every indication that these messages are buffered awaiting an acknowledgement (as you say) - but this is not the mode of operation I'm trying to achieve.  Can you tell me how to configure the native API to mimic a JMS topic?  (Before discovering this leak, I thought I understood, hence I submitted this pull request to the documentation some time ago: Re-arrange the example code to show the intent in a clearer manner. by gkopff · Pull Request #1698 · hornetq/hornetq · G… ).

             

            Pointers appreciated.

            • 3. Re: Pub/sub setup results in leak of ServerMessageImpl instances
              Justin Bertram Master

              No I'm not.  I didn't realise I had to (more on this in a second).

              I believe this is your fundamental problem.

               

              Based on what I've done before with JMS, I would have expected that any acks would be automatically performed prior / just after the message handler callback was kicked.

               

              JMS and HornetQ Core are different in this respect.  When you create a JMS session you specify an acknowledgment mode (e.g. AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, etc.).  HornetQ Core doesn't have this concept.  If you don't explicitly acknowledge the messages you receive, they won't be removed from the broker.  If you keep sending messages in your scenario, they will eventually fill up all your memory and cause an OOME.

               

              The JavaDoc for org.hornetq.api.core.client.ClientSessionFactory#createSession() tries to make this clear by saying, "This does not mean that messages are automatically acknowledged..."

               

              Can you help me to understand in what context 'transaction' is being used?  If it's in the JTA sense, then I'm not interested in anything like that.  And that will have made me skip over those (perhaps important) details.

              The transaction in question is not JTA.  It is a simple transaction used internally by HornetQ to track acknowledgements (among other things) in a single session.  It allows you to, for example, receive a batch of messages and acknowledge them all at once which is faster than acknowledging messages individually.

               

              I'm trying to create a pub/sub mechanism ... and the problem of the increasing references of ServerMessageImpl occurs whether or not I have any subscribers.  Stated another way: I can have zero subscribers and I still leak references.  My expectation is that if there are no subscribers, the messages just disappear off into the ether.  This is what would happen if I used the JMS API and had a topic.

              Your expectation is accurate.  If you send a message to an address with no queues, the message has nowhere to go, so it just disappears.  Since messages are being retained, I suspect you have a queue on your address.
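
The routing rule described in this reply can be pictured with a small in-memory sketch. This is plain Java, not HornetQ code: `ToyRouter`, `bindQueue` and `send` are hypothetical names invented for illustration, and the model assumes only what the reply states, namely that a message sent to an address is handed to every queue bound to that address and is dropped when there are none.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy model of address-to-queue routing. Illustration only, not the HornetQ API. */
final class ToyRouter {
    // address name -> queues currently bound to that address
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    /** Binds a new, empty queue to the address and returns it. */
    Queue<String> bindQueue(String address) {
        Queue<String> queue = new ArrayDeque<>();
        bindings.computeIfAbsent(address, a -> new ArrayList<>()).add(queue);
        return queue;
    }

    /** Routes the message to every bound queue and returns how many queues received it. */
    int send(String address, String message) {
        List<Queue<String>> queues =
            bindings.getOrDefault(address, Collections.emptyList());
        for (Queue<String> queue : queues) {
            queue.add(message);
        }
        // With no bound queues nothing holds the message, so it is simply dropped.
        return queues.size();
    }

    public static void main(String[] args) {
        ToyRouter router = new ToyRouter();
        System.out.println(router.send("the_address", "m1")); // prints 0: no queue, message dropped
        Queue<String> queue = router.bindQueue("the_address");
        System.out.println(router.send("the_address", "m2")); // prints 1: one queue received it
        System.out.println(queue.size());                     // prints 1: the queue now retains m2
    }
}
```

In this picture, retained messages imply that at least one queue is still bound to the address, which is the suspicion raised in the reply.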

               

              There is every indication that these messages are buffered awaiting an acknowledgement (as you say) - but this is not the mode of operation I'm trying to achieve.  Can you tell me how to configure the native API to mimic a JMS topic?

              Simply acknowledge the messages you receive using org.hornetq.api.core.client.ClientMessage#acknowledge() or perhaps just use the JMS API instead if that's the kind of behavior you're looking for.
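
To make the retention behaviour concrete, here is a toy in-memory sketch of a queue that keeps a reference to every delivered message until it is acknowledged. It is plain Java and not HornetQ code: `ToyAckQueue` and its methods are hypothetical names, and the model assumes only what this reply says, namely that unacknowledged messages are not removed from the broker.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Toy model of a queue that retains messages until they are acknowledged. Not the HornetQ API. */
final class ToyAckQueue {
    private final Deque<Long> undelivered = new ArrayDeque<>(); // routed, not yet handed to a consumer
    private final Set<Long> awaitingAck = new HashSet<>();      // handed to a consumer, not yet acknowledged
    private long nextId = 0;

    /** Adds a new message to the queue and returns its id. */
    long send() {
        long id = nextId++;
        undelivered.add(id);
        return id;
    }

    /** Hands the next message to the consumer; the queue still holds a reference to it. */
    long deliver() {
        long id = undelivered.remove();
        awaitingAck.add(id);
        return id;
    }

    /** Only an acknowledgement releases the queue's reference. */
    void acknowledge(long id) {
        awaitingAck.remove(Long.valueOf(id));
    }

    /** Number of messages the queue is still holding on to. */
    int retained() {
        return undelivered.size() + awaitingAck.size();
    }

    /** Sends and delivers n messages, optionally acknowledging each one; returns what is still retained. */
    static int run(int n, boolean acknowledge) {
        ToyAckQueue queue = new ToyAckQueue();
        for (int i = 0; i < n; i++) {
            queue.send();
            long id = queue.deliver();
            if (acknowledge) {
                queue.acknowledge(id);
            }
        }
        return queue.retained();
    }

    public static void main(String[] args) {
        System.out.println(run(1000, false)); // prints 1000: every delivered message is still retained
        System.out.println(run(1000, true));  // prints 0: acknowledged messages are released
    }
}
```

In the subscriber from the original post, the corresponding step would be calling `acknowledge()` on each `ClientMessage` inside the `receive` handler, as suggested above.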

              • 4. Re: Pub/sub setup results in leak of ServerMessageImpl instances
                Greg Kopff Newbie

                Hi Justin.

                 

                Thank you for your help.

                I believe this is your fundamental problem.

                Yes.

                JMS and HornetQ Core are different in this respect.  When you create a JMS session you specify an acknowledgment mode (e.g. AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, etc.).  HornetQ Core doesn't have this concept.  If you don't explicitly acknowledge the messages you receive, they won't be removed from the broker.  If you keep sending messages in your scenario, they will eventually fill up all your memory and cause an OOME.

                 

                The JavaDoc for org.hornetq.api.core.client.ClientSessionFactory#createSession() tries to make this clear by saying, "This does not mean that messages are automatically acknowledged..."

                Brilliant - thank you for this explanation.

                The transaction in question is not JTA.  It is a simple transaction used internally by HornetQ to track acknowledgements (among other things) in a single session.  It allows you to, for example, receive a batch of messages and acknowledge them all at once which is faster than acknowledging messages individually.

                Again, thank you.

                Your expectation is accurate.  If you send a message to an address with no queues, the message has nowhere to go, so it just disappears.  Since messages are being retained, I suspect you have a queue on your address.

                That's good to know.

                 

                The thing that had me stumped was the fact that I thought I saw the same behaviour when there were no subscribers.  On a new day with a fresh outlook, I can see that with no subscribers the reference count increases up to a point, but then gets cleaned up.  In other words, the problem doesn't show up with no subscribers.

                 

                Furthermore, if my subscriber acknowledges the message when it's received, my problem disappears.  Your solution was right.

                 

                Thank you very much for your help.

                 

                Kindest regards,

                 

                --

                Greg.

                • 5. Re: Pub/sub setup results in leak of ServerMessageImpl instances
                  Justin Bertram Master

                  The thing that had me stumped was the fact that I thought I saw the same behaviour when there were no subscribers.  On a new day with a fresh outlook, I can see that with no subscribers the reference count increases up to a point, but then gets cleaned up.  In other words, the problem doesn't show up with no subscribers.

                  I have a theory about why you may have seen this previously when you didn't expect it.  Your code is calling org.hornetq.api.core.client.ClientSession#createQueue(java.lang.String, java.lang.String).  As the JavaDoc for this method states, it creates a non-temporary, non-durable queue.  This means that once the queue is created it will remain until the server is restarted.  That's different from a non-durable JMS topic subscriber, whose subscription disappears when the subscriber is closed.  Perhaps you made your observation after you had closed your subscriber but before restarting the server, so messages were accumulating in the non-temporary, non-durable queue that the consuming client created.
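
The lifetime difference described in this reply can be sketched with a toy model. This is plain Java, not HornetQ or JMS code: `ToyQueueLifetime` and the `removeWhenConsumerCloses` flag are hypothetical, standing in for "a queue that outlives its consumer" versus "a subscription that goes away when the subscriber is closed".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of queue lifetime after the consumer closes. Illustration only. */
final class ToyQueueLifetime {
    private final Map<String, List<String>> queues = new HashMap<>();   // queue name -> retained messages
    private final Map<String, Boolean> removeOnClose = new HashMap<>(); // queue name -> lifetime flag

    /** Creates a queue; the flag decides whether it disappears when its consumer closes. */
    void createQueue(String name, boolean removeWhenConsumerCloses) {
        queues.put(name, new ArrayList<>());
        removeOnClose.put(name, removeWhenConsumerCloses);
    }

    /** Closing the consumer removes the queue only if it was created with the flag set. */
    void closeConsumer(String name) {
        if (removeOnClose.getOrDefault(name, false)) {
            queues.remove(name);
            removeOnClose.remove(name);
        }
    }

    /** Every queue that still exists receives (and retains) the message. */
    void publish(String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    /** Total number of messages retained across all existing queues. */
    int retained() {
        int total = 0;
        for (List<String> queue : queues.values()) {
            total += queue.size();
        }
        return total;
    }

    /** Creates one queue, closes its consumer, then publishes; returns what piles up afterwards. */
    static int retainedAfterClose(boolean removeWhenConsumerCloses, int messages) {
        ToyQueueLifetime broker = new ToyQueueLifetime();
        broker.createQueue("subscriber-queue", removeWhenConsumerCloses);
        broker.closeConsumer("subscriber-queue");
        for (int i = 0; i < messages; i++) {
            broker.publish("message " + i);
        }
        return broker.retained();
    }

    public static void main(String[] args) {
        System.out.println(retainedAfterClose(false, 500)); // prints 500: the queue outlives its consumer
        System.out.println(retainedAfterClose(true, 500));  // prints 0: nothing is left to retain messages
    }
}
```

HornetQ's `ClientSession` also offers `createTemporaryQueue`, which may be closer to the JMS non-durable subscriber lifetime; that is an assumption worth checking against its JavaDoc before relying on it.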

                  • 6. Re: Pub/sub setup results in leak of ServerMessageImpl instances
                    Greg Kopff Newbie

                    Hi Justin.

                     

                    Well spotted - thank you!

                     

                    --

                    Greg.