11 Replies Latest reply on Apr 15, 2014 10:22 PM by jbertram

    Lower than expected concurrency with message grouping

    brettniven

      Hi,

       

      I'm experiencing lower than expected concurrency in a scenario that uses message grouping. I wondering whether I'm using one or more anti-patterns or whether this may be a JBoss/HornetQ integration issue. My scenario:

      • JBoss 7.1.3.Final, HornetQ 2.2.21.Final
      • Many producers send messages of varying sizes (let's say an average message size of 100KB)
      • Messages are grouped by setting a JMSXGroupID
      • Most messages are written to the filesystem's large messages dir (by HornetQ), as expected
      • I am using MDB's as message consumers. These are 'slow consumers'. Default MDB instance size of 15.
      • When there are >= 15 unique JMSXGroupID's in the queue, I expect concurrency of 15 (I expect 15 active MDB's)

       

      Problem:

      • When 'x' (let's say 100) messages with the same JMSXGroupID are at the top of the queue, I end up with a concurrency of 1 (a single active MDB), even if there are many more items in the queue with differing JMSXGroupID's
      • It appears that only the first 'x' messages are 'considered for allocation' to consumers/MDB's
      • When debugging and checking queue properties, I found that the JMSQueueControl.getDeliveringCount() returned a small figure (< 100), inferring that only that figure had been 'allocated to a consumer' (and all to the same consumer as they have the same message group)

       

      How I've fixed it temporarily:

      • I realised that it was not 'x messages' that were considered for allocation, but instead 'x bytes'. Proven by mocking a test with much smaller message sizes
      • So, after reading up on Flow Control, I've increased consumer-window-size (100MB resolves my scenario)
      • After doing so and re-running my test, I have an expected concurrency of 15 and JMSQueueControl.getDeliveringCount() returns the length of the queue - inferring that all messages have been 'allocated to consumers'

       

      But this is undesirable for several reasons:

      • The tests and tmp solution infer that before messages are allocated to a consumer (in my case, MDB's), the full message is read into memory.
      • I had hoped that only message header properties would be read before allocating to a consumer and that the consumer would then only read the message into memory immediately prior to processing
      • I ideally only want 15 full messages read into memory at any one time. In my example/test, I don't want the first 100 messages loaded into memory unnecessarily until immediately before they will be processed
      • These are slow consumers so I would ideally like to set consumer-window-size to 0, meaning no buffer for each consumer

       

      Questions:

      • In a JBoss env, is each MDB considered a JMS consumer, or is there perhaps a single consumer that then delegates to the MDB's?
        • i.e. When setting consumer-window-size to 100MB, am I allowing 100MB per MDB (of which there may be 15, so 1.5GB total) or 100MB per jvm?
        • My tests suggest that there is a single consumer process somewhere. If all MDB's were considered JMS consumers, then the problem wouldn't have surfaced - as all 15 MDB's would have used the default consumer-window-size of 1MB and would have each had a message to process, even if the messages were way down the queue
      • Is it true that the full message (including message body) is read into memory before allocating to consumers? This appears to be the case.

       

      Am I using anti-patterns?

      • Should I perhaps write message bodies to disk myself and keep the jms message body tiny - such as a mere path to the large message content? I had hoped to not have to do this seeing as HornetQ already has logic to write/read to/from the filesystem
      • Should I perhaps not use MDB's and implement my own consumers?

       

      Any tips/pointers/feedback much appreciated.

       

      Regards,

       

      Brett

        • 1. Re: Lower than expected concurrency with message grouping
          ataylor

          firstly let me answer your questions.

           

          In a JBoss env, is each MDB considered a JMS consumer, or is there perhaps a single consumer that then delegates to the MDB's?

          neither, there are n sessions/consumers that feed the MDB;s in the pool as they are available. by default this is 15 and configured via the maxSession activation property on the MDB (or on the RA).

          i.e. When setting consumer-window-size to 100MB, am I allowing 100MB per MDB (of which there may be 15, so 1.5GB total) or 100MB per jvm?

          per consumer

          My tests suggest that there is a single consumer process somewhere. If all MDB's were considered JMS consumers, then the problem wouldn't have surfaced - as all 15 MDB's would have used the default consumer-window-size of 1MB and would have each had a message to process, even if the messages were way down the queue

          as above

          Is it true that the full message (including message body) is read into memory before allocating to consumers? This appears to be the case.

          yes

           

          and no, you aren't using any anti patterns.

           

          firstly you are using quite an old version, i think its 2 years old now and we have fixed quite a few bugs, is there any chance you could test against a newer version, maybe Wildfly 8?

           

          if you could easily provide me with a test or example i can easily run i would happily investigate for you?

          1 of 1 people found this helpful
          • 2. Re: Lower than expected concurrency with message grouping
            jbertram

            When 'x' (let's say 100) messages with the same JMSXGroupID are at the top of the queue, I end up with a concurrency of 1 (a single active MDB), even if there are many more items in the queue with differing JMSXGroupID's

            If I'm not mistaken, I believe the behavior you're seeing is expected. 

             

            There's 2 important points here:

            • a queue has FIFO semantics
            • message grouping serializes the processing of messages with the same group ID

             

            These two facts together you may see behavior like you've described.  Just because a chunk messages at the head of the queue have the same group ID doesn't mean that other consumers can then jump around that chunk and start consuming the messages behind it.  That would break FIFO semantics.  At the very least, the chunk of grouped messages must be delivered to the appropriate client in order for other messages to be consumed (which is what you're seeing when you increase the consumer-window-size).

             

            I don't think it's valid to use message grouping and expect the same kind of concurrent message processing you get when you don't group messages.

            • 3. Re: Lower than expected concurrency with message grouping
              ataylor

              Justin is correct, i missed your comment about having the same group id for messages.

              • 4. Re: Lower than expected concurrency with message grouping
                brettniven

                OK, I'm understanding it much better now. Thank you for the replies.

                 

                It appears that when using message grouping, a strict FIFO queue is not the most efficient approach, especially with large messages and slow consumers.

                Of course I appreciate that HornetQ is used for many different scenarios and my case may be somewhat unique.

                In case there is a possibility for an enhancement request here for future versions, I have written a small test and reproduced my scenario in wildfly-8.1.0.CR1.

                A deployable ear, src and README with instructions for running are all within a zip that I'll attach.

                 

                I effectively want a 'look ahead' approach. Instead of setting a per-consumer 'consumer-window-size', I instead want to control the depth of the queue that is checked before allocating to consumers, as opposed to a strict FIFO.

                Either way, FIFO principles are broken when message grouping is used - if not when popped from the actual queue, then certainly when they are actually processed by a consumer (if earlier messages with a different group id are held up).

                As it stands, if I allocate 100MB to consumer-window-size, this means that if my consumers are very slow (which they often are in production), I could end up with 1.5GB memory usage total in the consumer buffers.

                If instead I could set a singular figure such as a 'queue-check-window-size', I could set my consumer-window-size to '0' - which is the recommendation for slow consumers (meaning no buffer).

                Or even better - would be a 'queue-check-depth' which wouldn't even have to read the full messages into memory until it knew whether there was an available consumer for the message based on the message group header.

                The end result is the same - I'd simply by saving a great deal on memory.

                If you think there may be room for an enhancement request here, I can add this to the HornetQ JIRA along with my test.

                 

                For now though, I appreciate the replies and I can now think of workarounds to resolve my problem, so thank you.

                 

                Before I sign off though, to make sure I'm 100% following this, I'm listing my understanding below. If any of this sounds incorrect, then please feel free to let me know:

                • The process that allocates messages to consumers only ever checks/pops the top element of the queue
                • On first check/pop, it allocates to consumer1.
                • Given that the first 100 messages in my test are with the same group, it will continue popping and adding to the consumer1 queue until the consumer1 buffer is full, as configured in consumerWindowSize
                • After the 'xth' pop (11/12 in my test), it can't allocate to consumer1 as it's buffer is full
                • It is a strict FIFO queue so message-to-consumer allocation will now halt until there is space in consumer1's buffer

                And when I increase consumer-window-size to something high, such as 100MB, and re-start the test, this resolves my issue because:

                • Popping the first 100 messages that belong to group1 results in < 100 MB in consumer1's buffer
                • So now, the process that allocates to consumers is now able to consider messages 100 through to 119.

                 

                Regards,

                 

                Brett

                • 5. Re: Lower than expected concurrency with message grouping
                  ataylor

                  firsly all you have said is correct except:

                   

                  Either way, FIFO principles are broken when message grouping is used - if not when popped from the actual queue, then certainly when they are actually processed by a consumer (if earlier messages with a different group id are held up).

                  Its actually the opposite, ordering guarantees are between the producer and consumer, i.e. point to point,  if you have 2 consumers then its possible that messages from a producer will be consumed by different consumers out of order, this is exactly what message grouping is for.

                   

                  regarding your feature request its definitely something that would be useful so if you could raise a JIRA, reference this post then it may be something we could add. saying that we do have a pretty long list of things we need to do so i doubt when we would have time in the short term to do it. We are also happy to receive contributions from the community tho so feel free to implement it.

                  • 6. Re: Lower than expected concurrency with message grouping
                    jbertram

                    Either way, FIFO principles are broken when message grouping is used - if not when popped from the actual queue, then certainly when they are actually processed by a consumer (if earlier messages with a different group id are held up).

                    I don't understand this statement.  As far as I know, FIFO semantics are never broken in any case.  The unexpected concurrency you're seeing is specifically to enforce FIFO semantics.  Can you clarify your statement here?

                     

                    As Andy noted, everything in your bullet points is accurate.

                    • 7. Re: Lower than expected concurrency with message grouping
                      jbertram

                      FYI - I've attempted to clarify this point in the HornetQ User Guide.  See https://github.com/hornetq/hornetq/commit/52f1af23ac93a9fa6f3afee0a0e0957e9c396a14.

                      • 8. Re: Lower than expected concurrency with message grouping
                        brettniven

                        In that statement, I was referring to the wider picture, not just the primary queue -  so maybe I shouldn't have referred to FIFO principles. But i'll elaborate...

                        Sure, the queue, before/when allocating to consumers, is FIFO - and no principles broken there.

                        But on a wider picture, messages can be processed (by their consumers/mdb's) earlier than messages that were placed in the queue before them (of course, this happens _after_ the FIFO primary queue)

                        i.e. In my example after increasing consumer-window-size, messages 2 to 100 would be waiting in the consumer1 buffer/queue after having been allocated

                        Message 101 arrived later in the main queue but is going to actually run (on another consumer) earlier than messages 2 to 100.

                        So in my particular scenario, looking at the wider picture, a different queue implementation other than a FIFO might be suitable.

                        i.e. Given that I am using message grouping, it matters not to me that messages 2 to 100 are popped from the primary queue first. I'd actually prefer if they were left in the primary queue, hence not using memory in consumer buffers.

                        Message 101 (with differing group id), could then be removed from the queue ahead of messages before it that we know are bound to an already-full consumer

                        • 9. Re: Lower than expected concurrency with message grouping
                          jbertram

                          But on a wider picture, messages can be processed (by their consumers/mdb's) earlier than messages that were placed in the queue before them (of course, this happens _after_ the FIFO primary queue)

                          Yes, of course.  If this wasn't possible then all message consumption would effectively be serialized even if multiple consumers were active because the server would only be able to hand out 1 message at a time and wait for that message to be acknowledged before it handed out another.

                           

                          As far as your specific use-case goes you'll have to figure out a suitable way to work around it.  Some ideas that come to mind:

                          • Reduce the number of messages in a group to mitigate the risk of a bottleneck.
                          • Separate the groups onto different queues so one group won't block another.  Diverts could help here
                          • Increase the performance of your consumers so large chunks of grouped messages don't cause bottlenecks.

                           

                          I don't know if this fits your use-case, but in the past people have used message groups to keep related chunks of information together (e.g. chunks of a single file that can be assembled after all the chunks are received).  In that kind of scenario HornetQ large message support can serve as a functional replacement.

                          • 10. Re: Lower than expected concurrency with message grouping
                            brettniven

                            Thanks. Yes, we have a few ideas in mind. Increasing consumer-window-size and toggling the message grouping will go a long way to resolving our issue. For reference, our problem occurs primarily when the server has been down (such as for patching). There are many remote agents (in the hundreds in some installations). When the server re-starts, all remote agents connect and send all data (via a REST API) that has been collected since the server was down. At the moment, whatever agent connects first, 'wins' , taking the top 100 or whatever queue spots. We can throttle the agents - but the message grouping isn't a one-to-one mapping to the sending agent - so we'll need smarts to check what is in the queue (and also what is in consumer buffers), to intelligently throttle. Multiple queues is also a possibility. All in all, the answers have helped in this post as I had to understand it before designing a solution. Thanks again.

                            • 11. Re: Lower than expected concurrency with message grouping
                              jbertram

                              Always glad to help.  Good luck with your implementation!