1 2 Previous Next 15 Replies Latest reply on Jan 19, 2009 3:56 AM by timfox

    LargeMessage & Failover Update

    clebert.suconic

      I did some work to make sure the LargeMessage sending will be replicated to the backup nodes.

      On my initial work, I was having a lot of problems on getting the LargeMessage to be replicated and sent on both nodes. I was aways getting an Out-of-credits on the backup when replicating the message, causing issues on replicating the ACKs and properly failing over to the backup.


      Changes I had to make to fix the issues on LargeMessage and Failover:

      1) I simple fix to sendLargeMessage


      On ServerConsumerImpl::
      
       private void sendLargeMessage(final MessageReference ref, final ServerMessage message)
       {
       // TODO: Should we block until the replication is done?
       channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
      
       // SendLargeMessage has to be done on the same thread used on the QueueImpl or we would have problems with ordering and flow control
       largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
       largeMessageSender.sendLargeMessage();
      
       }
      



      This above code used to wait the replication to finish before sending the LargeMessage, what would use another thread. The queue would continue its work asynchronously and the Consumer would eventually handle another message while sendLargeMessage was still processing, what would cause issues on the backup node (not enough credits).


      2) When taking credits on backup, we will eventually resume sending the largeMessage.
      That process needs to be done synchronously while receiving the credit from the live node. If we play the commands on a different order we would eventually reject messages because of credit during the replication, or because we would still processing a largeMessage.


      promptDelivery is now calling resumeLargeMessage if largeMessageSender != null


      
       private void promptDelivery()
       {
       if (largeMessageSender != null)
       {
       resumeLargeMessage();
       }
       else
       {
       session.promptDelivery(messageQueue);
       }
       }
      
       private void resumeLargeMessage()
       {
       if (messageQueue.isBackup())
       {
       // We are supposed to finish largeMessageSender, or use all the possible credits before we return this method.
       // If we play the commands on a different order than how they were generated on the live node, we will
       // eventually still be running this largeMessage before the next message come, what would reject messages
       // from the cluster
       largeMessageSender.resumeLargeMessageRunnable.run();
       }
       else
       {
       executor.execute(largeMessageSender.resumeLargeMessageRunnable);
       }
       }
      



      3) While receiving LargeMessages, the client will send credits back, as soon as the chunk is received on the client.
      For that handleLargeMessageContinuation, will call flowControl.

      While flowcontrol is being called, handleLargeMessageContinuation caller will be holding a lock of the ClientRemoteConsumer.

      As a result, we will have a dead lock if failover happens while flowControl is being called within handleLargeMessageContinuation.


      I am now using an executor for the flowControl, when receiving LargeMessages. This way I release the lock on the ClienteRemoteConsumers while sending the flowcontrol back, so Failover will be able to perform outside of the locks.




      All these changes are only applied on https://svn.jboss.org/repos/messaging/branches/Branch_Failover_Page


        • 1. Re: LargeMessage & Failover Update
          clebert.suconic

          Tomorrow I will be playing with MultiThreadFailoverTests... I will try to break a few things and have some fun debugging it.
          I will only propose merging it on trunk after I am done with testing it.

          • 2. Re: LargeMessage & Failover Update
            timfox

             

            "clebert.suconic@jboss.com" wrote:
            I did some work to make sure the LargeMessage sending will be replicated to the backup nodes.

            On my initial work, I was having a lot of problems on getting the LargeMessage to be replicated and sent on both nodes. I was aways getting an Out-of-credits on the backup when replicating the message, causing issues on replicating the ACKs and properly failing over to the backup.


            Changes I had to make to fix the issues on LargeMessage and Failover:

            1) I simple fix to sendLargeMessage


            On ServerConsumerImpl::
            
             private void sendLargeMessage(final MessageReference ref, final ServerMessage message)
             {
             // TODO: Should we block until the replication is done?
             channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID(), message.getDestination()));
            
             // SendLargeMessage has to be done on the same thread used on the QueueImpl or we would have problems with ordering and flow control
             largeMessageSender = new LargeMessageSender((LargeServerMessage)message, ref);
             largeMessageSender.sendLargeMessage();
            
             }
            



            This above code used to wait the replication to finish before sending the LargeMessage, what would use another thread. The queue would continue its work asynchronously and the Consumer would eventually handle another message while sendLargeMessage was still processing, what would cause issues on the backup node (not enough credits).


            When you say "send" I'm assuming you really mean "deliver". (Send means client->server, deliver means server->client). Your usage of send here is confusing.

            For non large messages, delivery to the client only occurs when the replication response has come back from the server. If you just do it the same as that, then should be ok?


            2) When taking credits on backup, we will eventually resume sending the largeMessage.
            That process needs to be done synchronously while receiving the credit from the live node. If we play the commands on a different order we would eventually reject messages because of credit during the replication, or because we would still processing a largeMessage.


            promptDelivery is now calling resumeLargeMessage if largeMessageSender != null


            Sorry, didn't understand that explanation.



            3) While receiving LargeMessages, the client will send credits back, as soon as the chunk is received on the client.
            For that handleLargeMessageContinuation, will call flowControl.

            While flowcontrol is being called, handleLargeMessageContinuation caller will be holding a lock of the ClientRemoteConsumer.


            What is ClientRemoteConsumer?


            As a result, we will have a dead lock if failover happens while flowControl is being called within handleLargeMessageContinuation.


            I am now using an executor for the flowControl, when receiving LargeMessages. This way I release the lock on the ClienteRemoteConsumers while sending the flowcontrol back, so Failover will be able to perform outside of the locks.




            All these changes are only applied on https://svn.jboss.org/repos/messaging/branches/Branch_Failover_Page




            • 3. Re: LargeMessage & Failover Update
              clebert.suconic

               

              "Tim Fox" wrote:

              For non large messages, delivery to the client only occurs when the replication response has come back from the server. If you just do it the same as that, then should be ok?


              No... This is how I used to do it, and it took me a while to figure why it was not working.

              For non large messages, you're sure you will finish the delivery of the message when the replication response has come back.

              However for largeMessages it doesn't work this way.

              As you could interrupt chunk-delivery of the largeMessage waiting for more credits, if eventually ServerConsumer::handle is called with another message it should reject the handle being considered busy.

              Doing the LargeMessageDeliver only after the response is back would cause several issues on the flowControl of the delivery.

              "Tim Fox" wrote:

              Sorry, didn't understand that explanation.


              This has to do with my previous explanation.

              When getting the credit replicated on BackupNode, it will eventually resume delivering the LargeMessage, if it ran out of credit before.

              That resume has also to be done synchronously or else the results are unpredictable. We would eventually get another message on handle while still delivering the first message.

              "Tim Fox" wrote:

              What is ClientRemoteConsumer?



              I meant to say RemotingConnectionImpl.

              But more specifically, the PacketHandler on the client.

              On largeMessageContinuations, flowControl is being called inside the handlePacket method, and failover will dead lock if it fails while the credit is being sent to the server. Using an executor on flowControl on messageContinuations fixed the problem.

              • 4. Re: LargeMessage & Failover Update
                timfox

                 

                "clebert.suconic@jboss.com" wrote:

                Doing the LargeMessageDeliver only after the response is back would cause several issues on the flowControl of the delivery.


                Which are?



                • 5. Re: LargeMessage & Failover Update
                  clebert.suconic

                  - If we wait the replicatePacket to finish, largeMessageDeliver.deliver will be done asynchronously


                   private void deliverLargeMessage(final MessageReference ref, final ServerMessage message)
                   {
                   .....
                  
                  
                   if (!replicating)
                   largeMessageDeliver.deliver(); // synchronous call when not replicating
                  
                   else
                   result.setResultRunner(new Runnable() // asynchronous call when replicating
                   {
                   public void run()
                   {
                   largeMessageDeliverer.deliver();
                   }
                  
                   });
                   }
                  



                  - the method doHandle which is calling deliverLargeMessage, will return immediately, while the server is still deliveringChunks to the client.
                  (promptDelivery BTW will try to send another message to the consumer, but it will be rejected as BUSY)


                  - As the ServerSessionPacketHandler is free, credits will arrive concurrently with the delivery to the client, instead of coming after the method deliver returned. This following logic on receiveCredits will not work:

                   public void receiveCredits(final int credits) throws Exception
                   {
                   if (credits == -1)
                   {
                   // No flow control
                   availableCredits = null;
                   }
                   else
                   {
                   int previous = availableCredits.getAndAdd(credits);
                  
                   if (previous <= 0 && previous + credits > 0)
                   {
                   promptDelivery(); // this is what would resume the delivery of the largeMessage
                   }
                   }
                   }
                  



                  - because of the credits coming on a different order than how they would usually come, the delivery of messages is interrupted. We will get the credit before they became negative.. so nothing will resume delivery.

                  If we called largeMessageDeliver.deliver() synchronously the credits would only arrive at the right time and the resume would work without any problem.

                  - We could of course place the receiveCredits on an executor, scheduling the credit to be added as soon as the largeMessageSender is finished, or play with locks, but that seems weird to me and besides would cause implications on the regular use-case which is sending regular messages.

                  Making the call to largeMessageDeliverer.deliver synchronously would be the easiest solution, where I don't need to use any locks or executors, and believe on the natural order of credits that are coming from the client.





                  • 6. Re: LargeMessage & Failover Update
                    timfox

                     

                    "clebert.suconic@jboss.com" wrote:
                    - If we wait the replicatePacket to finish, largeMessageDeliver.deliver will be done asynchronously


                     private void deliverLargeMessage(final MessageReference ref, final ServerMessage message)
                     {
                     .....
                    
                    
                     if (!replicating)
                     largeMessageDeliver.deliver(); // synchronous call when not replicating
                    
                     else
                     result.setResultRunner(new Runnable() // asynchronous call when replicating
                     {
                     public void run()
                     {
                     largeMessageDeliverer.deliver();
                     }
                    
                     });
                     }
                    



                    - the method doHandle which is calling deliverLargeMessage, will return immediately, while the server is still deliveringChunks to the client.

                    (promptDelivery BTW will try to send another message to the consumer, but it will be rejected as BUSY)


                    - As the ServerSessionPacketHandler is free, credits will arrive concurrently with the delivery to the client, instead of coming after the method deliver returned. This following logic on receiveCredits will not work:

                     public void receiveCredits(final int credits) throws Exception
                     {
                     if (credits == -1)
                     {
                     // No flow control
                     availableCredits = null;
                     }
                     else
                     {
                     int previous = availableCredits.getAndAdd(credits);
                    
                     if (previous <= 0 && previous + credits > 0)
                     {
                     promptDelivery(); // this is what would resume the delivery of the largeMessage
                     }
                     }
                     }
                    



                    - because of the credits coming on a different order than how they would usually come,


                    The order the the credits arrive on the server should be the same. How is this different? The only way the credits could arrive in a different order on the server is if the client sent them in a different order. How is this the case? I think it needs more explanation....


                    the delivery of messages is interrupted. We will get the credit before they became negative.. so nothing will resume delivery.

                    If we called largeMessageDeliver.deliver() synchronously the credits would only arrive at the right time and the resume would work without any problem.


                    I'm not sure what do you mean by "called largeMessageDeliver.deliver() synchronously"



                    • 7. Re: LargeMessage & Failover Update
                      clebert.suconic



                      I'm not sure what do you mean by "called largeMessageDeliver.deliver() synchronously"


                      When replicating, the main method is not waiting deliver to finish and continuing its execution, hence asynchronous per definition.

                      When non replicating, the main is waiting deliver to finish, the main execution of the method is dependent on deliver finish, hence synchronous per definition.


                      The method deliverLargeMessage will return before largeMessageDeliver.deliver finished, as the method will be running on another thread.

                      Credits are arriving before the deliverLargeMessage was finished, hence the wrong order. The credits are arriving before the process finished which would be the natural order.

                      The correct order of credits is:

                      LargeMessageDeliver.deliverBegins:
                      - deliver LargeMessageHeader
                      while (credits)
                      {
                      - deliver LargeMessageChunks
                      }

                      creditsArriving *should arrive* after LargeMessageDeliver was finished.


                      This is happening because the ServerPacketHandler is not locked, and being able to be receive CreditPackets as the chunks are being sent to the client.



                      We would need to lock the credit receiving somehow until deliverLargeMessage is finished, but waiting the execution to finish would give you that lock naturally.

                      • 8. Re: LargeMessage & Failover Update
                        timfox

                        What is largeMessageDeliver.deliver ??

                        • 9. Re: LargeMessage & Failover Update
                          timfox

                          Can you come onto IRC please?

                          • 10. Re: LargeMessage & Failover Update
                            clebert.suconic

                            MessageChunk was initially designed to hold the CurrentFile on the ServerSession, and receive chunks until the last chunk was received.

                            The backup channel is introducing a lot of delay on when the chunks are coming to the ServerConsumerImpl on the LiveNode.

                            I have made it work now, but if we could do something like this, the code would be simpler:

                             DelayedResult result = channel.replicatePacket(packet);
                            
                             if (result != null)
                             {
                             result.waitReplicationComplete(); // a new method to wait the callback
                             }
                             doSendLargeMessage(packet);
                             lock.afterSend();
                            


                            Right now I'm having to deal with locks as a new packet could be coming from the client before the previous packet was replicated.

                            This isn't an issue with smallMessages, as you don't need to deal with the currentLargeMessage being sent or delivered.

                            • 11. Re: LargeMessage & Failover Update
                              timfox

                              Not sure I understand what the issue is. Can you add some more detail please?

                              • 12. Re: LargeMessage & Failover Update
                                clebert.suconic

                                Say you are sending two large message with 2 chunks each.

                                The client will send packets to the server on this order:

                                SessionSendMessage A1 (header)
                                SessionSendContinuationMessage A2 (chunk last=false)
                                SessionSendcontinuationMessage A3 (chunk last=true)

                                SessionSendMessage B1 (header)
                                SessionSendContinuationMessage B2 (chunk last=false)
                                SessionSendcontinuationMessage B3 (chunk last=true)


                                The regular process on ServerSesionImpl should be:

                                A1 received -> CurrentLargeMessage set on the Session
                                A2 received -> add bytes on the currentLargeMessage
                                A3 received -> add bytes on the currentLargeMessage, and route it, and clean currentLargeMessage

                                B1 received -> CurrentLargeMessage set on Session
                                B2 received -> add bytes
                                B3 received -> add bytes, route and clean currentLargeMessage


                                But when we are playing the replication, the flow is delayed and B1 could be received while A3 still being processed.

                                I would get the destination from the currentLargeMessage, but I can't play with it as it could be changed in another thread.

                                I could make it work nicely after finding a few NPEs here and there.. but the code would be cleaner if I could just wait the first and the last package return from the backup node in certain places.

                                • 13. Re: LargeMessage & Failover Update
                                  timfox

                                   

                                  "clebert.suconic@jboss.com" wrote:
                                  Say you are sending two large message with 2 chunks each.

                                  The client will send packets to the server on this order:

                                  SessionSendMessage A1 (header)
                                  SessionSendContinuationMessage A2 (chunk last=false)
                                  SessionSendcontinuationMessage A3 (chunk last=true)

                                  SessionSendMessage B1 (header)
                                  SessionSendContinuationMessage B2 (chunk last=false)
                                  SessionSendcontinuationMessage B3 (chunk last=true)


                                  The regular process on ServerSesionImpl should be:

                                  A1 received -> CurrentLargeMessage set on the Session
                                  A2 received -> add bytes on the currentLargeMessage
                                  A3 received -> add bytes on the currentLargeMessage, and route it, and clean currentLargeMessage

                                  B1 received -> CurrentLargeMessage set on Session
                                  B2 received -> add bytes
                                  B3 received -> add bytes, route and clean currentLargeMessage


                                  But when we are playing the replication, the flow is delayed and B1 could be received while A3 still being processed.

                                  I would get the destination from the currentLargeMessage, but I can't play with it as it could be changed in another thread.

                                  I could make it work nicely after finding a few NPEs here and there.. but the code would be cleaner if I could just wait the first and the last package return from the backup node in certain places.


                                  No, you can't block for replication to complete, that would really slow things up. You would be limiting all message sending to the RTT of replication!
                                  That's a pretty core principle and it applies to all commands, we shouldn't block replicating any commands.

                                  Instead you should just store the LargeMessage in a ConcurrentQueue and pull it off as the replications come back. The replication will always come back in the correct order.





                                  • 14. Re: LargeMessage & Failover Update
                                    timfox

                                    Should be very simple:

                                    When large messages and continuations arrive:

                                    largeMessage.addBytes(continuation);
                                    myConcurrentQueue.add(largeMessage);
                                    replicate(largeMessage);

                                    Then as replications come back:
                                    largeMessage = myConcurrentQueue.poll();
                                    if (last)
                                    {
                                    processLocally();
                                    }

                                    etc

                                    Should just be a few extra lines of code.

                                    1 2 Previous Next