3 Replies Latest reply on Jan 12, 2010 9:36 AM by Tim Fox

    HORNETQ-264 receiveImmediate and failover issue

    Clebert Suconic Master

      This is easy to replicate with a new test on MultiThreadReplicationTestBase:

       

       

         public void testO() throws Exception
         {
            runTestMultipleThreads(new RunnableT()
            {
               @Override
               public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
               {
                  doTestO(sf, threadNum);
               }
            }, NUM_THREADS, false);
         }

       

       

       

         protected void doTestO(final ClientSessionFactory sf, final int threadNum) throws Exception
         {
            // Each thread works on its own queue, named after the thread number
            final SimpleString queueName = new SimpleString(threadNum + MultiThreadRandomReattachTestBase.ADDRESS.toString());

            ClientSession sessCreate = sf.createSession(false, true, true);

            sessCreate.createQueue(MultiThreadRandomReattachTestBase.ADDRESS, queueName, null, false);

            ClientSession sess = sf.createSession(false, true, true);

            sess.start();

            ClientConsumer consumer = sess.createConsumer(queueName);

            // Nothing is ever sent to the queue, so every receiveImmediate() must return null
            for (int i = 0; i < 100; i++)
            {
               assertNull(consumer.receiveImmediate());
            }

            sess.close();

            sessCreate.deleteQueue(queueName);

            sessCreate.close();
         }

       

       

       

      The issue is that ServerConsumer is not taking the transferring flag into consideration when it sends the forced delivery message.

       

       

      I could fix it with a hack (this is not a proper fix; it is just to show what the issue is):

       

       

       

         public synchronized void forceDelivery(final long sequence)
         {     
            promptDelivery();
           
            executor.execute(new Runnable()
            {
               public void run()
               {
                  try
                  {
                     // We execute this on the same executor to make sure the force delivery message is written after
                     // any delivery is completed
                    
                     ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);

       

                     forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
                     forcedDeliveryMessage.setAddress(messageQueue.getName());

       

                     final SessionReceiveMessage packet = new SessionReceiveMessage(id, forcedDeliveryMessage, 0);

                     // hack starts here: while transferring, wait and re-queue this task instead of sending
                     if (transferring)
                     {
                        Thread.sleep(500);
                        executor.execute(this);
                     }
                     else
                     {
                        channel.send(packet);
                     }
                  }
                  catch (Exception e)
                  {
                     ServerConsumerImpl.log.error("Failed to send forced delivery message", e);
                  }
               }
            });
         }
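
      For anyone who wants to play with the idea outside of HornetQ, here is a minimal standalone sketch of the same "do not send while transferring, retry later" pattern, without the Thread.sleep that blocks the executor thread. Everything in it is an assumption for illustration: DeferredForceDelivery, setTransferring, getSent and the 10 ms retry interval are made-up names and values, the list stands in for channel.send(packet), and this is not how ServerConsumerImpl is or should be implemented.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a server-side consumer; none of these names are HornetQ API.
public class DeferredForceDelivery
{
   // A single thread keeps the tasks serialized, like the executor in the hack above
   private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

   // Records what would have been written to the channel
   private final List<Long> sent = new CopyOnWriteArrayList<Long>();

   private volatile boolean transferring;

   public void setTransferring(final boolean transferring)
   {
      this.transferring = transferring;
   }

   public List<Long> getSent()
   {
      return sent;
   }

   public void forceDelivery(final long sequence, final CountDownLatch done)
   {
      scheduler.execute(new Runnable()
      {
         public void run()
         {
            if (transferring)
            {
               // Re-schedule this task instead of sleeping, so the thread stays free
               scheduler.schedule(this, 10, TimeUnit.MILLISECONDS);
            }
            else
            {
               sent.add(sequence); // stands in for channel.send(packet)
               done.countDown();
            }
         }
      });
   }

   public void shutdown()
   {
      scheduler.shutdownNow();
   }

   public static void main(final String[] args) throws Exception
   {
      DeferredForceDelivery consumer = new DeferredForceDelivery();
      consumer.setTransferring(true);

      CountDownLatch done = new CountDownLatch(1);
      consumer.forceDelivery(42L, done);

      Thread.sleep(50);
      System.out.println("sent while transferring: " + consumer.getSent());

      consumer.setTransferring(false);
      done.await(1, TimeUnit.SECONDS);
      System.out.println("sent after transfer: " + consumer.getSent());

      consumer.shutdown();
   }
}
```

      Note that a re-scheduled task goes to the back of the line, so this sketch does not by itself keep the "written after any delivery is completed" guarantee mentioned in the comment of the hack. A proper fix would still have to deal with that.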

       

       

      BTW: I only spent 5 minutes on this before my day was over on a Friday. TGIF now!