Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 327   Methods: 9
NCLOC: 231   Classes: 1
 
 Source file Conditionals Statements Methods TOTAL
OptimisticReplicationInterceptor.java 73.1% 87.4% 100% 83.9%
coverage coverage
 1    /*
 2    * JBoss, Home of Professional Open Source
 3    *
 4    * Distributable under LGPL license.
 5    * See terms of license at gnu.org.
 6    */
 7    package org.jboss.cache.interceptors;
 8   
 9    import org.jboss.cache.CacheException;
 10    import org.jboss.cache.Fqn;
 11    import org.jboss.cache.InvocationContext;
 12    import org.jboss.cache.config.Configuration;
 13    import org.jboss.cache.config.Option;
 14    import org.jboss.cache.marshall.MethodCall;
 15    import org.jboss.cache.marshall.MethodCallFactory;
 16    import org.jboss.cache.marshall.MethodDeclarations;
 17    import org.jboss.cache.optimistic.DataVersion;
 18    import org.jboss.cache.optimistic.DefaultDataVersion;
 19    import org.jboss.cache.optimistic.TransactionWorkspace;
 20    import org.jboss.cache.optimistic.WorkspaceNode;
 21    import org.jboss.cache.transaction.GlobalTransaction;
 22    import org.jboss.cache.transaction.OptimisticTransactionEntry;
 23    import org.jboss.cache.util.concurrent.ConcurrentHashSet;
 24   
 25    import java.util.ArrayList;
 26    import java.util.List;
 27    import java.util.Set;
 28   
 29    /**
 30    * Replication interceptor for the optimistically locked interceptor chain. Responsible for replicating
 31    * state to remote nodes. Unlike it's cousin, the {@link org.jboss.cache.interceptors.ReplicationInterceptor}, this interceptor
 32    * only deals with transactional calls. Just like all things to do with Optimistic Locking, it is a requirement that
 33    * everything is done in a transaction and the transaction context is available via {@link org.jboss.cache.InvocationContext#getTransaction()}
 34    * and {@link org.jboss.cache.InvocationContext#getGlobalTransaction()}.
 35    *
 36    * @author <a href="mailto:manik@jboss.org">Manik Surtani (manik@jboss.org)</a>
 37    * @author <a href="mailto:stevew@jofti.com">Steve Woodcock (stevew@jofti.com)</a>
 38    */
 39    public class OptimisticReplicationInterceptor extends BaseRpcInterceptor
 40    {
 41   
 42    // record of local broacasts - so we do not broadcast rollbacks/commits that resuted from
 43    // local prepare failures
 44    // we really just need a set here, but concurrent CopyOnWriteArraySet has poor performance when writing.
 45    private final Set<GlobalTransaction> broadcastTxs = new ConcurrentHashSet<GlobalTransaction>();
 46   
 47  2972 public Object invoke(InvocationContext ctx) throws Throwable
 48    {
 49  2972 MethodCall m = ctx.getMethodCall();
 50    // bypass for buddy group org metod calls.
 51  55 if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return super.invoke(ctx);
 52   
 53  2917 Option optionOverride = ctx.getOptionOverrides();
 54  2917 if (optionOverride != null && optionOverride.isCacheModeLocal() && ctx.getTransaction() == null)
 55    {
 56    // skip replication!!
 57  0 log.debug("Skipping replication for this call as cache mode is local, forced via an option override.");
 58  0 return super.invoke(ctx);
 59    }
 60   
 61  2917 Object retval;
 62   
 63  0 if (log.isTraceEnabled()) log.trace("Processing method " + m);
 64   
 65    // on a local prepare we first run the prepare -
 66    //if this works broadcast it
 67  2917 GlobalTransaction gtx = null; // don't initialise this here; since some method calls may not have gtxs (such as buddy group organisation calls)
 68   
 69  2917 switch (m.getMethodId())
 70    {
 71  405 case MethodDeclarations.optimisticPrepareMethod_id:
 72    // pass up the chain.
 73  405 retval = super.invoke(ctx);
 74  394 gtx = getGlobalTransaction(ctx);
 75   
 76  394 if (!gtx.isRemote() && ctx.isOriginLocal())
 77    {
 78    // replicate the prepare call.
 79  218 broadcastPrepare(m, gtx);
 80    }
 81  387 break;
 82  1180 case MethodDeclarations.commitMethod_id:
 83    //lets broadcast the commit first
 84  1180 Throwable remoteCommitException = null;
 85  1180 gtx = getGlobalTransaction(ctx);
 86  1180 if (!gtx.isRemote() && ctx.isOriginLocal() && broadcastTxs.contains(gtx))
 87    {
 88    //we dont do anything
 89  163 try
 90    {
 91  163 broadcastCommit(gtx);
 92    }
 93    catch (Throwable t)
 94    {
 95  2 log.error("A problem occurred with remote commit", t);
 96  2 remoteCommitException = t;
 97    }
 98    }
 99   
 100  1180 retval = super.invoke(ctx);
 101  1180 if (remoteCommitException != null)
 102    {
 103  2 throw remoteCommitException;
 104    }
 105  1178 break;
 106  31 case MethodDeclarations.rollbackMethod_id:
 107    // lets broadcast the rollback first
 108  31 gtx = getGlobalTransaction(ctx);
 109  31 Throwable remoteRollbackException = null;
 110  31 if (!gtx.isRemote() && ctx.isOriginLocal() && broadcastTxs.contains(gtx))
 111    {
 112    //we dont do anything
 113  6 try
 114    {
 115  6 broadcastRollback(gtx);
 116    }
 117    catch (Throwable t)
 118    {
 119  4 log.error(" a problem occurred with remote rollback", t);
 120  4 remoteRollbackException = t;
 121    }
 122   
 123    }
 124  31 retval = super.invoke(ctx);
 125  31 if (remoteRollbackException != null)
 126    {
 127  4 throw remoteRollbackException;
 128    }
 129  27 break;
 130  16 case MethodDeclarations.putForExternalReadMethodLocal_id:
 131  16 gtx = getGlobalTransaction(ctx);
 132  16 cache.getTransactionTable().get(gtx).setForceAsyncReplication(true);
 133    // and follow on to default behaviour now ...
 134  1285 default:
 135    //it is something we do not care about
 136  0 if (log.isTraceEnabled()) log.trace("Received method " + m + " not handling");
 137  1301 retval = super.invoke(ctx);
 138  1301 break;
 139    }
 140  2893 return retval;
 141    }
 142   
 143  1621 private GlobalTransaction getGlobalTransaction(InvocationContext ctx)
 144    {
 145    // get the current gtx
 146  1621 GlobalTransaction gtx = ctx.getGlobalTransaction();
 147  1621 if (gtx == null)
 148    {
 149  0 throw new CacheException("failed to get global transaction");
 150    }
 151  1621 return gtx;
 152    }
 153   
 154  218 protected void broadcastPrepare(MethodCall methodCall, GlobalTransaction gtx) throws Throwable
 155    {
 156  218 boolean remoteCallSync = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
 157   
 158  218 Object[] args = methodCall.getArgs();
 159  218 List modifications = (List) args[1];
 160  218 int num_mods = modifications != null ? modifications.size() : 0;
 161   
 162    // this method will return immediately if we're the only member
 163  218 if (cache.getMembers() != null && cache.getMembers().size() > 1)
 164    {
 165    // Map method calls to data versioned equivalents.
 166    // See JBCACHE-843 and docs/design/DataVersioning.txt
 167  170 MethodCall toBroadcast = mapDataVersionedMethodCalls(methodCall, getTransactionWorkspace(gtx));
 168   
 169    //record the things we have possibly sent
 170  169 broadcastTxs.add(gtx);
 171  169 if (log.isDebugEnabled())
 172    {
 173  0 log.debug("(" + cache.getLocalAddress()
 174    + "): broadcasting prepare for " + gtx
 175    + " (" + num_mods + " modifications");
 176    }
 177   
 178  169 replicateCall(toBroadcast, remoteCallSync);
 179    }
 180    else
 181    {
 182    //no members, ignoring
 183  48 if (log.isDebugEnabled())
 184    {
 185  0 log.debug("(" + cache.getLocalAddress()
 186    + "):not broadcasting prepare as members are " + cache.getMembers());
 187    }
 188    }
 189    }
 190   
 191   
 192  163 protected void broadcastCommit(GlobalTransaction gtx) throws Throwable
 193    {
 194  163 boolean remoteCallSync = configuration.isSyncCommitPhase();
 195   
 196    // Broadcast commit() to all members (exclude myself though)
 197  163 if (cache.getMembers() != null && cache.getMembers().size() > 1)
 198    {
 199  163 try
 200    {
 201  163 broadcastTxs.remove(gtx);
 202  163 MethodCall commit_method = MethodCallFactory.create(MethodDeclarations.commitMethod, gtx);
 203   
 204  163 if (log.isDebugEnabled())
 205  0 log.debug("running remote commit for " + gtx + " and coord=" + cache.getLocalAddress());
 206   
 207  163 replicateCall(commit_method, remoteCallSync);
 208    }
 209    catch (Exception e)
 210    {
 211  0 log.error("Commit failed", e);
 212  0 throw e;
 213    }
 214    }
 215    }
 216   
 217  6 protected void broadcastRollback(GlobalTransaction gtx) throws Throwable
 218    {
 219  6 boolean remoteCallSync = configuration.isSyncRollbackPhase();
 220   
 221  6 if (cache.getMembers() != null && cache.getMembers().size() > 1)
 222    {
 223    // Broadcast rollback() to all other members (excluding myself)
 224  5 try
 225    {
 226  5 broadcastTxs.remove(gtx);
 227  5 MethodCall rollback_method = MethodCallFactory.create(MethodDeclarations.rollbackMethod, gtx);
 228   
 229  5 if (log.isDebugEnabled())
 230  0 log.debug("running remote rollback for " + gtx + " and coord=" + cache.getLocalAddress());
 231  5 replicateCall(rollback_method, remoteCallSync);
 232    }
 233    catch (Exception e)
 234    {
 235  4 log.error("Rollback failed", e);
 236  4 throw e;
 237    }
 238    }
 239    }
 240   
 241  170 private MethodCall mapDataVersionedMethodCalls(MethodCall m, TransactionWorkspace w)
 242    {
 243  170 Object[] origArgs = m.getArgs();
 244  170 return MethodCallFactory.create(m.getMethod(), origArgs[0], translate((List<MethodCall>) origArgs[1], w), origArgs[2], origArgs[3], origArgs[4]);
 245    }
 246   
 247    /**
 248    * Translates a list of MethodCalls from non-versioned calls to versioned calls.
 249    */
 250  170 private List<MethodCall> translate(List<MethodCall> l, TransactionWorkspace w)
 251    {
 252  170 List<MethodCall> newList = new ArrayList<MethodCall>();
 253  170 for (MethodCall origCall : l)
 254    {
 255  195 if (MethodDeclarations.isDataGravitationMethod(origCall.getMethodId()))
 256    {
 257    // no need to translate data gravitation calls.
 258  4 newList.add(origCall);
 259    }
 260    else
 261    {
 262  191 Object[] origArgs = origCall.getArgs();
 263    // get the data version associated with this orig call.
 264   
 265    // since these are all crud methods the Fqn is at arg subscript 1.
 266  191 Fqn fqn = (Fqn) origArgs[origCall.getMethodId() == MethodDeclarations.moveMethodLocal_id ? 0 : 1];
 267    // now get a hold of the data version for this specific modification
 268  191 DataVersion versionToBroadcast = getVersionToBroadcast(w, fqn);
 269   
 270    // build up the new arguments list for the new call. Identical to the original lis except that it has the
 271    // data version tacked on to the end.
 272  190 Object[] newArgs = new Object[origArgs.length + 1];
 273  190 System.arraycopy(origArgs, 0, newArgs, 0, origArgs.length);
 274  190 newArgs[origArgs.length] = versionToBroadcast;
 275   
 276    // now create a new method call which contains this data version
 277  190 MethodCall newCall = MethodCallFactory.create(MethodDeclarations.getVersionedMethod(origCall.getMethodId()), newArgs);
 278   
 279    // and add it to the new list.
 280  190 newList.add(newCall);
 281    }
 282    }
 283  169 return newList;
 284    }
 285   
 286    /**
 287    * Digs out the DataVersion for a given Fqn. If the versioning is explicit, it is passed as-is. If implicit, it is
 288    * cloned and then incremented, and the clone is returned.
 289    */
 290  191 private DataVersion getVersionToBroadcast(TransactionWorkspace w, Fqn f)
 291    {
 292  191 WorkspaceNode n = w.getNode(f);
 293  191 if (n == null)
 294    {
 295  0 if (log.isTraceEnabled()) log.trace("Fqn " + f + " not found in workspace; not using a data version.");
 296  1 return null;
 297    }
 298  190 if (n.isVersioningImplicit())
 299    {
 300  178 DefaultDataVersion v = (DefaultDataVersion) n.getVersion();
 301  177 if (log.isTraceEnabled())
 302  0 log.trace("Fqn " + f + " has implicit versioning. Broadcasting an incremented version.");
 303   
 304    // potential bug here - need to check if we *need* to increment at all, because of Configuration.isLockParentForChildInsertRemove()
 305  177 return v.increment();
 306    }
 307    else
 308    {
 309  0 if (log.isTraceEnabled()) log.trace("Fqn " + f + " has explicit versioning. Broadcasting the version as-is.");
 310  12 return n.getVersion();
 311    }
 312    }
 313   
 314  170 protected TransactionWorkspace getTransactionWorkspace(GlobalTransaction gtx) throws CacheException
 315    {
 316  170 OptimisticTransactionEntry transactionEntry = (OptimisticTransactionEntry) cache.getTransactionTable().get(gtx);
 317   
 318  170 if (transactionEntry == null)
 319    {
 320  0 throw new CacheException("unable to map global transaction " + gtx + " to transaction entry");
 321    }
 322   
 323    // try and get the workspace from the transaction
 324  170 return transactionEntry.getTransactionWorkSpace();
 325    }
 326   
 327    }