13 Replies Latest reply on May 3, 2011 2:23 PM by clebert.suconic

    Does Message Expiration work with Topics?

    bnc119

      Hi Folks,

       

      I am using HornetQ 2.1.2 Final on a RHEL 5 box.  I am trying to leverage the message expiration feature, so that "old" messages get purged by the expiration reaper thread.  I'm looking at the example code that ships with HornetQ:

       

      examples/jms/expiry/src/org/hornetq/jms/example/ExpiryExample.java

       

      and this feature is nicely demonstrated by a MessageProducer writing messages to a queue with a very short TTL.  My question is this:  Does message expiration work with topics

       

      I realize JMS topics implement a pub/sub model where you can have multiple subscribers for a message.  What I would like to see happen is for an expired message to be purged from the JMS topic, regardless of whether it's been delivered to all subscribers.  After all, if any particular subscriber "falls behind" in reading messages because it's busy doing other work, I don't want that topic's MessageCount backlog growing uncontrollably. 

       

      Thanks!

        • 1. Does Message Expiration work with Topics?
          clebert.suconic

          Yes, it should work

          • 2. Does Message Expiration work with Topics?
            bnc119

            Hi,

             

            It looks like my expiry-reaper-thread is not running.  I'm setting the TTL explicity for my MessageProducer, but it doesn't look like my messages ever get purged from the topic.  I'm poking through the hornetq-configuration.xml and the User Manual which states that I should be able to specify a scane period:

             

             

            A reaper thread will periodically inspect the queues to check if messages have expired.

            The reaper thread can be configured with the following properties in hornetq-configuration.xml

             

            However, it's not entirely clear where (i.e which xml block) in the configuration file this parameter belongs.  Any ideas?

            • 3. Does Message Expiration work with Topics?
              bnc119

              Is there a simple way to verify that the expiry-reaper thread is running?

              • 4. Does Message Expiration work with Topics?
                clebert.suconic

                TTL != Expiry Queue

                 

                 

                TTL = How long it will take to have connection failures to close the ServerSession

                 

                 

                Expiration is something different.

                • 5. Re: Does Message Expiration work with Topics?
                  bnc119

                  Hi,

                   

                  I'm confused by your last reply.  I'm looking at examples/jms/expiry/src/org/hornetq/jms/example/ExpiryExample.java, which ships with HornetQ 2.2.1 Final. In this example file, a MessageProducer is created, and a TimeToLive is specified for that producer.  My understanding is that messages sent by this producer will only be considered "valid" for the amount of time specified in milliseconds.

                   

                  An excerpt from ExpiryExample.java:

                   

                  // Create a JMS Message Producer

                  MessageProducer producer = session.createProducer(queue);

                   

                  // Messages sent by this producer will be retained for 1s (1000ms) before expiration

                  producer.setTimeToLive(1000);

                   

                  // Send Message to Queue.

                  producer.send(message);

                   

                  // Create Queue consumer and verify that the message has indeed "expired" from the queue.

                   

                   

                   

                  Furthermore, Secion 22.3 of the HornetQ User Manual States:

                   

                   

                   

                   

                   

                  I don't understand what you mean when you say that TimeToLive is not related to the Expiry Queue.  From the example source code and User Manual, it seems that they are in fact very related.  That is, messages that have been idle in a queue beyond their TimeToLive get purged from that queue and get moved to the Expiry Queue.  If this is not the case, then I would suggest that the user manual needs to explain this in more detail.

                   

                  In any case, I am basically trying to duplicate ExpiryExample.java, with the only difference being that I am writing messages to a topic, not a queue. I'm using jconsole to monitor the message traffic in my system, and even after setting the TimeToLive property on a message producer, I never see the messages dissapear from the "MessageCount" field on that topic.  Am I missing something?

                   

                  Thanks!

                  • 6. Re: Does Message Expiration work with Topics?
                    clebert.suconic

                    I thought for some reason that you were confusing the Connection TTL with the expiration.

                     

                     

                    Can you make an example showing your issue?

                    • 7. Re: Does Message Expiration work with Topics?
                      bnc119

                      I suppose the best way to explain what I'm doing is to discuss how I typically monitor message traffic on the topics in our system.  I don't know if you're familiar with jconsole, but it is a little test-stand utility that ships with java.  It allows you to attach to a running java process and (depending on what the vendor has exposed) assists in troubleshooting facets of that program.  HornetQ's support for jconsole is good - I use it frequently to monitor how many subscibers are actively connected to my JMS topics, as well as the number of messages backlogged on each topic.  A message will be backlogged until it has been delivered to all subscribers currently connected to that topic. (Note: all of our subscribers are non-durable.)

                       

                      Anyway, I open jconsole and attached to "org.hornetq.integration.bootstrap.HornetQBootstrapServer":

                       

                      $ jconsole &

                       

                      A little GUI pops up.  I click on the "MBeans" tab, and on the left-hand tree-structure, I expand:

                      org.hornetq-->Topic-->JMS--><TOPIC_NAME>-->Attributes

                       

                      From this screen, the statistic I am concerned with is called:

                       

                      MessageCount (This is the number of "backlogged" messages on that topic that are still awaiting delivery to one or more non-durable subscribers)

                       

                      Now, to describe the experiement I'm running:

                      ----------------------------------------------------------------------

                       

                      1.  I have one MessageProducer and one MessageConsumer.  The consumer subscribes to a topic but doesn't call receive().  This is meant to simulate a subscriber who is busy doing other "work" besides reading JMS messages.

                       

                      2.  The MessageProducer connects to the topic, sets TimeToLive(5000) on itself, then creates a text message and publishes the message to the topic. 

                       

                      3.  From jconsole, I can see that MessageCount = 1, and SubscriptionCount = 1.  As it should be.  The subscriber hasn't called receive() yet, so HornetQ is "backlogging" the message.

                       

                      4.  Now, since I set the TimeToLive on the MessageProducer object = 5 seconds, I would expect for HornetQ to purge that message out of the JMS topic after 5 seconds + ExpiryThreadScanPeriod.  From the user manual, I read that the default scan period = 30 seconds, so in the worst case, the message should be purged from the topic after 30 seconds.  Alas, it is not.  MessageCount stays = 1, until about 2 minutes later when the consumer finally wakes back up and disconnects from the topic.  Once HornetQ detects that there are no active non-durable subscribers left on the topic, all messages on that topic are finally purged out.

                       

                       

                      So my problem is that I can't seem to get the message expiration to work properly.  Either I'm not setting TTL correctly, (I don't how you can mess that up) or the Expiry Reaper thread is not running.  Do you have any suggestions on how I might troubleshoot this?  Is there some more fine-grained debug flag that I can enable to verify that the repear thread is indeed active?  Thanks for your help!

                      • 8. Re: Does Message Expiration work with Topics?
                        clebert.suconic
                        4.  Now, since I set the TimeToLive on the MessageProducer object = 5 seconds, I would expect for HornetQ to purge that message out of the JMS topic after 5 seconds + ExpiryThreadScanPeriod.  From the user manual, I read that the default scan period = 30 seconds, so in the worst case, the message should be purged from the topic after 30 seconds.  Alas, it is not.  MessageCount stays = 1, until about 2 minutes later when the consumer finally wakes back up and disconnects from the topic.  Once HornetQ detects that there are no active non-durable subscribers left on the topic, all messages on that topic are finally purged out.

                         

                        That's only true if you don't have any messages cached at the client.

                         

                         

                        The client will also check for the expiry of the message before returning to the client.

                         

                        If you want to have the expiry being treated at the server only, use consumerWindowSize = 0 (at the expense of a network round trip to the server every time you call .receive()).

                         

                         

                        I have added a test and it works fine and as expected.

                        • 9. Does Message Expiration work with Topics?
                          bnc119

                          Thanks for the extra insight.  There are clearly more issues out there for me to understand.  Are you familiar with jConsole?  If you read my last post I describe how I'm testing this system with jConsole.  Specifically, do you know what the MessageCount field under org.hornetq->Topic->JMS-><TOPIC NAME> represents?

                           

                          Does messageCount represent the number of undelivered messages on that particular topic ??

                           

                          I'm at the point where messages with short TTLs are indeed not getting delivered, but I want to understand what I'm seeing in jConsole better.

                          • 10. Does Message Expiration work with Topics?
                            clebert.suconic

                            MessageCount represents the number of messages you have the queue. DeliveringMessages represents the number of messages sent to the client (or client buffer) but not Acked yet.

                            • 11. Does Message Expiration work with Topics?
                              bnc119

                              Thanks for your reply.  Is it possible to have the Client "scan" for expired messages in the client buffer and send ACKs back to the server without requiring the messageConsumer to call receive()?  What I'm seeing in my experiements is that expired messages are only cleared/(ACK'd) from the server's queue once the messageConsumer calls receive().

                               

                              This unfortunately won't work for me because if my messageConsumer code is busy doing other work and never gets a chance to call receive(), the MessageCount queue on the server will continue to "fill up" with non-ACK'd messages.

                               

                              Is there some mechanism to manage the client buffer?  (i.e scan the buffer for expired messages and ACK those message back to the server?)

                               

                              Thanks,

                               

                               

                               

                               

                               

                               

                               

                               

                              Thanks,

                              • 12. Does Message Expiration work with Topics?
                                bnc119

                                Follow-up questions may also be:

                                 

                                1.  How long does hornetQ wait for a message to be ACK'd by a client?

                                2.  Is such an "ACK timeout" configurable?

                                3.  What action is take by hornetQ once that timeout is reached?

                                 

                                (Note:  Assume the client is conneced to hornetQ via an "AUTO_ACKNOWLEDGE" session)

                                 

                                Thanks,

                                • 13. Does Message Expiration work with Topics?
                                  clebert.suconic

                                  There's no such thing as ACK timeout.

                                   

                                  With AUTO_ACK, the message is ACKed as soon as you received the message.

                                   

                                   

                                  The message may be on a client buffer, but with Auto-ACK the message will be sent back as soon as you acked it.

                                   

                                   

                                  There's another option on DUPS_OK, that we batch ACKs accordingly to your Batch size.