7 Replies Latest reply on Oct 28, 2009 9:40 AM by mural74

    Priority when delivering messages

    mural74

      Hi,

      I am getting familiar with the code and looking at the core classes of HQ. I noticed that PriorityLinkedListIterator does not take priority into account when selecting the next() item once it has been instantiated. With slow consumers, this can lead to high-priority messages being delivered only after all the lower-priority ones have been sent.
      Am I missing something? I am still trying to figure out how QueueImpl works, so I may be completely lost :S

      I can fix it, if it is actually a bug.

      Best,
      Diego

        • 1. Re: Priority when delivering messages
          timfox

          Can you create a test case that demonstrates the problem?

          • 2. Re: Priority when delivering messages
            mural74

            Sure, I will write one and post it here.

            • 3. Re: Priority when delivering messages
              timfox

              Ok sounds good :)

              • 4. Re: Priority when delivering messages
                mural74

                Here is a test case that shows the behavior I mentioned.

                I am using the deliverNow method of the QueueImpl class and a Filter in the consumer in order to trigger the issue. Basically, once the iterator is initialized (on the first call to hasNext()), it is not renewed when new messages are added to the queue. Therefore, the sequence produced by the iterator does not take into account higher-priority messages added to the queue afterwards.

                In fact, it is not a bug in the iterator itself: the queue should drop all in-use iterators when a new message is added to it. Some optimizations could be made to avoid dropping iterators whose filter is unrelated to the added message.

                I am pasting the test case here, since I don't see any option to attach a file.


                import java.util.concurrent.Executors;
                import java.util.concurrent.ScheduledExecutorService;
                
                import org.hornetq.core.filter.Filter;
                import org.hornetq.core.server.HandleStatus;
                import org.hornetq.core.server.MessageReference;
                import org.hornetq.core.server.Queue;
                import org.hornetq.core.server.ServerMessage;
                import org.hornetq.core.server.impl.QueueImpl;
                import org.hornetq.tests.unit.core.server.impl.fakes.FakeConsumer;
                import org.hornetq.tests.util.UnitTestCase;
                import org.hornetq.utils.SimpleString;
                
                
                public class QueueImplPriorityTest extends UnitTestCase
                {
                   private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

                   private static final SimpleString queue1 = new SimpleString("queue1");

                   private static final SimpleString address1 = new SimpleString("address1");

                   // A filter that matches every message; its presence makes the queue use an iterator
                   class FakeFilter implements Filter
                   {
                      public SimpleString getFilterString()
                      {
                         return null;
                      }

                      public boolean match(final ServerMessage message)
                      {
                         return true;
                      }
                   }

                   // The tests ----------------------------------------------------------------

                   // Message IDs are assigned in the expected delivery order (0..5)
                   public void testPriority() throws Exception
                   {
                      FakeConsumer cons1 = new FakeConsumer(new FakeFilter());

                      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);

                      queue.addConsumer(cons1);

                      // Messages 0 and 1 are handled as soon as they are added
                      cons1.setStatusImmediate(HandleStatus.HANDLED);
                      MessageReference ref = generateReference(queue, 0);
                      ref.getMessage().setPriority((byte)2);
                      queue.addLast(ref);

                      ref = generateReference(queue, 1);
                      ref.getMessage().setPriority((byte)2);
                      queue.addLast(ref);

                      // The consumer is now busy, so messages 3 and 4 stay in the queue
                      cons1.setStatusImmediate(HandleStatus.BUSY);
                      ref = generateReference(queue, 3);
                      ref.getMessage().setPriority((byte)2);
                      queue.addLast(ref);

                      ref = generateReference(queue, 4);
                      ref.getMessage().setPriority((byte)2);
                      queue.addLast(ref);

                      // This will initiate the iterator
                      queue.deliverNow();

                      // Message 2 has a higher priority and should be delivered before 3 and 4
                      cons1.setStatusImmediate(HandleStatus.HANDLED);
                      ref = generateReference(queue, 2);
                      ref.getMessage().setPriority((byte)4);
                      queue.addLast(ref);

                      ref = generateReference(queue, 5);
                      ref.getMessage().setPriority((byte)2);
                      queue.addLast(ref);

                      // Since the iterator is already initiated and there are more messages with lower priority,
                      // it will deliver the low-priority messages first
                      queue.deliverNow();

                      System.out.println(cons1.getReferences());
                      for (int i = 0; i <= 5; i++) {
                         // Expected value first, actual value second
                         assertEquals(i, cons1.getReferences().get(i).getMessage().getMessageID());
                      }
                   }
                }
                


                Best,
                Diego

                • 5. Re: Priority when delivering messages
                  timfox

                  Ok, good catch Diego.

                  Can you add a JIRA bug report for this? (You can attach the test case to the JIRA)

                  • 6. Re: Priority when delivering messages
                    timfox

                    I agree, the simplest way to fix this would be to drop all iterators when any new message arrives.

                    However, there are performance implications with that. A slightly better optimisation would be to drop the iterators only if a new message arrives with a higher priority than the current highest-priority message in the queue.

                    In most cases, probably only the default priority is used and we don't need to drop all iterators in that case.
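
                    As a rough illustration of that check, here is a minimal sketch on a toy queue. It is not HornetQ's QueueImpl: the class name, the highestQueued field and the invalidations counter are all hypothetical, and the counter merely stands in for "drop all in-use iterators". It also assumes that nothing needs to be dropped when the queue is empty, and it does not model message removal.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy queue for illustration; this is not HornetQ's QueueImpl.
class PriorityInvalidationSketch
{
   private static final int LEVELS = 10; // priorities 0..9

   // One FIFO list per priority level
   private final List<Deque<String>> levels = new ArrayList<Deque<String>>();

   // Highest priority currently queued, or -1 when the queue is empty.
   // A real queue would also lower this as messages are removed.
   private int highestQueued = -1;

   // Counts how often in-use iterators would have been dropped
   private int invalidations = 0;

   PriorityInvalidationSketch()
   {
      for (int i = 0; i < LEVELS; i++)
      {
         levels.add(new ArrayDeque<String>());
      }
   }

   void addLast(final String message, final int priority)
   {
      // Only a message that outranks everything already queued triggers a drop
      if (highestQueued >= 0 && priority > highestQueued)
      {
         invalidations++; // stand-in for "drop all in-use iterators"
      }
      levels.get(priority).addLast(message);
      highestQueued = Math.max(highestQueued, priority);
   }

   int invalidations()
   {
      return invalidations;
   }
}
```

                    With this check, a stream of messages that all share the default priority never touches the iterators; only the first message that outranks the rest pays the cost.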

                    An even better optimisation would be to make the iterator smarter, and allow it to return recently added messages of a higher priority.

                    • 7. Re: Priority when delivering messages
                      mural74

                      I will add it and propose a solution if you like. I have not thought much about how to solve it yet, and I need to get a better picture of the functionality provided by the queue and how its parts interact with each other. What you are saying sounds like a good solution. I will try to find a way to avoid adding overhead to the process of adding a message; I guess it would be better to detect the change at the time of asking for the next() item. But this is just off the top of my head.
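
                      One way the "detect it at next() time" idea could look, as a sketch only: the add path just bumps a counter, and the cursor compares that counter on each call and restarts its scan from the top priority when it has changed. LazyPriorityList, Cursor, modCount and pos are hypothetical names, not HornetQ classes. The sketch is single-threaded, keeps delivered messages in place, and ignores removal and filters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical toy structure for illustration; not HornetQ's priority list.
class LazyPriorityList
{
   static final int LEVELS = 10; // priorities 0..9

   private final List<List<String>> levels = new ArrayList<List<String>>();

   // Incremented on every add; cursors use it to notice new messages
   private long modCount = 0;

   LazyPriorityList()
   {
      for (int i = 0; i < LEVELS; i++)
      {
         levels.add(new ArrayList<String>());
      }
   }

   void add(final String message, final int priority)
   {
      levels.get(priority).add(message);
      modCount++; // the only extra work on the add path
   }

   Cursor cursor()
   {
      return new Cursor();
   }

   class Cursor
   {
      // Index of the next unreturned message in each priority level
      private final int[] pos = new int[LEVELS];

      private int level = LEVELS - 1;

      private long seen = modCount;

      boolean hasNext()
      {
         // Something was added since the last call: rescan from the top level
         if (seen != modCount)
         {
            level = LEVELS - 1;
            seen = modCount;
         }
         // Skip levels whose messages have all been returned already
         while (level >= 0 && pos[level] >= levels.get(level).size())
         {
            level--;
         }
         return level >= 0;
      }

      String next()
      {
         if (!hasNext())
         {
            throw new NoSuchElementException();
         }
         return levels.get(level).get(pos[level]++);
      }
   }
}
```

                      When nothing has been added between calls, the cursor simply continues from its current level, so the rescan cost is only paid after an add.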