3 Replies Latest reply on Apr 22, 2010 10:24 AM by clebert.suconic

    Should a consumer keep receiving messages when the transaction is marked as rolled back

    clebert.suconic

      Another question I have in regard to the AsynchronousFailoverTest::testTransactional

       

       

       

      - Say you send 100 messages, msg1, 2...  100.

       

      - Now you are consuming these 100 messages and you should commit it at the end.

       

      - Now say a failure happened at message #50.

       

       

       

      What will happen at this case is the consumer buffer will be cleared and the consumer will receive message 1... 50 again.

       

      Later it's all good, as a call to commit will throw RolledBackException.

       

       

       

      Should the consumer still receiving messages even the transaction will be rolled back later?

       

      This test highlights in Java what I'm saying here. Add this test to FailoverTest.

       

       

      This is actually the root cause of the original failure we were having on AsynchronousFailoverTest before. Ther other stuff is what I found after changing the test.

       

      The consumer was being reset but the test was still validating message counters while receiving the message.

       

       

       

       

         public void testConsumeTransacted() throws Exception
         {
            ClientSessionFactoryInternal sf = getSessionFactory();
            sf.setBlockOnNonDurableSend(true);
            sf.setBlockOnDurableSend(true);
       
            ClientSession session = sf.createSession(false, false);
       
            session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
       
            final CountDownLatch latch = new CountDownLatch(1);
            class MyListener extends BaseListener
            {
               public void connectionFailed(final HornetQException me)
               {
                  latch.countDown();
               }
            }
       
            session.addFailureListener(new MyListener());
            ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
       
            final int numMessages = 100;
       
            for (int i = 0; i < numMessages; i++)
            {
               ClientMessage message = session.createMessage(true);
               setBody(i, message);
               message.putIntProperty("counter", i);
               producer.send(message);
            }
       
            session.commit();
            ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
       
            session.start();
            for (int i = 0 ; i < numMessages; i++)
            {
               ClientMessage message = consumer.receive(1000);
               message.acknowledge();
       
               // remove this and the test will pass
               assertEquals(i, (int)message.getIntProperty("counter"));
               if (i == 50)
               {
                  fail(session, latch);
               }
            }
            boolean exception = false;
            try
            {
               session.commit();
            }
            catch (HornetQException e)
            {
               exception = true;
            }
            assertTrue("Exception was expected!", exception);
            session.close();
            Assert.assertEquals(0, sf.numSessions());
            Assert.assertEquals(0, sf.numConnections());
         }