10 Replies Latest reply on May 12, 2010 3:46 PM by Tim Fox

    Bug in consumer windowing?

    Bill Burke Master

      I've pre-created a bunch of consumers via a ClientSessionFactory set with windowing to 0

       

      sessionFactory.setConsumerWindowSize(0);

       

      The goal is to randomly pick a consumer, call receive() on it, to get the next message in the queue.  I was hoping that setConsumerWindowSize(0) would allow me to do this.  I tested it with the below code.

       

      What's interesting is that if I put a consumer.receiveImmediate() within the consumer creation loop, the assertion at the bottom fails.  Does receiveImmediate() put the consumer in a state to be able to receive messages?  I was hoping to keep a pool of consumers and use them as needed to receive messages.  A pool doesn't/can't work if the server pre-delivers to the consumer before it is picked from the pool.

       

       

      @Test
         public void testConsumerWindow() throws Exception
         {
            ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
            sf.setConsumerWindowSize(0);
      
            ClientSession session = sf.createSession(false, false, false);
            session.createQueue("testWindow", "testWindow", true);
            session.close();
      
      
            int numConsumers = 5;
      
            ArrayList<ClientConsumer> consumers = new ArrayList<ClientConsumer>();
            ArrayList<ClientSession> sessions = new ArrayList<ClientSession>();
            for (int i = 0; i < numConsumers; i++)
            {
               System.out.println("created: " + i);
               ClientSession session1 = sf.createSession();
               ClientConsumer consumer = session1.createConsumer("testWindow");
               consumers.add(consumer);
               session1.start();
               sessions.add(session1);
               // --- this is added later to cause error consumer.receiveImmediate();
      
            }
      
            ClientSession senderSession = sf.createSession();
      
            ClientProducer producer = senderSession.createProducer("testWindow");
      
            ClientMessage sent = senderSession.createMessage(false);
            sent.putStringProperty("hello", "world");
      
            producer.send(sent);
            senderSession.start();
      
            ClientConsumer consumer = consumers.get(2);
            ClientMessage received = consumer.receiveImmediate();
            Assert.assertNotNull(received);
         } 
      
        • 1. Re: Bug in consumer windowing?
          Tim Fox Master

          Your test fails because non-persistent messages are sent asynchronously by default, so by the time your call to receiveImmediate() executes it is quite likely that the message hasn't reached the queue yet, and the call correctly returns null.

          • 2. Re: Bug in consumer windowing?
            Clebert Suconic Master

            The test will fail even if you use persistent messages.

             

            Delivery to the queue also happens asynchronously, so there is no guarantee that receiveImmediate() will find a message on the queue once send() has returned.

             

             

            I made a few changes to the test: it failed when using receiveImmediate() (with persistent messages, over a transacted session), and it passed when I used receive(1000).
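            The timing issue described above can be reproduced without HornetQ at all. The following is only a toy sketch under stated assumptions: a plain java.util.concurrent queue stands in for the server queue, a background thread stands in for the asynchronous delivery, and a latch forces the race to go the unlucky way so the outcome is repeatable. The class and method names (AsyncDeliverySketch, run) are made up for illustration and are not part of any HornetQ API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model of "send returns before the message is on the queue". Not HornetQ code. */
public class AsyncDeliverySketch {

    /** Returns { result of the non-waiting poll, result of the timed poll }. */
    static String[] run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountDownLatch immediatePollDone = new CountDownLatch(1);

        // Stand-in for an asynchronous send: the caller continues at once,
        // the message only reaches the queue later, on another thread.
        Thread delivery = new Thread(() -> {
            try {
                immediatePollDone.await(); // hold delivery until the first poll has run
                queue.put("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        delivery.start();

        try {
            String immediate = queue.poll();            // analogous to receiveImmediate()
            immediatePollDone.countDown();
            String timed = queue.poll(1000, TimeUnit.MILLISECONDS); // analogous to receive(1000)
            delivery.join();
            return new String[] { immediate, timed };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for delivery", e);
        }
    }

    public static void main(String[] args) {
        String[] results = run();
        System.out.println("non-waiting poll: " + results[0]);
        System.out.println("timed poll:       " + results[1]);
    }
}
```

            In this model the non-waiting poll always misses the message because the latch delays delivery; in a real broker the same miss happens only some of the time, which is why a timed receive is the safer assertion in a test.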

             

             

             

            You could add BillsTest.java anywhere in the HornetQ test suite and run it from Eclipse (or any IDE you like).

            • 3. Re: Bug in consumer windowing?
              Bill Burke Master

              The idea of the above test is to show that windowing does not work if a receive is done (unsuccessfully) prior to a send.  The receiveImmediate() in the consumer creation loop exists to show previously failed receives.  If I change the receiveImmediate() that is called after the send to receive(1000) then the test *still* fails.   (This is 2.0.0.GA)

               

              Again, am I misunderstanding consumer windowing?  It looks like if receive() is called on a consumer, the consumer is registered with the queue to take new messages as they come in.  Again, what I want is the ability to pool consumers and pick one randomly to do a pull receive.  This wouldn't work without windowing because of async caching.  I thought it might work if windowing was set to zero.

              • 4. Re: Bug in consumer windowing?
                Bill Burke Master

                Clebert, take the commented-out line

                 

                // --- this is added later to cause error consumer.receiveImmediate();

                 

                and replace it with

                 

                consumer.receiveImmediate();

                 

                and you'll see the problem.

                • 5. Re: Bug in consumer windowing?
                  Clebert Suconic Master

                  I see what you mean...

                   

                  https://jira.jboss.org/jira/browse/HORNETQ-385

                   

                   

                  If you call consumer.receive(X) or consumer.receiveImmediate() and receive null, the client will still hold a message on the buffer. It's like you opened a door to receive a message but didn't close it when it returned null.

                  • 6. Re: Bug in consumer windowing?
                    Bill Burke Master

                    It's even worse.  Check this out.  I think there is something weird going on with receiveImmediate().

                     

                    @Test
                       public void testReceive() throws Exception
                       {
                          ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(InVMConnectorFactory.class.getName()));
                          sf.setConsumerWindowSize(0);
                    
                          ClientSession session = sf.createSession(false, false, false);
                          session.createQueue("testReceive", "testReceive", true);
                          session.close();
                    
                          send(sf, 1);
                    
                          
                          ClientSession session1 = sf.createSession();
                          ClientConsumer consumer = session1.createConsumer("testReceive");
                          session1.start();
                    
                          Thread.sleep(100);
                          ClientMessage message = null;
                          message = consumer.receiveImmediate();
                          Assert.assertNotNull(message);
                          System.out.println(message.getStringProperty("hello"));
                          message.acknowledge();
                    
                          send(sf, 2);
                    
                          message = consumer.receive(1000);
                          Assert.assertNotNull(message);
                          System.out.println(message.getStringProperty("hello"));
                          message.acknowledge();
                       }
                    
                    • 7. Re: Bug in consumer windowing?
                      Clebert Suconic Master

                      Right,

                       

                       

                      I've added your tests to ConsumerWindowSizeTest (in disabled form ATM).

                       

                      I will investigate this as part of the JIRA.

                       

                      thanks Bill.

                      • 8. Re: Bug in consumer windowing?
                        Tim Fox Master

                        The problem is actually not with receiveImmediate(); it's in the way zero consumer window size is implemented.

                         

                        I've added a test that demonstrates the same issue but just using normal receive(...).

                         

                        What's happening here is before the receive() a single credit is sent to the server. Problem is, if a message is not available that credit remains on the server consumer and gets used up the next time a message is available.

                         

                        What should happen is the credit should be removed immediately if a message is not immediately available in the case of receiveImmediate(), or removed on timeout in the case of receive(...).
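                        To make the credit behaviour concrete, here is a small self-contained toy model. It is only a sketch under stated assumptions, not HornetQ's implementation: ToyQueue, ToyConsumer and the revokeUnusedCredit flag are hypothetical names invented for this illustration, and delivery is synchronous so the outcome is easy to follow. It replays the pooled-consumer test from the first post: five consumers each do an empty receiveImmediate(), one message is sent, then consumer 2 tries to receive it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of credit-based delivery with a zero-size consumer window. Not HornetQ code. */
public class CreditLeakSketch {

    static class ToyConsumer {
        int credits;                                      // credits held on the "server" side
        final Deque<String> buffer = new ArrayDeque<>();  // messages already pushed to the client
    }

    static class ToyQueue {
        final Deque<String> pending = new ArrayDeque<>();
        final List<ToyConsumer> consumers = new ArrayList<>();
        final boolean revokeUnusedCredit;

        ToyQueue(boolean revokeUnusedCredit) {
            this.revokeUnusedCredit = revokeUnusedCredit;
        }

        ToyConsumer createConsumer() {
            ToyConsumer c = new ToyConsumer();
            consumers.add(c);
            return c;
        }

        void send(String message) {
            pending.add(message);
            deliver();
        }

        // Push pending messages to any consumer that still holds a credit.
        private void deliver() {
            for (ToyConsumer c : consumers) {
                while (c.credits > 0 && !pending.isEmpty()) {
                    c.credits--;
                    c.buffer.add(pending.poll());
                }
            }
        }

        // Models receiveImmediate() with window size 0: grant one credit, then check the buffer.
        String receiveImmediate(ToyConsumer c) {
            if (c.buffer.isEmpty()) {
                c.credits++;
                deliver();
            }
            String message = c.buffer.poll();
            if (message == null && revokeUnusedCredit) {
                c.credits = 0; // the fix suggested above: take the unused credit back
            }
            return message;
        }
    }

    /** Replays the pooled-consumer scenario and returns what consumer 2 receives. */
    static String run(boolean revokeUnusedCredit) {
        ToyQueue queue = new ToyQueue(revokeUnusedCredit);
        List<ToyConsumer> pool = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            ToyConsumer c = queue.createConsumer();
            pool.add(c);
            queue.receiveImmediate(c); // queue is empty, so this returns null
        }
        queue.send("hello");
        return queue.receiveImmediate(pool.get(2));
    }

    public static void main(String[] args) {
        System.out.println("credit left behind: " + run(false));
        System.out.println("credit revoked:     " + run(true));
    }
}
```

                        With the credit left behind, the message is pushed to consumer 0 as soon as it is sent, so consumer 2 gets null. With the unused credit revoked, no consumer holds a stale credit and consumer 2 gets the message.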

                         

                        As an aside... receiveImmediate() and setting consumer window size to zero will kill performance, as you'll effectively be doing a network round trip for each message.

                        • 9. Re: Bug in consumer windowing?
                          Bill Burke Master

                          It won't kill performance for what I'm doing as the server/client are colocated, unless there's some in-process baggage I have to worry about too?  But what about within a cluster?  How are messages distributed in a cluster with window = 0?

                          • 10. Re: Bug in consumer windowing?
                            Tim Fox Master

                            It'll still have a significant perf drag even when colocated due to all the extra context switching involved.

                             

                            Regarding clustering, consumer window size doesn't affect the behaviour of message distribution. Messages will still be distributed as defined in the cluster-connection configuration.