Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 310   Methods: 10
NCLOC: 200   Classes: 1
 
 Source file Conditionals Statements Methods TOTAL
StateTransferManager.java 63.3% 88% 100% 82.6%
coverage coverage
 1    /*
 2    * JBoss, the OpenSource J2EE webOS
 3    *
 4    * Distributable under LGPL license.
 5    * See terms of license at gnu.org.
 6    */
 7    package org.jboss.cache.statetransfer;
 8   
 9    import org.apache.commons.logging.Log;
 10    import org.apache.commons.logging.LogFactory;
 11    import org.jboss.cache.CacheException;
 12    import org.jboss.cache.CacheImpl;
 13    import org.jboss.cache.Fqn;
 14    import org.jboss.cache.NodeSPI;
 15    import org.jboss.cache.loader.CacheLoaderManager;
 16    import org.jboss.cache.lock.NodeLock;
 17    import org.jboss.cache.lock.TimeoutException;
 18    import org.jboss.cache.marshall.NodeData;
 19    import org.jboss.cache.marshall.NodeDataMarker;
 20   
 21    import java.io.ObjectInputStream;
 22    import java.io.ObjectOutputStream;
 23   
 24   
 25    public class StateTransferManager
 26    {
 27    protected final static Log log = LogFactory.getLog(StateTransferManager.class);
 28   
 29    public static final NodeData STREAMING_DELIMITER_NODE = new NodeDataMarker();
 30   
 31    public static final String PARTIAL_STATE_DELIMITER = "_PARTIAL_STATE_DELIMITER";
 32   
 33    private final CacheImpl cache;
 34   
 35  972 public StateTransferManager(CacheImpl cache)
 36    {
 37  972 this.cache = cache;
 38    }
 39   
 40  3708 public CacheImpl getTreeCache()
 41    {
 42  3708 return cache;
 43    }
 44   
 45    /**
 46    * Writes the state for the portion of the tree named by <code>fqn</code> to
 47    * the provided OutputStream.
 48    * <p/>
 49    * <p/>
 50    *
 51    * @param out stream to write state to
 52    * @param fqn Fqn indicating the uppermost node in the
 53    * portion of the tree whose state should be returned.
 54    * @param timeout max number of ms this method should wait to acquire
 55    * a read lock on the nodes being transferred
 56    * @param force if a read lock cannot be acquired after
 57    * <code>timeout</code> ms, should the lock acquisition
 58    * be forced, and any existing transactions holding locks
 59    * on the nodes be rolled back? <strong>NOTE:</strong>
 60    * In release 1.2.4, this parameter has no effect.
 61    * @param suppressErrors should any Throwable thrown be suppressed?
 62    * @throws Throwable in event of error
 63    */
 64  764 public void getState(ObjectOutputStream out, Fqn fqn, long timeout, boolean force, boolean suppressErrors) throws Throwable
 65    {
 66    // can't give state for regions currently being activated/inactivated
 67  764 boolean canProvideState = (!cache.getRegionManager().isInactive(fqn) && cache.findNode(fqn) != null);
 68   
 69  764 boolean fetchTransientState = cache.getConfiguration().isFetchInMemoryState();
 70  764 CacheLoaderManager cacheLoaderManager = cache.getCacheLoaderManager();
 71  764 boolean fetchPersistentState = cacheLoaderManager != null && cacheLoaderManager.isFetchPersistentState();
 72   
 73  764 if (canProvideState && (fetchPersistentState || fetchTransientState))
 74    {
 75  728 cache.getMarshaller().objectToObjectStream(true, out);
 76  728 StateTransferGenerator generator = getStateTransferGenerator();
 77  728 Object owner = getOwnerForLock();
 78  728 long startTime = System.currentTimeMillis();
 79  728 NodeSPI rootNode = cache.findNode(fqn);
 80   
 81  728 try
 82    {
 83  728 if (log.isDebugEnabled())
 84    {
 85  0 log.debug("locking the " + fqn + " subtree to return the in-memory (transient) state");
 86    }
 87  728 acquireLocksForStateTransfer(rootNode, owner, timeout, true, force);
 88  722 generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
 89  720 if (log.isDebugEnabled())
 90    {
 91  0 log.debug("Successfully generated state in " + (System.currentTimeMillis() - startTime) + " msec");
 92    }
 93    }
 94    finally
 95    {
 96  728 releaseStateTransferLocks(rootNode, owner, true);
 97    }
 98    }
 99    else
 100    {
 101  36 cache.getMarshaller().objectToObjectStream(false, out);
 102  36 Exception e = null;
 103  36 if (!canProvideState)
 104    {
 105  36 String exceptionMessage = "Cache instance at " + cache.getLocalAddress() + " cannot provide state for fqn " + fqn + ".";
 106   
 107  36 if (cache.getRegionManager().isInactive(fqn))
 108    {
 109  36 exceptionMessage += " Region for fqn " + fqn + " is inactive.";
 110    }
 111  36 if (cache.findNode(fqn) == null)
 112    {
 113  34 exceptionMessage += " There is no cache node at fqn " + fqn;
 114    }
 115  36 e = new CacheException(exceptionMessage);
 116    }
 117  36 if (!fetchPersistentState && !fetchTransientState)
 118    {
 119  0 e = new CacheException("Cache instance at " + cache.getLocalAddress() + " is not configured to provide state");
 120    }
 121  36 cache.getMarshaller().objectToObjectStream(e, out);
 122  36 throw e;
 123    }
 124    }
 125   
 126    /**
 127    * Set the portion of the cache rooted in <code>targetRoot</code>
 128    * to match the given state. Updates the contents of <code>targetRoot</code>
 129    * to reflect those in <code>new_state</code>.
 130    * <p/>
 131    * <strong>NOTE:</strong> This method performs no locking of nodes; it
 132    * is up to the caller to lock <code>targetRoot</code> before calling
 133    * this method.
 134    * <p/>
 135    * This method will use any {@link ClassLoader} needed as defined by the active {@link org.jboss.cache.Region}
 136    * in the {@link org.jboss.cache.RegionManager}, pertaining to the targetRoot passed in.
 137    *
 138    * @param in an input stream containing the state
 139    * @param targetRoot fqn of the node into which the state should be integrated
 140    * @throws Exception In event of error
 141    */
 142  777 public void setState(ObjectInputStream in, Fqn targetRoot) throws Exception
 143    {
 144  777 CacheImpl cache = getTreeCache();
 145  777 NodeSPI target = cache.findNode(targetRoot);
 146  777 if (target == null)
 147    {
 148    // Create the integration root, but do not replicate
 149  221 cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
 150  221 cache.put(targetRoot, null);
 151  221 target = cache.findNode(targetRoot);
 152    }
 153  777 Object o = cache.getMarshaller().objectFromObjectStream(in);
 154  777 Boolean hasState = (Boolean) o;
 155  777 if (hasState)
 156    {
 157  741 setState(in, target);
 158    }
 159    else
 160    {
 161  36 throw new CacheException("Cache instance at " + cache.getLocalAddress()
 162    + " cannot integrate state since state provider could not provide state due to " + cache.getMarshaller().objectFromObjectStream(in));
 163    }
 164    }
 165   
 166    /**
 167    * Set the portion of the cache rooted in <code>targetRoot</code>
 168    * to match the given state. Updates the contents of <code>targetRoot</code>
 169    * to reflect those in <code>new_state</code>.
 170    * <p/>
 171    * <strong>NOTE:</strong> This method performs no locking of nodes; it
 172    * is up to the caller to lock <code>targetRoot</code> before calling
 173    * this method.
 174    *
 175    * @param state a serialized byte[][] array where element 0 is the
 176    * transient state (or null) , and element 1 is the
 177    * persistent state (or null)
 178    * @param targetRoot node into which the state should be integrated
 179    */
 180  741 private void setState(ObjectInputStream state, NodeSPI targetRoot) throws Exception
 181    {
 182  741 Object owner = getOwnerForLock();
 183  741 long timeout = cache.getConfiguration().getStateRetrievalTimeout();
 184  741 long startTime = System.currentTimeMillis();
 185   
 186  741 try
 187    {
 188    // Acquire a lock on the root node
 189  741 acquireLocksForStateTransfer(targetRoot, owner, timeout, true, true);
 190   
 191    /*
 192    * Vladimir/Manik/Brian (Dec 7,2006)
 193    *
 194    * integrator.integrateState(in,targetRoot, cl) will call cache.put for each
 195    * node read from stream. Having option override below allows nodes read
 196    * to be directly stored into a tree since we bypass interceptor chain.
 197    *
 198    */
 199   
 200    // Option option = new Option();
 201    // option.setBypassInterceptorChain(true);
 202    // cache.getInvocationContext().setOptionOverrides(option);
 203    //
 204  734 StateTransferIntegrator integrator = getStateTransferIntegrator(state, targetRoot.getFqn());
 205  730 if (log.isDebugEnabled())
 206    {
 207  0 log.debug("starting state integration at node " + targetRoot);
 208    }
 209  730 integrator.integrateState(state, targetRoot);
 210  728 if (log.isDebugEnabled())
 211    {
 212  0 log.debug("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");
 213    }
 214    }
 215    finally
 216    {
 217  741 releaseStateTransferLocks(targetRoot, owner, true);
 218    }
 219    }
 220   
 221   
 222    /**
 223    * Acquires locks on a root node for an owner for state transfer.
 224    */
 225  1469 protected void acquireLocksForStateTransfer(NodeSPI root,
 226    Object lockOwner,
 227    long timeout,
 228    boolean lockChildren,
 229    boolean force)
 230    throws Exception
 231    {
 232  1469 try
 233    {
 234  1469 if (lockChildren)
 235    {
 236  1469 root.getLock().acquireAll(lockOwner, timeout, NodeLock.LockType.READ);
 237    }
 238    else
 239    {
 240  0 root.getLock().acquire(lockOwner, timeout, NodeLock.LockType.READ);
 241    }
 242    }
 243    catch (TimeoutException te)
 244    {
 245  13 log.error("Caught TimeoutException acquiring locks on region " +
 246    root.getFqn(), te);
 247  13 if (force)
 248    {
 249    // Until we have FLUSH in place, don't force locks
 250    // forceAcquireLock(root, lockOwner, lockChildren);
 251  13 throw te;
 252   
 253    }
 254    else
 255    {
 256  0 throw te;
 257    }
 258    }
 259    }
 260   
 261    /**
 262    * Releases all state transfer locks acquired.
 263    *
 264    * @see #acquireLocksForStateTransfer
 265    */
 266  1469 protected void releaseStateTransferLocks(NodeSPI root,
 267    Object lockOwner,
 268    boolean childrenLocked)
 269    {
 270  1469 try
 271    {
 272  1469 if (childrenLocked)
 273    {
 274  1469 root.getLock().releaseAll(lockOwner);
 275    }
 276    else
 277    {
 278  0 root.getLock().release(lockOwner);
 279    }
 280    }
 281    catch (Throwable t)
 282    {
 283  0 log.error("failed releasing locks", t);
 284    }
 285    }
 286   
 287  728 protected StateTransferGenerator getStateTransferGenerator()
 288    {
 289  728 return StateTransferFactory.getStateTransferGenerator(getTreeCache());
 290    }
 291   
 292  734 protected StateTransferIntegrator getStateTransferIntegrator(ObjectInputStream istream, Fqn fqn) throws Exception
 293    {
 294  734 return StateTransferFactory.getStateTransferIntegrator(istream, fqn, getTreeCache());
 295    }
 296   
 297    /**
 298    * Returns an object suitable for use in node locking, either the current
 299    * transaction or the current thread if there is no transaction.
 300    */
 301  1469 private Object getOwnerForLock()
 302    {
 303  1469 Object owner = getTreeCache().getCurrentTransaction();
 304  1469 if (owner == null)
 305    {
 306  1469 owner = Thread.currentThread();
 307    }
 308  1469 return owner;
 309    }
 310    }