-
1. Re: Priority when delivering messages
timfox Oct 27, 2009 3:48 AM (in response to mural74)
Can you create a test case that demonstrates the problem?
-
2. Re: Priority when delivering messages
mural74 Oct 27, 2009 9:31 AM (in response to mural74)
Sure, I will write one and post it here.
-
3. Re: Priority when delivering messages
timfox Oct 27, 2009 1:42 PM (in response to mural74)
Ok sounds good :)
-
4. Re: Priority when delivering messages
mural74 Oct 28, 2009 1:51 AM (in response to mural74)
Here is a test case that shows the behavior I mentioned.
I am using the deliverNow method of the QueueImpl class and a Filter in the consumer in order to trigger the issue. Basically, once the iterator is initialized (on the first call to hasNext), it is not renewed when new messages are added to the queue. Therefore, the sequence produced by the iterator does not take into account higher-priority messages added to the queue afterwards.
In fact, it is not a bug in the iterator itself: the queue should drop all in-use iterators when a new message is added to it. Some optimizations could be made to avoid dropping iterators whose filter is unrelated to the added message.
I am pasting the test case here, I don't see any option to attach a file.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.hornetq.core.filter.Filter;
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.tests.unit.core.server.impl.fakes.FakeConsumer;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.SimpleString;

public class QueueImplPriorityTest extends UnitTestCase
{
   // The tests ----------------------------------------------------------------

   private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

   private static final SimpleString queue1 = new SimpleString("queue1");

   private static final SimpleString address1 = new SimpleString("address1");

   class FakeFilter implements Filter
   {
      public SimpleString getFilterString()
      {
         return null;
      }

      public boolean match(final ServerMessage message)
      {
         return true;
      }
   }

   public void testPriority() throws Exception
   {
      FakeConsumer cons1 = new FakeConsumer(new FakeFilter());
      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
      queue.addConsumer(cons1);

      cons1.setStatusImmediate(HandleStatus.HANDLED);
      MessageReference ref = generateReference(queue, 0);
      ref.getMessage().setPriority((byte)2);
      queue.addLast(ref);
      ref = generateReference(queue, 1);
      ref.getMessage().setPriority((byte)2);
      queue.addLast(ref);

      cons1.setStatusImmediate(HandleStatus.BUSY);
      ref = generateReference(queue, 3);
      ref.getMessage().setPriority((byte)2);
      queue.addLast(ref);
      ref = generateReference(queue, 4);
      ref.getMessage().setPriority((byte)2);
      queue.addLast(ref);

      //This will initiate the iterator
      queue.deliverNow();

      cons1.setStatusImmediate(HandleStatus.HANDLED);
      ref = generateReference(queue, 2);
      ref.getMessage().setPriority((byte)4);
      queue.addLast(ref);
      ref = generateReference(queue, 5);
      ref.getMessage().setPriority((byte)2);
      queue.addLast(ref);

      //Since the iterator is already initiated and there are more messages with lower priority
      //It will deliver low priority messages first
      queue.deliverNow();

      System.out.println(cons1.getReferences());
      for (int i = 0; i <= 5; i++)
      {
         assertEquals(cons1.getReferences().get(i).getMessage().getMessageID(), i);
      }
   }
}

Best,
Diego
-
5. Re: Priority when delivering messages
timfox Oct 28, 2009 4:09 AM (in response to mural74)
Ok, good catch Diego.
Can you add a JIRA bug report for this? (You can attach the test case to the JIRA)
-
6. Re: Priority when delivering messages
timfox Oct 28, 2009 4:16 AM (in response to mural74)
I agree, the simplest way to fix this would be to drop all iterators when any new message arrives.
However, there are performance implications with that. A slightly better optimisation would be to only drop the iterators if a new message arrives with a higher priority than the current highest priority message in the queue.
In most cases, probably only the default priority is used and we don't need to drop all iterators in that case.
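As a rough illustration of that rule, here is a minimal, self-contained sketch. It is not HornetQ code: DropOnHigherPrioritySketch, its cursor field, and its methods are names invented for this example, and the "iterator" is reduced to a cursor that only ever moves down through the priority levels. It assumes ten priority levels (0 to 9) and is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of a priority queue with one in-use delivery cursor.
// Not the HornetQ QueueImpl; all names here are assumptions for illustration.
class DropOnHigherPrioritySketch {
    private static final int LEVELS = 10;     // priorities 0..9, 9 is highest
    private static final int NO_CURSOR = -1;  // no iterator currently in use

    // One FIFO per priority level.
    private final List<Deque<String>> levels = new ArrayList<>();
    // Level the in-use "iterator" has reached; it never moves back up by itself.
    private int cursor = NO_CURSOR;
    private int drops = 0;

    DropOnHigherPrioritySketch() {
        for (int i = 0; i < LEVELS; i++) {
            levels.add(new ArrayDeque<>());
        }
    }

    // Highest priority that currently has a queued message, or -1 if empty.
    private int highestQueuedPriority() {
        for (int p = LEVELS - 1; p >= 0; p--) {
            if (!levels.get(p).isEmpty()) {
                return p;
            }
        }
        return -1;
    }

    void add(String message, int priority) {
        // Drop the cursor only when the new message outranks everything queued.
        // Messages at the same or a lower priority are still reachable by the
        // existing cursor, so it can be kept.
        if (cursor != NO_CURSOR && priority > highestQueuedPriority()) {
            cursor = NO_CURSOR;
            drops++;
        }
        levels.get(priority).addLast(message);
    }

    // Delivers the next message, or returns null if the queue is empty.
    String poll() {
        if (cursor == NO_CURSOR) {
            cursor = LEVELS - 1;              // fresh iterator starts at the top
        }
        while (cursor > 0 && levels.get(cursor).isEmpty()) {
            cursor--;
        }
        return levels.get(cursor).pollFirst();
    }

    int drops() {
        return drops;
    }
}
```

With only default-priority traffic the cursor is never dropped after the first message, which is the case this rule is meant to keep cheap. The scan in highestQueuedPriority() is the extra cost paid on each add while a cursor is in use.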
An even better optimisation would be to make the iterator smarter, and allow it to return recently added messages of a higher priority.
-
7. Re: Priority when delivering messages
mural74 Oct 28, 2009 9:40 AM (in response to mural74)
I will add it and propose a solution for it if you like. I have not thought much about how to solve it yet, and I need to get a better picture of the functionality provided by the queue and how its parts interact with each other. What you are saying sounds like a good solution. I will try to find a way to avoid adding overhead to the process of adding a message; I guess it would be better to detect the change at the time of asking for the next() item. But this is just off the top of my head.
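One way the "detect the change at next()" idea might look is sketched below. This is only an illustration under stated assumptions, not the HornetQ implementation: SelfRefreshingQueueSketch and RefreshingIterator are invented names, the queue is modelled as ten FIFO levels, the iterator removes each message it returns (a simplification), and nothing here is thread-safe. The only work added to the add path is incrementing a version counter; the iterator compares that counter on each call and restarts from the highest level when it has changed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical sketch: an iterator that notices additions lazily.
// None of these names come from HornetQ.
class SelfRefreshingQueueSketch {
    private static final int LEVELS = 10;   // priorities 0..9, 9 is highest

    private final List<Deque<String>> levels = new ArrayList<>();
    private long version = 0;               // bumped on every add

    SelfRefreshingQueueSketch() {
        for (int i = 0; i < LEVELS; i++) {
            levels.add(new ArrayDeque<>());
        }
    }

    void add(String message, int priority) {
        levels.get(priority).addLast(message);
        version++;                          // the only extra cost when adding
    }

    Iterator<String> iterator() {
        return new RefreshingIterator();
    }

    private final class RefreshingIterator implements Iterator<String> {
        private int level = LEVELS - 1;
        private long seenVersion = version;

        // Re-check the queue version, then settle on the highest non-empty level.
        private void position() {
            if (seenVersion != version) {
                level = LEVELS - 1;         // something was added: rescan from the top
                seenVersion = version;
            }
            while (level > 0 && levels.get(level).isEmpty()) {
                level--;
            }
        }

        @Override
        public boolean hasNext() {
            position();
            return !levels.get(level).isEmpty();
        }

        @Override
        public String next() {
            position();
            String message = levels.get(level).pollFirst();
            if (message == null) {
                throw new NoSuchElementException();
            }
            return message;                 // simplification: returned messages are consumed
        }
    }
}
```

Restarting from the top on any change is the bluntest possible reaction. A refinement would be for add to also record the highest priority added since the last check, so the iterator only jumps back up when that priority is above its current level.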