5 Replies Latest reply on Mar 25, 2009 5:37 AM by ataylor

    Semantic on Session.stop and MessageHandler

    clebert.suconic

      I am writing this following test on ClientConsumerTest.

      If I just call session.stop, the MessageHandler keeps being called until all the messages on the clientBuffer are consumed, but if I call consumer.setHandler(null) I would have the behaviour I expected.


      So.. what's the expected semantic from the user's perspective? Shouldn't session.stop prevent any messages being delivered to the handler after stop is called?

       public void testStopConsumer() throws Exception
       {
       ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
      
       final ClientSession session = sf.createSession(false, true, true);
      
       session.createQueue(QUEUE, QUEUE, null, false, false);
      
       ClientProducer producer = session.createProducer(QUEUE);
      
       final int numMessages = 100;
      
       for (int i = 0; i < numMessages; i++)
       {
       ClientMessage message = createMessage(session, "m" + i);
       producer.send(message);
       }
      
       final ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
      
       session.start();
      
       final CountDownLatch latch = new CountDownLatch(10);
      
       // Message should be in consumer
      
       class MyHandler implements MessageHandler
       {
       boolean failed;
       boolean started = true;
      
       public void onMessage(final ClientMessage message)
       {
      
       try
       {
       if (!started)
       {
       failed = true;
       }
      
       latch.countDown();
      
       if (latch.getCount() == 0)
       {
       started = false;
       session.stop();
       consumer.setMessageHandler(null); // the test will fail if we comment out this line
       }
      
       message.acknowledge();
       }
       catch (Exception e)
       {
       }
       }
       }
      
       MyHandler handler = new MyHandler();
      
       consumer.setMessageHandler(handler);
      
       latch.await();
      
       Thread.sleep(100);
      
       assertFalse(handler.failed);
      
       // Make sure no exceptions were thrown from onMessage
       assertNull(consumer.getLastException());
      
       for (int i = 0; i < 90; i++)
       {
       ClientMessage msg = consumer.receive(1000);
       assertNotNull(msg);
       msg.acknowledge();
       }
      
       assertNull(consumer.receiveImmediate());
      
       session.close();
       }
      
      


        • 1. Re: Semantic on Session.stop and MessageHandler
          timfox

          Yes, once session.stop() has returned it should be guaranteed no more messages will be received on a handler until it's started again.

          However we shouldn't wait for all messages in the buffer to be processed - this might take a long time. We should only wait for the currently executing one to complete.

          The same behaviour should be on close() and setMessageHandler(null).

          • 2. Re: Semantic on Session.stop and MessageHandler
            timfox

            It should have same semantics as JMS connection.stop.

            This is explained in the JMS javadoc:


            Temporarily stops a connection's delivery of incoming messages. Delivery can be restarted using the connection's start method. When the connection is stopped, delivery to all the connection's message consumers is inhibited: synchronous receives block, and messages are not delivered to message listeners. This call blocks until receives and/or message listeners in progress have completed. Stopping a connection has no effect on its ability to send messages. A call to stop on a connection that has already been stopped is ignored. A call to stop must not return until delivery of messages has paused. This means that a client can rely on the fact that none of its message listeners will be called and that all threads of control waiting for receive calls to return will not return with a message until the connection is restarted. The receive timers for a stopped connection continue to advance, so receives may time out while the connection is stopped. If message listeners are running when stop is invoked, the stop call must wait until all of them have returned before it may return. While these message listeners are completing, they must have the full services of the connection available to them.


            • 3. Re: Semantic on Session.stop and MessageHandler
              timfox

              To implement stop() on the clientconsumer, just need to wait until the currently executing runnable is complete, then cancelling all the other queued runnable (there'll be one queued runnable per message).

              Then, on start() just recreate the runnables and execute() them.

              Also need to make sure receive() doesn't return non null called when consumer is stopped.

              Same behaviour should exist when close()'ing and when setting messagehandler to null.

              The current waitForOnMessageToComplete method waits for *all* messages in buffer to complete, which is not necessary.

              • 4. Re: Semantic on Session.stop and MessageHandler
                timfox

                A couple of issues with the latest commit:

                1) The logic in setMessageHandler() doesn't look right:

                When setting a non null message handler, then it remains stopped so any messages won't get delivered.

                Messages are being requeued if the previous handler was null but even if the current handler is null

                2) receive() method:

                while ((stopped || (m = buffer.poll()) == null) && !closed && toWait > 0)

                It will stay in the loop as long as the consumer is stopped. This is not correct - receive should always return after timeout, it's just that if it's stopped then it should always return null.

                Can you make sure the above are captured in tests?

                • 5. Re: Semantic on Session.stop and MessageHandler
                  ataylor

                   

                  A couple of issues with the latest commit:

                  1) The logic in setMessageHandler() doesn't look right:

                  When setting a non null message handler, then it remains stopped so any messages won't get delivered.

                  Messages are being requeued if the previous handler was null but even if the current handler is null


                  correct, I'll add a test and fix

                  2) receive() method:

                  while ((stopped || (m = buffer.poll()) == null) && !closed && toWait > 0)

                  It will stay in the loop as long as the consumer is stopped. This is not correct - receive should always return after timeout, it's just that if it's stopped then it should always return null.


                  are you sure? ClientConsumerTest:testStopStartConsumerSyncReceive() tests for this.