Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 483   Methods: 17
NCLOC: 348   Classes: 2
 
 Source file Conditionals Statements Methods TOTAL
DataGravitatorInterceptor.java 59.8% 77.4% 94.1% 72.4%
coverage coverage
 1    /*
 2    * JBoss, Home of Professional Open Source
 3    *
 4    * Distributable under LGPL license.
 5    * See terms of license at gnu.org.
 6    */
 7    package org.jboss.cache.interceptors;
 8   
 9    import org.apache.commons.logging.Log;
 10    import org.apache.commons.logging.LogFactory;
 11    import org.jboss.cache.CacheException;
 12    import org.jboss.cache.CacheSPI;
 13    import org.jboss.cache.Fqn;
 14    import org.jboss.cache.InvocationContext;
 15    import org.jboss.cache.Node;
 16    import org.jboss.cache.NodeSPI;
 17    import org.jboss.cache.buddyreplication.BuddyManager;
 18    import org.jboss.cache.buddyreplication.GravitateResult;
 19    import org.jboss.cache.config.Configuration;
 20    import org.jboss.cache.marshall.MethodCall;
 21    import org.jboss.cache.marshall.MethodCallFactory;
 22    import org.jboss.cache.marshall.MethodDeclarations;
 23    import org.jboss.cache.marshall.NodeData;
 24    import org.jboss.cache.transaction.GlobalTransaction;
 25    import org.jboss.cache.transaction.TransactionEntry;
 26    import org.jgroups.Address;
 27    import org.jgroups.blocks.GroupRequest;
 28   
 29    import java.util.ArrayList;
 30    import java.util.Collection;
 31    import java.util.Collections;
 32    import java.util.List;
 33    import java.util.Map;
 34    import java.util.concurrent.ConcurrentHashMap;
 35   
 36    /**
 37    * The Data Gravitator interceptor intercepts cache misses and attempts to
 38    * gravitate data from other parts of the cluster.
 39    * <p/>
 40    * Only used if Buddy Replication is enabled. Also, the interceptor only kicks
 41    * in if an {@link org.jboss.cache.config.Option} is passed in to force Data
 42    * Gravitation for a specific invocation or if <b>autoDataGravitation</b> is
 43    * set to <b>true</b> when configuring Buddy Replication.
 44    * <p/>
 45    * See the JBoss Cache User Guide for more details on configuration options.
 46    * There is a section dedicated to Buddy Replication in the Replication
 47    * chapter.
 48    *
 49    * @author <a href="mailto:manik@jboss.org">Manik Surtani (manik@jboss.org)</a>
 50    */
 51    public class DataGravitatorInterceptor extends BaseRpcInterceptor
 52    {
 53    private BuddyManager buddyManager;
 54    private boolean syncCommunications = false;
 55    private Log log = LogFactory.getLog(DataGravitatorInterceptor.class);
 56    private Map transactionMods = new ConcurrentHashMap();
 57   
 58  332 public void setCache(CacheSPI cache)
 59    {
 60  332 super.setCache(cache);
 61  332 this.buddyManager = cache.getBuddyManager();
 62  332 syncCommunications = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC || configuration.getCacheMode() == Configuration.CacheMode.INVALIDATION_SYNC;
 63    }
 64   
 65  3335 public Object invoke(InvocationContext ctx) throws Throwable
 66    {
 67  3335 MethodCall m = ctx.getMethodCall();
 68  0 if (log.isTraceEnabled()) log.trace("Invoked with method call " + m);
 69   
 70  3335 if (MethodDeclarations.isBlockUnblockMethod(m.getMethodId()))
 71    {
 72  1339 return super.invoke(ctx);
 73    }
 74   
 75    // Transactional lifecycle methods should be handled regardless of whether data gravitation is enabled or not.
 76  1996 if (!MethodDeclarations.isTransactionLifecycleMethod(m.getMethodId()))
 77    {
 78  1790 if (isGravitationEnabled(ctx) && MethodDeclarations.isGetMethod(m.getMethodId()))
 79    {
 80    // test that the Fqn being requested exists locally in the cache.
 81  191 Fqn fqn = extractFqn(m.getMethodId(), m.getArgs());
 82  0 if (log.isTraceEnabled()) log.trace("Checking local existence of fqn " + fqn);
 83  191 if (BuddyManager.isBackupFqn(fqn))
 84    {
 85  15 log.info("Is call for a backup Fqn, not performing any gravitation. Direct calls on internal backup nodes are *not* supported.");
 86    }
 87    else
 88    {
 89  176 if (cache.peek(fqn, false) == null)
 90    {
 91  50 BackupData data;
 92   
 93    // perform a data gravitation
 94  50 if (localBackupExists(fqn))
 95    {
 96  15 log.trace("Gravitating from local backup tree");
 97  15 data = localBackupGet(fqn, ctx);
 98    }
 99    else
 100    {
 101  35 log.trace("Gravitating from remote backup tree");
 102    // gravitate remotely.
 103  35 data = remoteBackupGet(fqn);
 104    }
 105   
 106  50 if (data != null)
 107    {
 108    // create node locally so I don't gravitate again
 109    // when I do the put() call to the cluster!
 110    //createNode(data.backupData, true);
 111    // Make sure I replicate to my buddies.
 112  34 log.trace("Passing the put call locally to make sure state is persisted and ownership is correctly established.");
 113  34 createNode(data.backupData, false);
 114   
 115    // very strange, the invocation contexts get twisted up here, and will need preservation.
 116    // a bit crappy and hacky, all will be solved when we move to JBoss AOP in 2.1.0
 117    //ctx.setMethodCall(m);
 118   
 119    // Clean up the other nodes
 120  33 cleanBackupData(data, ctx.getGlobalTransaction());
 121    }
 122    }
 123    }
 124    }
 125    else
 126    {
 127  1599 if (log.isTraceEnabled())
 128    {
 129  0 log.trace("Suppressing data gravitation for this call.");
 130    }
 131    }
 132    }
 133    else
 134    {
 135   
 136  206 try
 137    {
 138  206 switch (m.getMethodId())
 139    {
 140  24 case MethodDeclarations.prepareMethod_id:
 141  21 case MethodDeclarations.optimisticPrepareMethod_id:
 142  45 Object o = super.invoke(ctx);
 143  45 doPrepare(ctx.getGlobalTransaction());
 144  45 return o;
 145  1 case MethodDeclarations.rollbackMethod_id:
 146  1 transactionMods.remove(ctx.getGlobalTransaction());
 147  1 return super.invoke(ctx);
 148  160 case MethodDeclarations.commitMethod_id:
 149  160 doCommit(ctx.getGlobalTransaction());
 150  159 transactionMods.remove(ctx.getGlobalTransaction());
 151  159 return super.invoke(ctx);
 152    }
 153    }
 154    catch (Throwable throwable)
 155    {
 156  1 transactionMods.remove(ctx.getGlobalTransaction());
 157  1 throw throwable;
 158    }
 159    }
 160    // }
 161    // }
 162    // else
 163    // {
 164    // if (log.isTraceEnabled())
 165    // log.trace("Suppressing data gravitation for this call.");
 166    // }
 167  1789 return super.invoke(ctx);
 168    }
 169   
 170  1790 private boolean isGravitationEnabled(InvocationContext ctx)
 171    {
 172  1790 boolean enabled = ctx.isOriginLocal();
 173  1790 if (enabled)
 174    {
 175  467 if (!buddyManager.isAutoDataGravitation())
 176    {
 177  374 enabled = ctx.getOptionOverrides().getForceDataGravitation();
 178    }
 179    }
 180  1790 return enabled;
 181    }
 182   
 183  45 private void doPrepare(GlobalTransaction gtx) throws Throwable
 184    {
 185  45 MethodCall cleanup = (MethodCall) transactionMods.get(gtx);
 186  0 if (log.isTraceEnabled()) log.trace("Broadcasting prepare for cleanup ops " + cleanup);
 187  45 if (cleanup != null)
 188    {
 189  5 MethodCall prepare;
 190  5 List mods = new ArrayList(1);
 191  5 mods.add(cleanup);
 192  5 if (configuration.isNodeLockingOptimistic())
 193    {
 194  4 prepare = MethodCallFactory.create(MethodDeclarations.optimisticPrepareMethod, gtx, mods, null, cache.getLocalAddress(), false);
 195    }
 196    else
 197    {
 198  1 prepare = MethodCallFactory.create(MethodDeclarations.prepareMethod, gtx, mods, cache.getLocalAddress(), syncCommunications);
 199    }
 200   
 201  5 replicateCall(getMembersOutsideBuddyGroup(), prepare, syncCommunications);
 202    }
 203    else
 204    {
 205  0 if (log.isTraceEnabled()) log.trace("Nothing to broadcast in prepare phase for gtx " + gtx);
 206    }
 207    }
 208   
 209  160 private void doCommit(GlobalTransaction gtx) throws Throwable
 210    {
 211  160 if (transactionMods.containsKey(gtx))
 212    {
 213  0 if (log.isTraceEnabled()) log.trace("Broadcasting commit for gtx " + gtx);
 214  5 replicateCall(getMembersOutsideBuddyGroup(), MethodCallFactory.create(MethodDeclarations.commitMethod, gtx), syncCommunications);
 215    }
 216    else
 217    {
 218  0 if (log.isTraceEnabled()) log.trace("Nothing to broadcast in commit phase for gtx " + gtx);
 219    }
 220    }
 221   
 222  10 private List<Address> getMembersOutsideBuddyGroup()
 223    {
 224  10 List<Address> members = new ArrayList<Address>(cache.getMembers());
 225  10 members.remove(cache.getLocalAddress());
 226  10 members.removeAll(buddyManager.getBuddyAddresses());
 227  10 return members;
 228    }
 229   
 230  35 private BackupData remoteBackupGet(Fqn name) throws Exception
 231    {
 232   
 233  35 BackupData result = null;
 234   
 235  35 GravitateResult gr = gravitateData(name);
 236   
 237  35 if (gr.isDataFound())
 238    {
 239  19 if (log.isTraceEnabled())
 240    {
 241  0 log.trace("Got response " + gr);
 242    }
 243   
 244    // if (configuration.isUseRegionBasedMarshalling())
 245    // {
 246    // ClassLoader cl = Thread.currentThread().getContextClassLoader();
 247    // try
 248    // {
 249    // cache.getRegionManager().setContextClassLoaderAsCurrent(name);
 250    //
 251    // byte[] nodeData = (byte[]) resp[0];
 252    // ByteArrayInputStream bais = new ByteArrayInputStream(nodeData);
 253    // MarshalledValueInputStream mais = new MarshalledValueInputStream(bais);
 254    // nodes = (List) mais.readObject();
 255    // mais.close();
 256    // }
 257    // finally
 258    // {
 259    // Thread.currentThread().setContextClassLoader(cl);
 260    // }
 261    // }
 262    // else
 263    // {
 264    // }
 265   
 266  19 result = new BackupData(name, gr);
 267    }
 268   
 269  35 return result;
 270    }
 271   
 272  33 private void cleanBackupData(BackupData backup, GlobalTransaction gtx) throws Throwable
 273    {
 274    // MethodCall primaryDataCleanup, backupDataCleanup;
 275    // if (buddyManager.isDataGravitationRemoveOnFind())
 276    // {
 277    // primaryDataCleanup = MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, new Object[]{null, backup.primaryFqn, Boolean.FALSE});
 278    // backupDataCleanup = MethodCallFactory.create(MethodDeclarations.removeNodeMethodLocal, new Object[]{null, backup.backupFqn, Boolean.FALSE});
 279    // }
 280    // else
 281    // {
 282    // primaryDataCleanup = MethodCallFactory.create(MethodDeclarations.evictNodeMethodLocal, new Object[]{backup.primaryFqn});
 283    // backupDataCleanup = MethodCallFactory.create(MethodDeclarations.evictNodeMethodLocal, new Object[]{backup.backupFqn});
 284    // }
 285   
 286  33 MethodCall cleanup = MethodCallFactory.create(MethodDeclarations.dataGravitationCleanupMethod, gtx, backup.primaryFqn, backup.backupFqn);
 287   
 288   
 289  0 if (log.isTraceEnabled()) log.trace("Performing cleanup on [" + backup.primaryFqn + "]");
 290  33 if (gtx == null)
 291    {
 292    // broadcast removes
 293    // remove main Fqn
 294    //replicateCall(cache.getMembers(), primaryDataCleanup, syncCommunications);
 295   
 296  0 if (log.isTraceEnabled()) log.trace("Performing cleanup on [" + backup.backupFqn + "]");
 297    // remove backup Fqn
 298    //replicateCall(cache.getMembers(), backupDataCleanup, syncCommunications);
 299  27 replicateCall(cache.getMembers(), cleanup, syncCommunications);
 300    }
 301    else
 302    {
 303  6 if (log.isTraceEnabled())
 304    {
 305  0 log.trace("Data gravitation performed under global transaction " + gtx + ". Not broadcasting cleanups until the tx commits. Adding to tx mod list instead.");
 306    }
 307  6 transactionMods.put(gtx, cleanup);
 308  6 TransactionEntry te = getTransactionEntry(gtx);
 309  6 te.addModification(cleanup);
 310    }
 311    }
 312   
 313  35 private GravitateResult gravitateData(Fqn fqn) throws Exception
 314    {
 315  35 if (log.isTraceEnabled())
 316    {
 317  0 log.trace("cache=" + cache.getLocalAddress() + "; requesting data gravitation for Fqn " + fqn);
 318    }
 319  35 List<Address> mbrs = cache.getMembers();
 320  35 Boolean searchSubtrees = (buddyManager.isDataGravitationSearchBackupTrees() ? Boolean.TRUE : Boolean.FALSE);
 321  35 MethodCall dGrav = MethodCallFactory.create(MethodDeclarations.dataGravitationMethod, fqn, searchSubtrees);
 322    // doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could return null results from nodes that do
 323    // not have either the primary OR backup, and stop polling other valid nodes.
 324  35 List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav, GroupRequest.GET_ALL, true, buddyManager.getBuddyCommunicationTimeout());
 325  35 if (log.isTraceEnabled())
 326    {
 327  0 log.trace("got responses " + resps);
 328    }
 329  35 if (resps == null)
 330    {
 331  0 log.error("No replies to call " + dGrav + ". Perhaps we're alone in the cluster?");
 332  0 return GravitateResult.noDataFound();
 333    }
 334   
 335    // test for and remove exceptions
 336  35 GravitateResult result = GravitateResult.noDataFound();
 337  35 for (Object o : resps)
 338    {
 339  35 if (o instanceof Throwable)
 340    {
 341  0 if (log.isDebugEnabled())
 342    {
 343  0 log.debug("Found remote Throwable among responses - removing from responses list", (Exception) o);
 344    }
 345    }
 346  35 else if (o != null)
 347    {
 348  35 result = (GravitateResult) o;
 349  35 if (result.isDataFound())
 350    {
 351  19 break;
 352    }
 353    }
 354  0 else if (!configuration.isUseRegionBasedMarshalling())
 355    {
 356    // Null is OK if we are using region based marshalling; it
 357    // is what is returned if a region is inactive. Otherwise
 358    // getting a null is an error condition
 359  0 log.error("Unexpected null response to call " + dGrav + ".");
 360    }
 361   
 362    }
 363   
 364  35 return result;
 365    }
 366   
 367  34 private void createNode(List<NodeData> nodeData, boolean localOnly) throws CacheException
 368    {
 369  34 for (NodeData data : nodeData)
 370    {
 371  39 if (localOnly)
 372    {
 373  0 if (cache.peek(data.getFqn(), false) == null)
 374    {
 375  0 createNodesLocally(data.getFqn(), data.getAttributes());
 376    }
 377    }
 378    else
 379    {
 380  39 cache.put(data.getFqn(), data.getAttributes());
 381    }
 382    }
 383    }
 384   
 385  0 private void createNodesLocally(Fqn fqn, Map data) throws CacheException
 386    {
 387  0 int treeNodeSize;
 388  0 if ((treeNodeSize = fqn.size()) == 0) return;
 389  0 NodeSPI n = cache.getRoot();
 390  0 for (int i = 0; i < treeNodeSize; i++)
 391    {
 392  0 Object child_name = fqn.get(i);
 393  0 NodeSPI child_node = n.addChildDirect(new Fqn(child_name));
 394  0 if (child_node == null)
 395    {
 396  0 if (log.isTraceEnabled())
 397    {
 398  0 log.trace("failed to find or create child " + child_name + " of node " + n.getFqn());
 399    }
 400  0 return;
 401    }
 402  0 if (i == treeNodeSize - 1)
 403    {
 404    // set data
 405  0 child_node.putAllDirect(data);
 406    }
 407  0 n = child_node;
 408    }
 409    }
 410   
 411  6 private TransactionEntry getTransactionEntry(GlobalTransaction gtx)
 412    {
 413  6 return cache.getTransactionTable().get(gtx);
 414    }
 415   
 416  191 private Fqn extractFqn(int methodId, Object[] args)
 417    {
 418  191 return (Fqn) args[MethodDeclarations.isCrudMethod(methodId) ? 1 : 0];
 419    }
 420   
 421  50 private boolean localBackupExists(Fqn fqn)
 422    {
 423  50 boolean exists = false;
 424   
 425  50 for (Node node : getBackupRootCollection())
 426    {
 427  50 Fqn newSearchFqn = new Fqn(node.getFqn(), fqn);
 428  50 exists = cache.peek(newSearchFqn, false) != null;
 429  15 if (exists) break;
 430    }
 431   
 432  50 return exists;
 433    }
 434   
 435  15 private BackupData localBackupGet(Fqn fqn, InvocationContext ctx) throws CacheException
 436    {
 437  15 GravitateResult result = cache.gravitateData(fqn, true);// a "local" gravitation
 438  15 boolean found = result.isDataFound();
 439  15 BackupData data = null;
 440   
 441  15 if (found)
 442    {
 443  15 Fqn backupFqn = result.getBuddyBackupFqn();
 444  15 data = new BackupData(fqn, result);
 445    // now the cleanup
 446  15 if (buddyManager.isDataGravitationRemoveOnFind())
 447    {
 448    // Remove locally only; the remote call will
 449    // be broadcast later
 450  15 ctx.getOptionOverrides().setCacheModeLocal(true);
 451  15 cache.removeNode(backupFqn);
 452    }
 453    else
 454    {
 455  0 cache.evict(backupFqn, true);
 456    }
 457    }
 458   
 459  15 return data;
 460    }
 461   
 462  50 private Collection<Node> getBackupRootCollection()
 463    {
 464  50 NodeSPI backupRoot = cache.peek(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN, true);
 465  50 return backupRoot == null ? Collections.EMPTY_SET : backupRoot.getChildrenDirect();
 466    }
 467   
 468    private static class BackupData
 469    {
 470    Fqn primaryFqn;
 471    Fqn backupFqn;
 472    List<NodeData> backupData;
 473   
 474  34 public BackupData(Fqn primaryFqn, GravitateResult gr)
 475    {
 476  34 this.primaryFqn = primaryFqn;
 477  34 this.backupFqn = gr.getBuddyBackupFqn();
 478  34 this.backupData = gr.getNodeData();
 479    }
 480    }
 481   
 482   
 483    }