2 Replies Latest reply on Jan 6, 2006 2:07 AM by Bela Ban

    JBCACHE-326 --

    Brian Stansberry Master

      Discussion thread for http://jira.jboss.com/jira/browse/JBCACHE-326. Starting with excerpts from an e-mail thread a few months back that led to the JIRA issue:

      "bstansberry@jboss.com" wrote:
      I've written some unit tests designed to test how partial state transfer
      works under concurrent load and saw the following issue. Wanted to get
      your input as to whether what I'm seeing is normal, or maybe a sign I
      did something wrong.

      In my test I've got multiple cache instances, but to keep it simple,
      we'll just talk about A and B. REPL_SYNC. For both A and B there are
      two threads operating on the cache; one spawned by the test driver
      (A.driver and B.driver), and the other is the upHandler thread from the
      JGroups layer (A.up and B.up).

      1) A.driver initiates a partial state transfer request to B for region
      2) *Simultaneously* B.driver initiates a put in /B/1, acquiring a write
      lock on /B/1.
      3) B.up receives the partial state transfer request. The state transfer
      requires a read lock on all nodes in /B, so B.up blocks waiting for
      B.driver to release the write lock on /B/1.
      4) B.driver generates the replication message for the put on /B/1.
      5) A.up receives the replication message, sees the relevant region is
      undergoing a state transfer, enqueues the method call, and promptly
      returns, generating a valid response to B.
      6) B's RequestCorrelator cannot receive the response from A because B.up
      is blocking waiting for the write lock on /B/1 to be released. The
      write lock can't be released until the response from A is processed.
      The effect is that node B is brought to a halt until the state transfer timeout is
      reached. Actually, the whole cluster freezes, as no node can complete
      any replication until B.up unblocks.

      I looked at this pretty carefully, and it seems like it's just the way
      it is. But the thing that makes me question if I did something wrong is
      this behavior would imply the whole cluster would freeze for roughly 10
      seconds anytime an upHandler thread gets blocked by a deadlock.

      "bela@jboss.com" wrote:
      Hmm, I don't think we can solve this problem. Even with partial state transfer done at the *JGroups* level,
      the fact that state transfer needs to acquire a RL and some other TX might have a lock held, will always
      cause a TimeoutException for state transfer with REPL_SYNC.

      Wait: how about the following? The state transfer method that is called remotely, let's call it

      byte[] acquirePartialState(long timeout)

      could simply force-release all locks held by any TX in the given subtree if it cannot acquire the state within timeout ms.
      This would cause some TXs that are currently holding locks to break (rollback ?), but we would be guaranteed that
      state transfer would always succeed.

      Hmm, force-release won't cause a TX to be rolled back, so we'd have to somehow find the TX associated with
      a lock and programmatically call TX.setRollbackOnly().

      Wait, no, this won't work either, because breaking the TX won't remove the replication reply from the stack, so the synchronous call will
      never return!

      Okay, how about this then:
      - Make the state transfer call *asynchronous*
      - Add a 'sender' parameter to the acquirePartialState() method
      - When you receive the acquirePartialState() method, enqueue it, or generally handle the method in a separate thread
      - This method still needs to break all locks in the subtree it wants to acquire, and rollback TXs that are holding those locks.
      Note that this *should* be implemented in our current state transfer mechanism as well (I'll create a JIRA issue for that)
      - When acquirePartialState() returns it sends an asynchronous reply to 'sender', e.g. partialStateTransferReply(byte[])
      - The sender simply waits on a mutex and when the state has been received (partialStateTransferReply() called), this mutex is unlocked.
      - We probably also need an ID for the state that is transferred so we can correlate the request with the reply at the sender
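The sender side of the asynchronous scheme above (wait on a mutex, correlate the reply by ID) could be sketched as follows. This is only an illustration in modern Java, not JBossCache or JGroups code: the class `PartialStateRequester` and its methods `newRequest()` and `awaitState()` are hypothetical, and only the reply name `partialStateTransferReply` comes from the proposal. A `CompletableFuture` stands in for the mutex.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sender-side bookkeeping; not part of JBossCache or JGroups.
class PartialStateRequester {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<byte[]>> pending = new ConcurrentHashMap<>();

    /** Registers a pending request and returns the ID to send with the asynchronous call. */
    long newRequest() {
        long id = nextId.incrementAndGet();
        pending.put(id, new CompletableFuture<>());
        return id;
    }

    /** Invoked when the asynchronous reply arrives; replies for unknown IDs are ignored. */
    void partialStateTransferReply(long id, byte[] state) {
        CompletableFuture<byte[]> future = pending.get(id);
        if (future != null) {
            future.complete(state);
        }
    }

    /** Blocks until the reply for this ID arrives; returns null on timeout or interruption. */
    byte[] awaitState(long id, long timeoutMs) {
        CompletableFuture<byte[]> future = pending.get(id);
        if (future == null) {
            return null;
        }
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException | TimeoutException e) {
            return null;
        } finally {
            pending.remove(id);   // a late reply for this ID is simply dropped
        }
    }
}
```

The requesting thread would call `newRequest()`, send the asynchronous `acquirePartialState()` call with that ID and its own address, and then block in `awaitState()`. Because the future is registered before the call is sent, a reply that arrives early is not lost.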

      We created issue JBCACHE-315, which has to do with breaking locks for the state transfer; this thread deals with the more general issue of messages needed to clear locks being stuck in a queue behind other messages that would prevent their being processed:

      "bela@jboss.org" wrote:
      I have come up with another use case that might cause us problems:
      - B.driver holds the lock
      - B has received a bunch of PREPARE messages from other members in the cluster (say 3)
      - All PREPARE messages need to acquire the lock
      - Now the state request from A is received on B
      - So the stack has (from top to bottom): PREPARE1, PREPARE2, PREPARE3, STATE-REQUEST
      - The state request from A cannot be processed until all 3 PREPARE messages have been processed, however
      only the state request can actually force-release the locks held by B.driver
      --> But even if the state transfer request force-released locks held by B.driver, the PREPARE messages would acquire *new* locks !

      Looks like this problem can only *really* be solved by the change in JGroups I propose in
      http://jira.jboss.com/jira/browse/JGRP-82 (FLUSH) and http://jira.jboss.com/jira/browse/JGRP-118 (Partial State transfer)

      In a nutshell, here's what this will do:
      - When we transfer the state, we need to stop-the-world, a.k.a. quiesce the cluster
      - This is done using a FLUSH protocol in JGroups, which will result in a block() at every cluster node
      - In the block(), JBossCache needs to flush all open transactions, i.e. wait until they have been completed, or a certain number of ms have elapsed. In
      the latter case, we simply force-release all locks and roll back the associated TXs
      - Then we do the state transfer and finally each cluster node resumes accepting requests
      - Regarding the block() handling: we could probably simply set a mutex (in an interceptor) to block *all* calls (even get()s !), or
      an alternative could be to queue the calls

      Problem: the block() is not received until all pending requests have been processed. If the requests block for locks, this can be a long time.
      SOLUTION: change the handling of requests in JBossCache (another interceptor on top of ReplicationInterceptor):
      - Create 2 queues (of MethodCall objects), one thread for each queue
      - The regular MethodCalls like put(), remove() etc go into the default queue (A), where they are processed according to order (FIFO)
      - The special calls like block(), acquirePartialState(), or acks for PREPARE/COMMIT calls go into the other (priority) queue (B), these calls *CAN* be received out of sequence
      - This way, an acquirePartialState() would always be processed and would be able to (1) stop the processing of queue A and (2) force-release the locks held
      by B.driver

      This solution would also solve the problem I described at the very top of this email.
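A minimal sketch of the two-queue idea, assuming plain `Runnable`s in place of MethodCall objects. `TwoQueueDispatcher`, `dispatch()` and `demo()` are hypothetical names, and the latch only simulates a lock that the priority call force-releases. It is meant to show the ordering effect, not how JBossCache would implement it.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical dispatcher: one FIFO queue for regular calls, one for priority calls.
class TwoQueueDispatcher {
    private final BlockingQueue<Runnable> defaultQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<Runnable> priorityQueue = new LinkedBlockingQueue<>();

    TwoQueueDispatcher() {
        startWorker("default-queue", defaultQueue);
        startWorker("priority-queue", priorityQueue);
    }

    // Each queue is drained in FIFO order by its own daemon thread.
    private static void startWorker(String name, BlockingQueue<Runnable> queue) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    queue.take().run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
        worker.setDaemon(true);
        worker.start();
    }

    void dispatch(boolean priority, Runnable call) {
        (priority ? priorityQueue : defaultQueue).add(call);
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** PREPARE1 waits on a simulated lock that only the priority call releases. */
    static List<String> demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch lockReleased = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(3);
        TwoQueueDispatcher dispatcher = new TwoQueueDispatcher();
        dispatcher.dispatch(false, () -> { awaitQuietly(lockReleased); log.add("PREPARE1"); done.countDown(); });
        dispatcher.dispatch(false, () -> { log.add("PREPARE2"); done.countDown(); });
        dispatcher.dispatch(true, () -> { log.add("STATE-REQUEST"); lockReleased.countDown(); done.countDown(); });
        awaitQuietly(done);
        return log;
    }
}
```

In `demo()` the state request is enqueued last but completes first, because it does not sit behind the blocked PREPARE in the default queue. The sketch leaves out step (1) of the proposal, pausing the default queue while the priority call runs.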

      "manik@jboss.com" wrote:
      I like the idea of having a priority queue processed in a separate thread from the default queue. I can see how this adds a layer of efficiency for synchronisation tasks and would probably be very useful both now and in future (I'm thinking more intelligent algorithms where partitioned data in a partitioned cache can be moved closer to nodes that access this data most frequently)

      "bstansberry@jboss.com" wrote:
      I like the idea of a priority queue as well; I can see how that could be very useful. Thinking out loud here -- would COMMIT/ROLLBACK messages go to the priority queue? Idea is the normal queue imposes FIFO on the transactions since it handles PREPARE, but the priority queue allows COMMIT/ROLLBACK to clear locks promptly.

      Returning to a basic premise of my original message (i.e. Brian's doing something stupid) -- I think setting deadlockDetection = true in the cache resolves my problem. In the partial state transfer problem I originally described, the fundamental problem wasn't really in the JBossCache layer, it was in the JGroups layer, as the responses to the PREPARE calls never get out of JGroups. B.driver calls into RpcDispatcher.callRemoteMethods() --> GroupRequest.execute() --> GroupRequest.doExecute(). Then GroupRequest doesn't receive the responses because B.up is blocked. As I look at RequestCorrelator, it seems that if deadlock detection is on, this problem is avoided. I've re-run my test about 20 times with it on and have stopped getting failures.

      Bela later commented that he didn't think that setting deadlockDetection to true would work. I did find that it resolved the problem of messages being stuck in the RpcDispatcher. However, setting this to true led to other problems with messages being processed in the wrong order, so I don't think it is a valid solution for this problem.

        • 1. Re: JBCACHE-326 Priority queue to speed execution of certain
          Brian Stansberry Master

          I don't think using a cache interceptor to manage a separate queue for priority messages will work. This is because the messages that are being handled are RPC calls -- in most cases a reply needs to be sent to the originating cache instance. Anything we do that involves multi-threaded handling of RPC requests is going to have to affect the RpcDispatcher code, which currently uses a single thread and expects the RPC call and return value to be handled by that thread.

          Looking at the code, the message flow looks like this:

          JChannel.up() --> MessageDispatcher.ProtocolAdapter.up() --> RequestCorrelator.receive() --> RequestCorrelator.receiveMessage() --> RequestCorrelator.handleRequest() --> RpcDispatcher.handle() --> TreeCache.replicate()

          It's the RequestCorrelator.handleRequest() method that ends up sending a reply back down the stack. It seems to me that this method or RequestCorrelator.receiveMessage() is the point where multi-threaded processing needs to be handled. When the message with the RPC call comes up from below, the calling thread needs to be able to return immediately, in order to allow the next message to be taken from the channel. When the response to the RPC call comes back, it needs to be correlated with the initial request and sent back down the channel.

          RequestCorrelator.receiveMessage() already has this kind of asynchronous call handling logic if deadlockDetection is used, but I think what we're looking for here is a more generic asynchronous processing ability, with whatever code is responsible for handling the request also responsible for bundling the response with a correlation id, and then passing it to a callback method somewhere (probably RpcDispatcher). The bundled response/correlation id eventually gets passed back to the RequestCorrelator and sent down the channel.
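A rough sketch of what such generic asynchronous handling might look like on the receiving side. Everything here is hypothetical: `AsyncRequestHandler`, `handleRequest()` and the callback shape are not JGroups API, and the request is reduced to a `String`. The point is only that the receiving thread returns at once, and the worker that handles the request also bundles the response with the correlation id.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical receiver-side handler; none of these names exist in JGroups.
class AsyncRequestHandler {
    // Daemon worker threads so the sketch never keeps the JVM alive.
    private final ExecutorService workers = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "rpc-worker");
        t.setDaemon(true);
        return t;
    });
    private final Function<String, Object> target;          // performs the actual call
    private final BiConsumer<Long, Object> replyCallback;   // sends (id, response) back down

    AsyncRequestHandler(Function<String, Object> target, BiConsumer<Long, Object> replyCallback) {
        this.target = target;
        this.replyCallback = replyCallback;
    }

    /** Returns immediately; a worker thread handles the call and delivers the reply. */
    void handleRequest(long correlationId, String request) {
        workers.execute(() -> {
            Object response;
            try {
                response = target.apply(request);
            } catch (RuntimeException e) {
                response = e;   // a failure is reported to the caller as the response
            }
            replyCallback.accept(correlationId, response);
        });
    }
}
```

Handing every request to a pool like this gives up ordering between requests, which is the same kind of problem seen with deadlockDetection above. A real design would have to decide which calls may safely run out of sequence.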

          • 2. Re: JBCACHE-326 --
            Bela Ban Master

            How about the following ?

            We add a PriorityInterceptor on top of Replication/InvalidationInterceptor. If the cache is LOCAL, then this is not done.

            The PrioInterceptor *copies* the interceptor chain starting from itself until it reaches the cache. If we need state to be copied as well, we may have to add a clone() method to Interceptor. (But I think we should be able to do without state).

            Note that some interceptors might decide to implement clone() like this:

            public Object clone() {return this;}
            So, this means, they're effectively singletons.

            So the PrioInterceptor has now 2 chains: the default chain, which can be invoked by calling super.invoke(m), and the prio chain, which can be called with prio_chain.invoke(m).

            The invoke(MethodCall m) method checks whether we have a priority request, and dispatches them accordingly, e.g.:

            public Object invoke(MethodCall m) throws Throwable {
                if (isPrioRequest(m)) {
                    return prio_chain.invoke(m);   // priority chain
                }
                return super.invoke(m);            // default chain
            }
            Right now, the only prio requests would be the state transfer requests.
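To make the chain-copying idea concrete, here is a self-contained sketch that uses stand-in classes rather than the real JBossCache Interceptor and MethodCall. `DemoCall`, `ChainLink`, `CountingLink`, `PrioLink`, `copyChain()` and `initPrioChain()` are all hypothetical names, and the copy starts at the link after the priority link, which is a simplification.

```java
// Hypothetical stand-ins; the real JBossCache Interceptor and MethodCall classes differ.
class DemoCall {
    final String name;
    DemoCall(String name) { this.name = name; }
}

abstract class ChainLink implements Cloneable {
    ChainLink next;

    Object invoke(DemoCall call) {
        return next == null ? null : next.invoke(call);
    }

    /** Shallow-copies this link and every link after it; a stateless link could return this. */
    ChainLink copyChain() {
        try {
            ChainLink copy = (ChainLink) super.clone();
            if (next != null) {
                copy.next = next.copyChain();
            }
            return copy;
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e);   // cannot happen: ChainLink is Cloneable
        }
    }
}

// Terminal link that counts the calls it handles, to show which chain was used.
class CountingLink extends ChainLink {
    int handled;

    @Override
    Object invoke(DemoCall call) {
        handled++;
        return call.name;
    }
}

class PrioLink extends ChainLink {
    ChainLink prioChain;

    /** Call once the default chain below this link is fully assembled. */
    void initPrioChain() {
        prioChain = (next == null) ? null : next.copyChain();
    }

    // Assumption for the sketch: only state transfer requests count as priority.
    boolean isPrioRequest(DemoCall call) {
        return call.name.equals("acquirePartialState");
    }

    @Override
    Object invoke(DemoCall call) {
        if (prioChain != null && isPrioRequest(call)) {
            return prioChain.invoke(call);
        }
        return super.invoke(call);
    }
}
```

Routing alone does not make priority calls concurrent: in this sketch both chains run on the calling thread, so it only illustrates the dispatch and the copy.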