Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 431   Methods: 17
NCLOC: 310   Classes: 1
 
 Source file Conditionals Statements Methods TOTAL
DefaultStateTransferIntegrator.java 57.6% 69.7% 70.6% 66%
coverage coverage
 1    /*
 2    * JBoss, the OpenSource J2EE webOS
 3    *
 4    * Distributable under LGPL license.
 5    * See terms of license at gnu.org.
 6    */
 7    package org.jboss.cache.statetransfer;
 8   
 9    import org.apache.commons.logging.Log;
 10    import org.apache.commons.logging.LogFactory;
 11    import org.jboss.cache.CacheException;
 12    import org.jboss.cache.CacheImpl;
 13    import org.jboss.cache.Fqn;
 14    import org.jboss.cache.InvocationContext;
 15    import org.jboss.cache.Node;
 16    import org.jboss.cache.NodeFactory;
 17    import org.jboss.cache.NodeSPI;
 18    import org.jboss.cache.Region;
 19    import org.jboss.cache.buddyreplication.BuddyManager;
 20    import org.jboss.cache.eviction.EvictedEventNode;
 21    import org.jboss.cache.eviction.NodeEventType;
 22    import org.jboss.cache.loader.CacheLoader;
 23    import org.jboss.cache.marshall.NodeData;
 24    import org.jboss.cache.marshall.NodeDataExceptionMarker;
 25    import org.jboss.cache.marshall.NodeDataMarker;
 26   
 27    import java.io.IOException;
 28    import java.io.ObjectInputStream;
 29    import java.util.HashSet;
 30    import java.util.Iterator;
 31    import java.util.List;
 32    import java.util.Map;
 33    import java.util.Set;
 34   
 35    public class DefaultStateTransferIntegrator implements StateTransferIntegrator
 36    {
 37   
 38    protected Log log = LogFactory.getLog(getClass().getName());
 39   
 40    private CacheImpl cache;
 41   
 42    private Fqn targetFqn;
 43   
 44    private NodeFactory factory;
 45   
 46    private NodeFactory.NodeType nodeType;
 47   
 48    private Set<Fqn> internalFqns;
 49   
 50  730 public DefaultStateTransferIntegrator(Fqn targetFqn, CacheImpl cache)
 51    {
 52  730 this.targetFqn = targetFqn;
 53  730 this.cache = cache;
 54  730 this.factory = cache.getConfiguration().getRuntimeConfig().getNodeFactory();
 55  730 this.nodeType = cache.getConfiguration().isNodeLockingOptimistic()
 56    ? NodeFactory.NodeType.VERSIONED_NODE
 57    : NodeFactory.NodeType.UNVERSIONED_NODE;
 58  730 this.internalFqns = cache.getInternalFqns();
 59    }
 60   
 61  730 public void integrateState(ObjectInputStream ois, Node target) throws Exception
 62    {
 63  730 integrateTransientState(ois, (NodeSPI) target);
 64  730 integrateAssociatedState(ois);
 65  730 integratePersistentState(ois);
 66    }
 67   
 68  730 protected void integrateTransientState(ObjectInputStream in, NodeSPI target) throws Exception
 69    {
 70  730 boolean transientSet = false;
 71    // ClassLoader oldCL = setClassLoader(cl);
 72  730 try
 73    {
 74  730 if (log.isTraceEnabled())
 75    {
 76  0 log.trace("integrating transient state for " + target);
 77    }
 78   
 79  730 integrateTransientState(target, in);
 80   
 81  730 transientSet = true;
 82   
 83  730 if (log.isTraceEnabled())
 84    {
 85  0 log.trace("transient state successfully integrated");
 86    }
 87   
 88  730 notifyAllNodesCreated(cache.getInvocationContext(), target);
 89    }
 90    catch (Exception e)
 91    {
 92  0 if (log.isDebugEnabled()) log.debug("Caught unexpected exception", e);
 93    }
 94    finally
 95    {
 96  730 if (!transientSet)
 97    {
 98    // Clear any existing state from the targetRoot
 99  0 log.warn("transient state integration failed, removing all children of " + target);
 100  0 target.clearData();
 101  0 target.removeChildrenDirect();
 102    }
 103   
 104    // resetClassLoader(oldCL);
 105    }
 106    }
 107   
 108    /**
 109    * Provided for subclasses that deal with associated state.
 110    *
 111    * @throws Exception
 112    */
 113  730 protected void integrateAssociatedState(ObjectInputStream in) throws Exception
 114    {
 115    // no-op in this base class
 116    // just read marker
 117  730 cache.getMarshaller().objectFromObjectStream(in);
 118    }
 119   
 120  730 protected void integratePersistentState(ObjectInputStream in) throws Exception
 121    {
 122   
 123  730 CacheLoader loader = cache.getCacheLoader();
 124  730 if (loader == null)
 125    {
 126  621 if (log.isTraceEnabled())
 127    {
 128  0 log.trace("cache loader is null, will not attempt to integrate persistent state");
 129    }
 130    }
 131    else
 132    {
 133  109 if (log.isTraceEnabled())
 134    {
 135  0 log.trace("integrating persistent state using " + loader.getClass().getName());
 136    }
 137   
 138  109 boolean persistentSet = false;
 139  109 try
 140    {
 141  109 if (targetFqn.isRoot())
 142    {
 143  58 loader.storeEntireState(in);
 144    }
 145    else
 146    {
 147  51 loader.storeState(targetFqn, in);
 148    }
 149  107 persistentSet = true;
 150    }
 151    catch (ClassCastException cce)
 152    {
 153  2 log.error("Failed integrating persistent state. One of cacheloaders is not"
 154    + " adhering to state stream format. See JBCACHE-738.");
 155  2 throw cce;
 156    }
 157    finally
 158    {
 159  109 if (!persistentSet)
 160    {
 161  2 log.warn("persistent state integration failed, removing all nodes from loader");
 162  2 loader.remove(targetFqn);
 163    }
 164    else
 165    {
 166  107 if (log.isTraceEnabled())
 167    {
 168  0 log.trace("persistent state integrated successfully");
 169    }
 170    }
 171    }
 172    }
 173    }
 174   
 175  22276 protected CacheImpl getCache()
 176    {
 177  22276 return cache;
 178    }
 179   
 180  0 protected NodeFactory getFactory()
 181    {
 182  0 return factory;
 183    }
 184   
 185  0 protected NodeFactory.NodeType getNodeType()
 186    {
 187  0 return nodeType;
 188    }
 189   
 190  0 protected Fqn getTargetFqn()
 191    {
 192  0 return targetFqn;
 193    }
 194   
 195    /**
 196    * Generates NodeAdded notifications for all nodes of the tree. This is
 197    * called whenever the tree is initially retrieved (state transfer)
 198    */
 199  11138 private void notifyAllNodesCreated(InvocationContext ctx, NodeSPI curr)
 200    {
 201  0 if (curr == null) return;
 202  11138 getCache().getNotifier().notifyNodeCreated(curr.getFqn(), true, ctx);
 203  11138 getCache().getNotifier().notifyNodeCreated(curr.getFqn(), false, ctx);
 204  11138 Set<NodeSPI> children = curr.getChildrenDirect();
 205  11138 for (NodeSPI n : children)
 206    {
 207  10408 notifyAllNodesCreated(ctx, n);
 208    }
 209    }
 210   
 211    /*
 212    private ClassLoader setClassLoader(ClassLoader newLoader)
 213    {
 214    ClassLoader oldClassLoader = null;
 215    if (newLoader != null)
 216    {
 217    oldClassLoader = Thread.currentThread().getContextClassLoader();
 218    Thread.currentThread().setContextClassLoader(newLoader);
 219    }
 220    return oldClassLoader;
 221    }
 222   
 223    private void resetClassLoader(ClassLoader oldLoader)
 224    {
 225    if (oldLoader != null)
 226    {
 227    Thread.currentThread().setContextClassLoader(oldLoader);
 228    }
 229    }
 230    */
 231   
 232  730 private void integrateTransientState(NodeSPI target, ObjectInputStream in) throws Exception
 233    {
 234  730 Set<Node> retainedNodes = retainInternalNodes(target);
 235   
 236  730 target.removeChildrenDirect();
 237   
 238  730 List<NodeData> list = readNodesAsList(in);
 239  730 if (list != null)
 240    {
 241    // if the list was null we read an EOF marker!! So don't bother popping it off the stack later.
 242  726 Iterator<NodeData> nodeDataIterator = list.iterator();
 243   
 244    // Read the first NodeData and integrate into our target
 245  726 if (nodeDataIterator.hasNext())
 246    {
 247  726 NodeData nd = nodeDataIterator.next();
 248   
 249    //are there any transient nodes at all?
 250  726 if (nd != null && !nd.isMarker())
 251    {
 252  726 target.putAllDirect(nd.getAttributes());
 253   
 254    // Check whether this is an integration into the buddy backup
 255    // subtree
 256  726 Fqn tferFqn = nd.getFqn();
 257  726 Fqn tgtFqn = target.getFqn();
 258  726 boolean move = tgtFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN)
 259    && !tferFqn.isChildOrEquals(tgtFqn);
 260    // If it is an integration, calculate how many levels of offset
 261  726 int offset = move ? tgtFqn.size() - tferFqn.size() : 0;
 262   
 263  726 integrateStateTransferChildren(target, offset, nodeDataIterator);
 264   
 265  726 integrateRetainedNodes(target, retainedNodes);
 266    }
 267    }
 268   
 269    // read marker off stack
 270  726 cache.getMarshaller().objectFromObjectStream(in);
 271    }
 272    }
 273   
 274  730 private List<NodeData> readNodesAsList(ObjectInputStream in) throws Exception
 275    {
 276  730 Object obj = cache.getMarshaller().objectFromObjectStream(in);
 277  4 if (obj instanceof NodeDataMarker) return null;
 278   
 279  726 List list = (List) obj;
 280  726 return list;
 281    }
 282   
 283    // private NodeData readNode(ObjectInputStream in) throws IOException, ClassNotFoundException
 284    // {
 285    // NodeData nd = (NodeData) in.readObject();
 286    // if (nd != null && nd.isExceptionMarker())
 287    // {
 288    // NodeDataExceptionMarker ndem = (NodeDataExceptionMarker) nd;
 289    // throw new CacheException("State provider node " + ndem.getCacheNodeIdentity()
 290    // + " threw exception during loadState", ndem.getCause());
 291    // }
 292    // return nd;
 293    // }
 294   
 295  11134 private NodeData integrateStateTransferChildren(NodeSPI parent, int offset, Iterator<NodeData> nodeDataIterator)
 296    throws IOException, ClassNotFoundException
 297    {
 298  11134 int parent_level = parent.getFqn().size();
 299  11134 int target_level = parent_level + 1;
 300  11134 Fqn fqn;
 301  11134 int size;
 302  11134 Object name;
 303  11134 NodeData nd = nodeDataIterator.hasNext() ? nodeDataIterator.next() : null;
 304  11134 while (nd != null && !nd.isMarker())
 305    {
 306  20626 fqn = nd.getFqn();
 307    // If we need to integrate into the buddy backup subtree,
 308    // change the Fqn to fit under it
 309  20626 if (offset > 0)
 310    {
 311  24 fqn = new Fqn(parent.getFqn().getAncestor(offset), fqn);
 312    }
 313  20626 size = fqn.size();
 314  20626 if (size <= parent_level)
 315    {
 316  10218 return nd;
 317    }
 318  10408 else if (size > target_level)
 319    {
 320  0 throw new IllegalStateException("NodeData " + fqn + " is not a direct child of " + parent.getFqn());
 321    }
 322   
 323  10408 name = fqn.get(size - 1);
 324   
 325  10408 Map attrs = nd.getAttributes();
 326   
 327    // We handle this NodeData. Create a TreeNode and
 328    // integrate its data
 329  10408 NodeSPI target = factory.createDataNode(name, fqn, parent, attrs, false);
 330  10408 parent.addChild(name, target);
 331   
 332    // JBCACHE-913
 333  10408 Region region = cache.getRegion(fqn, false);
 334  10408 if (region != null && region.getEvictionPolicy() != null)
 335    {
 336  10023 region.putNodeEvent(new EvictedEventNode(fqn, NodeEventType.ADD_NODE_EVENT,
 337  10023 attrs == null ? 0 : attrs.size()));
 338    }
 339   
 340    // Recursively call, which will walk down the tree
 341    // and return the next NodeData that's a child of our parent
 342  10408 nd = integrateStateTransferChildren(target, offset, nodeDataIterator);
 343    }
 344  916 if (nd != null && nd.isExceptionMarker())
 345    {
 346  0 NodeDataExceptionMarker ndem = (NodeDataExceptionMarker) nd;
 347  0 throw new CacheException("State provider node " + ndem.getCacheNodeIdentity()
 348    + " threw exception during loadState", ndem.getCause());
 349    }
 350  916 return null;
 351    }
 352   
 353  730 private Set<Node> retainInternalNodes(Node target)
 354    {
 355  730 Set<Node> result = new HashSet<Node>();
 356  730 Fqn targetFqn = target.getFqn();
 357  730 for (Fqn internalFqn : internalFqns)
 358    {
 359  240 if (internalFqn.isChildOf(targetFqn))
 360    {
 361  0 Node internalNode = getInternalNode(target, internalFqn);
 362  0 if (internalNode != null)
 363    {
 364  0 result.add(internalNode);
 365    }
 366    }
 367    }
 368   
 369  730 return result;
 370    }
 371   
 372  0 private Node getInternalNode(Node parent, Fqn internalFqn)
 373    {
 374  0 Object name = internalFqn.get(parent.getFqn().size());
 375  0 Node result = parent.getChild(new Fqn(name));
 376  0 if (result != null)
 377    {
 378  0 if (internalFqn.size() < result.getFqn().size())
 379    {
 380    // need to recursively walk down the tree
 381  0 result = getInternalNode(result, internalFqn);
 382    }
 383    }
 384  0 return result;
 385    }
 386   
 387  726 private void integrateRetainedNodes(NodeSPI root, Set<Node> retainedNodes)
 388    {
 389  726 Fqn rootFqn = root.getFqn();
 390  726 for (Node retained : retainedNodes)
 391    {
 392  0 if (retained.getFqn().isChildOf(rootFqn))
 393    {
 394  0 integrateRetainedNode(root, retained);
 395    }
 396    }
 397    }
 398   
 399  0 private void integrateRetainedNode(NodeSPI ancestor, Node descendant)
 400    {
 401  0 Fqn descFqn = descendant.getFqn();
 402  0 Fqn ancFqn = ancestor.getFqn();
 403  0 Object name = descFqn.get(ancFqn.size());
 404  0 NodeSPI child = (NodeSPI) ancestor.getChild(new Fqn(name));
 405  0 if (ancFqn.size() == descFqn.size() + 1)
 406    {
 407  0 if (child == null)
 408    {
 409  0 ancestor.addChild(name, descendant);
 410    }
 411    else
 412    {
 413  0 log.warn("Received unexpected internal node " + descFqn + " in transferred state");
 414    }
 415    }
 416    else
 417    {
 418  0 if (child == null)
 419    {
 420    // Missing level -- have to create empty node
 421    // This shouldn't really happen -- internal fqns should
 422    // be immediately under the root
 423  0 child = factory.createDataNode(name, new Fqn(ancFqn, name), ancestor, null, true);
 424  0 ancestor.addChild(name, child);
 425    }
 426   
 427    // Keep walking down the tree
 428  0 integrateRetainedNode(child, descendant);
 429    }
 430    }
 431    }