3 Replies Latest reply on Jun 28, 2011 4:50 AM by ganso

    Diverted large messages are not delivered correctly after restarting the server

    ganso

      I am using HornetQ 2.2.5.Final with a non-exclusive divert configured from Queue A to Queue B, and I found that diverted large messages disappear after a server restart.

       

      Here are steps I have done:

      1. Send a large message to Queue A

      2. Restart the HornetQ server.

      3. Receive the large message from Queue A.

      4. Receive the large message from Queue B.

       

      Result:

      The first message is received successfully at step 3. However, the next message arrives with a zero-byte or null body.

      (Even if I change the order in which I receive from the two queues, the second message has a null body.)

       

       

      The journal file of the large message is deleted when the first message is received, because LargeServerMessageImpl.delayDeletionCount holds an incorrect value after the server restarts.

       

      This problem is caused by a mismatch between the order in which the messages are written to the journal and the order in which they are loaded. In other words, the counter that decides when to delete the file does not work correctly.

      The original message and the diverted message are written in this order:

      First, the diverted message is written.

      Next, the original message is written.

       

      The loading order at server startup:

      First, HornetQ loads the diverted message. Because its properties include Message.HDR_ORIG_MESSAGE_ID, HornetQ creates the original message object and increments delayDeletionCount (see JournalStorageManager.parseLargeMessage()).

      Next, it loads the original message and overwrites the entry in the message map without incrementing delayDeletionCount (see JournalStorageManager.loadMessageJournal()).

       

      I think the journal loader should check whether the message has already been loaded and adjust the count accordingly. Alternatively, the order in which the messages are written may be wrong.
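
To illustrate the counting problem, here is a minimal, self-contained sketch. It is a simplified model, not HornetQ code: the class DelayDeletionSketch, the Msg type, the refCount field, and the reuseExistingEntry flag are all hypothetical names of mine. It assumes the body file is deleted once the owning message has no queue references and its delay-deletion counter has reached zero. It replays the journal in the order described above (diverted copy first, then original) and compares overwriting the map entry with reusing it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class DelayDeletionSketch {

    /** Simplified stand-in for a large message; NOT the real LargeServerMessageImpl. */
    static final class Msg {
        final long fileId;      // id of the message whose body file is shared
        Msg linked;             // original message, set only on a diverted copy
        int refCount;           // queue references still holding this message (assumed)
        int delayDeletionCount; // diverted copies that still need the body file

        Msg(long fileId) { this.fileId = fileId; }
    }

    final Map<Long, Msg> messages = new HashMap<>();
    final Set<Long> deletedFiles = new HashSet<>();

    /** Replays the journal: diverted copy (id 2) first, then the original (id 1). */
    static DelayDeletionSketch load(boolean reuseExistingEntry) {
        DelayDeletionSketch s = new DelayDeletionSketch();

        // Record 1: the diverted copy. The original is not loaded yet,
        // so a placeholder is created and its counter is incremented.
        Msg placeholder = new Msg(1L);
        s.messages.put(1L, placeholder);
        placeholder.delayDeletionCount++;
        Msg diverted = new Msg(1L);
        diverted.linked = placeholder;
        diverted.refCount = 1;
        s.messages.put(2L, diverted);

        // Record 2: the original. Overwriting the map entry loses the counter;
        // reusing the entry that is already there keeps it.
        Msg original = reuseExistingEntry ? s.messages.get(1L) : new Msg(1L);
        original.refCount = 1;
        s.messages.put(1L, original);
        return s;
    }

    /** Consumes message id; returns true if its body file was still present. */
    boolean consume(long id) {
        Msg m = messages.get(id);
        boolean bodyAvailable = !deletedFiles.contains(m.fileId);
        m.refCount--;
        if (m.linked != null) {
            m.linked.delayDeletionCount--;
            checkDelete(m.linked);
        } else {
            checkDelete(m);
        }
        return bodyAvailable;
    }

    /** Deletes the body file once nothing refers to it any more. */
    void checkDelete(Msg owner) {
        if (owner.refCount <= 0 && owner.delayDeletionCount <= 0) {
            deletedFiles.add(owner.fileId);
        }
    }

    public static void main(String[] args) {
        for (boolean reuse : new boolean[] {false, true}) {
            DelayDeletionSketch s = load(reuse);
            System.out.println("reuseExistingEntry=" + reuse
                + " queueA=" + s.consume(1L) + " queueB=" + s.consume(2L));
        }
    }
}
```

In this model, overwriting the entry makes the second receive find the file already deleted, whichever queue is read first, while reusing the entry keeps the file until both messages have been consumed. Whether the real fix belongs in the loader or in the write order is still the open question above.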

       

       

      JournalStorageManager.loadMessageJournal()
      
            (snip) 
      
               byte recordType = record.getUserRecordType();
      
               switch (recordType)
               {
                  case ADD_LARGE_MESSAGE:
                  {
                     LargeServerMessage largeMessage = parseLargeMessage(messages, buff);
      
                     messages.put(record.id, largeMessage);
      
                     largeMessages.add(largeMessage);
      
                     break;
                  }
      
       (snip)
      
      

       

       

         private LargeServerMessage parseLargeMessage(final Map<Long, ServerMessage> messages, final HornetQBuffer buff) throws Exception
         {
            LargeServerMessage largeMessage = createLargeMessage();
      
            LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
      
            messageEncoding.decode(buff);
      
            if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
            {
               long originalMessageID = largeMessage.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
      
               LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
      
               if (originalMessage == null)
               {
                  // this could happen if the message was deleted but the file still exists as the file still being used
                  originalMessage = createLargeMessage();
                  originalMessage.setDurable(true);
                  originalMessage.setMessageID(originalMessageID);
                  messages.put(originalMessageID, originalMessage);
               }
      
               originalMessage.incrementDelayDeletionCount();
      
               largeMessage.setLinkedMessage(originalMessage);
            }
            return largeMessage;
         }
      
      

       

       

      Thanks,

       

      Masao Kato