Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 1,116   Methods: 33
NCLOC: 735   Classes: 5
 
 Source file Conditionals Statements Methods TOTAL
VersionedTestBase.java 51% 81.2% 75.8% 75.4%
coverage coverage
 1    /*
 2    * JBoss, the OpenSource J2EE webOS
 3    *
 4    * Distributable under LGPL license.
 5    * See terms of license at gnu.org.
 6    */
 7   
 8    package org.jboss.cache.statetransfer;
 9   
 10    import org.jboss.cache.Cache;
 11    import org.jboss.cache.CacheException;
 12    import org.jboss.cache.CacheSPI;
 13    import org.jboss.cache.DefaultCacheFactory;
 14    import org.jboss.cache.Fqn;
 15    import org.jboss.cache.Node;
 16    import org.jboss.cache.Region;
 17    import org.jboss.cache.config.Configuration;
 18    import org.jboss.cache.config.Configuration.CacheMode;
 19    import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 20    import org.jboss.cache.loader.CacheLoader;
 21    import org.jboss.cache.marshall.InactiveRegionException;
 22    import org.jboss.cache.misc.TestingUtil;
 23   
 24    import java.lang.reflect.Method;
 25    import java.util.Random;
 26    import java.util.Set;
 27    import java.util.concurrent.Semaphore;
 28    import java.util.concurrent.TimeUnit;
 29   
 30    /**
 31    * Abstract superclass of "StateTransferVersion"-specific tests
 32    * of CacheSPI's state transfer capability.
 33    * <p/>
 34    * TODO add tests with classloader regions
 35    *
 36    * @author <a href="mailto:brian.stansberry@jboss.com">Brian Stansberry</a>
 37    * @version $Id$
 38    */
 39    public abstract class VersionedTestBase extends StateTransferTestBase
 40    {
 41    private static final int SUBTREE_SIZE = 10;
 42   
 43    public static final Fqn A = Fqn.fromString("/a");
 44    public static final Fqn B = Fqn.fromString("/b");
 45    public static final Fqn C = Fqn.fromString("/c");
 46   
 47    protected static final String ADDRESS_CLASSNAME = "org.jboss.cache.marshall.data.Address";
 48    protected static final String PERSON_CLASSNAME = "org.jboss.cache.marshall.data.Person";
 49   
 50    public static final Fqn A_B = Fqn.fromString("/a/b");
 51    public static final Fqn A_C = Fqn.fromString("/a/c");
 52    public static final Fqn A_D = Fqn.fromString("/a/d");
 53   
 54  2 public void testInitialStateTransfer() throws Exception
 55    {
 56  2 CacheSPI cache1 = createCache("cache1", false, false, false);
 57   
 58  2 cache1.put(A_B, "name", JOE);
 59  2 cache1.put(A_B, "age", TWENTY);
 60  2 cache1.put(A_C, "name", BOB);
 61  2 cache1.put(A_C, "age", FORTY);
 62   
 63  2 CacheSPI cache2 = createCache("cache2", false, false, false);
 64   
 65    // Pause to give caches time to see each other
 66  2 TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
 67   
 68  2 assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
 69  2 assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
 70  2 assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
 71  2 assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
 72    }
 73   
 74  2 public void testInitialStateTferWithLoader() throws Exception
 75    {
 76  2 initialStateTferWithLoaderTest(false);
 77    }
 78   
 79  2 public void testInitialStateTferWithAsyncLoader() throws Exception
 80    {
 81  2 initialStateTferWithLoaderTest(true);
 82    }
 83   
 84  4 protected void initialStateTferWithLoaderTest(boolean asyncLoader) throws Exception
 85    {
 86  4 initialStateTferWithLoaderTest("org.jboss.cache.loader.FileCacheLoader",
 87    "org.jboss.cache.loader.FileCacheLoader", asyncLoader);
 88    }
 89   
 90  2 public void testPartialStateTransfer() throws Exception
 91    {
 92  2 CacheSPI cache1 = createCache("cache1", false, true, false);
 93   
 94  2 createAndActivateRegion(cache1, A);
 95   
 96  2 cache1.put(A_B, "name", JOE);
 97  2 cache1.put(A_B, "age", TWENTY);
 98  2 cache1.put(A_C, "name", BOB);
 99  2 cache1.put(A_C, "age", FORTY);
 100   
 101  2 CacheSPI cache2 = createCache("cache2", false, true, false);
 102   
 103    // Pause to give caches time to see each other
 104  2 TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
 105   
 106  2 assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
 107  2 assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
 108  2 assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
 109  2 assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
 110   
 111  2 createAndActivateRegion(cache2, A_B);
 112   
 113  2 assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
 114  2 assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
 115  2 assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
 116  2 assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
 117   
 118  2 cache1.put(A_D, "name", JANE);
 119   
 120  2 assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
 121   
 122  2 createAndActivateRegion(cache2, A_C);
 123   
 124  2 assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
 125  2 assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
 126  2 assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
 127  2 assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
 128  2 assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
 129   
 130  2 createAndActivateRegion(cache2, A_D);
 131   
 132  2 assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
 133  2 assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
 134  2 assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
 135  2 assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
 136  2 assertEquals("Incorrect name for /a/d", JANE, cache2.get(A_D, "name"));
 137   
 138   
 139  2 cache1.getRegion(A, false).deactivate();
 140  2 createAndActivateRegion(cache1, A_B);
 141  2 createAndActivateRegion(cache1, A_C);
 142  2 createAndActivateRegion(cache1, A_D);
 143   
 144  2 assertEquals("Incorrect name for /a/b", JOE, cache1.get(A_B, "name"));
 145  2 assertEquals("Incorrect age for /a/b", TWENTY, cache1.get(A_B, "age"));
 146  2 assertEquals("Incorrect name for /a/c", BOB, cache1.get(A_C, "name"));
 147  2 assertEquals("Incorrect age for /a/c", FORTY, cache1.get(A_C, "age"));
 148  2 assertEquals("Incorrect name for /a/d", JANE, cache1.get(A_D, "name"));
 149   
 150    }
 151   
 152  172 private void createAndActivateRegion(CacheSPI c, Fqn f)
 153    {
 154  172 Region r = c.getRegion(f, true);
 155  172 r.registerContextClassLoader(getClass().getClassLoader());
 156  172 r.activate();
 157    }
 158   
 159  2 public void testPartialStateTferWithLoader() throws Exception
 160    {
 161  2 CacheSPI cache1 = createCache("cache1", false, true, true);
 162   
 163  2 createAndActivateRegion(cache1, A);
 164   
 165  2 cache1.put(A_B, "name", JOE);
 166  2 cache1.put(A_B, "age", TWENTY);
 167  2 cache1.put(A_C, "name", BOB);
 168  2 cache1.put(A_C, "age", FORTY);
 169   
 170  2 CacheSPI cache2 = createCache("cache2", false, true, true);
 171   
 172    // Pause to give caches time to see each other
 173  2 TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
 174   
 175  2 CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
 176   
 177  2 assertNull("/a/b transferred to loader against policy", loader.get(A_B));
 178   
 179  2 assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
 180  2 assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
 181  2 assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
 182  2 assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
 183   
 184  2 createAndActivateRegion(cache2, A_B);
 185   
 186  2 assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
 187  2 assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
 188  2 assertNull("/a/c transferred to loader against policy", loader.get(A_C));
 189   
 190  2 assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
 191  2 assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
 192  2 assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
 193  2 assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
 194   
 195  2 cache1.put(A_D, "name", JANE);
 196   
 197  2 assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
 198   
 199  2 createAndActivateRegion(cache2, A_C);
 200   
 201  2 assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
 202  2 assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
 203  2 assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
 204  2 assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
 205   
 206  2 assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
 207  2 assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
 208  2 assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
 209  2 assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
 210  2 assertNull("/a/d name transferred against policy", cache2.get(A_D, "name"));
 211   
 212  2 createAndActivateRegion(cache2, A_D);
 213   
 214  2 assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
 215  2 assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
 216  2 assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
 217  2 assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
 218  2 assertEquals("Incorrect name from loader for /a/d", JANE, loader.get(A_D).get("name"));
 219   
 220  2 assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
 221  2 assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
 222  2 assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
 223  2 assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
 224  2 assertEquals("Incorrect name for /a/d", JANE, cache2.get(A_D, "name"));
 225   
 226  2 cache1.getRegion(A, false).deactivate();
 227   
 228  2 createAndActivateRegion(cache1, A_B);
 229  2 createAndActivateRegion(cache1, A_C);
 230  2 createAndActivateRegion(cache1, A_D);
 231   
 232  2 loader = cache1.getCacheLoaderManager().getCacheLoader();
 233   
 234  2 assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
 235  2 assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
 236  2 assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
 237  2 assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
 238  2 assertEquals("Incorrect name from loader for /a/d", JANE, loader.get(A_D).get("name"));
 239   
 240  2 assertEquals("Incorrect name for /a/b", JOE, cache1.get(A_B, "name"));
 241  2 assertEquals("Incorrect age for /a/b", TWENTY, cache1.get(A_B, "age"));
 242  2 assertEquals("Incorrect name for /a/c", BOB, cache1.get(A_C, "name"));
 243  2 assertEquals("Incorrect age for /a/c", FORTY, cache1.get(A_C, "age"));
 244  2 assertEquals("Incorrect name for /a/d", JANE, cache1.get(A_D, "name"));
 245    }
 246   
 247  2 public void testPartialStateTferWithClassLoader() throws Exception
 248    {
 249    // FIXME: This test is meaningless because MarshalledValueInputStream
 250    // will find the classes w/ their own loader if TCL can't. Need
 251    // to find a way to test!
 252    // But, at least it tests JBCACHE-305 by registering a classloader
 253    // both before and after start()
 254   
 255    // Set the TCL to a classloader that can't see Person/Address
 256  2 Thread.currentThread().setContextClassLoader(getNotFoundClassLoader());
 257   
 258  2 CacheSPI cache1 = createCache("cache1",
 259    false, // async
 260    true, // use marshaller
 261    true, // use cacheloader
 262    false, false);// don't start
 263  2 ClassLoader cl1 = getClassLoader();
 264  2 cache1.getRegion(A, true).registerContextClassLoader(cl1);
 265  2 startCache(cache1);
 266   
 267  2 cache1.getRegion(A, true).activate();
 268   
 269  2 Object ben = createBen(cl1);
 270   
 271  2 cache1.put(A_B, "person", ben);
 272   
 273    // For cache 2 we won't register loader until later
 274  2 CacheSPI cache2 = createCache("cache2",
 275    false, // async
 276    true, // use marshalling
 277    true, // use cacheloader
 278    false, true);// start
 279   
 280    // Pause to give caches time to see each other
 281  2 TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
 282   
 283  2 CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
 284   
 285  2 assertNull("/a/b not transferred to loader", loader.get(A_B));
 286   
 287  2 assertNull("/a/b not transferred to cache", cache2.get(A_B, "person"));
 288   
 289  2 ClassLoader cl2 = getClassLoader();
 290   
 291    // cache2.registerClassLoader(A, cl2);
 292  2 Region r = cache2.getRegion(A, true);
 293  2 r.registerContextClassLoader(cl2);
 294   
 295  2 r.activate();
 296   
 297  2 assertEquals("Correct state from loader for /a/b", ben.toString(), loader.get(A_B).get("person").toString());
 298   
 299  2 assertEquals("Correct state from cache for /a/b", ben.toString(), cache2.get(A_B, "person").toString());
 300   
 301    }
 302   
 303  2 public void testLoadEntireStateAfterStart() throws Exception
 304    {
 305  2 CacheSPI cache1 = createCache("cache1", false, true, true);
 306   
 307  2 createAndActivateRegion(cache1, Fqn.ROOT);
 308   
 309  2 cache1.put(A_B, "name", JOE);
 310  2 cache1.put(A_B, "age", TWENTY);
 311  2 cache1.put(A_C, "name", BOB);
 312  2 cache1.put(A_C, "age", FORTY);
 313   
 314  2 CacheSPI cache2 = createCache("cache2", false, true, true);
 315   
 316    // Pause to give caches time to see each other
 317  2 TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
 318   
 319  2 CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
 320   
 321  2 assertNull("/a/b transferred to loader against policy", loader.get(A_B));
 322   
 323  2 assertNull("/a/b name transferred against policy", cache2.get(A_B, "name"));
 324  2 assertNull("/a/b age transferred against policy", cache2.get(A_B, "age"));
 325  2 assertNull("/a/c name transferred against policy", cache2.get(A_C, "name"));
 326  2 assertNull("/a/c age transferred against policy", cache2.get(A_C, "age"));
 327   
 328  2 createAndActivateRegion(cache2, Fqn.ROOT);
 329   
 330  2 assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
 331  2 assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
 332  2 assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
 333  2 assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
 334   
 335  2 assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
 336  2 assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
 337  2 assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
 338  2 assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
 339    }
 340   
 341    /**
 342    * Tests concurrent activation of the same subtree by multiple nodes in a
 343    * REPL_SYNC environment. The idea is to see what would happen with a
 344    * farmed deployment. See <code>concurrentActivationTest</code> for details.
 345    *
 346    * @throws Exception
 347    */
 348  2 public void testConcurrentActivationSync() throws Exception
 349    {
 350  2 concurrentActivationTest(true);
 351    }
 352   
 353    /**
 354    * Tests concurrent activation of the same subtree by multiple nodes in a
 355    * REPL_ASYNC environment. The idea is to see what would happen with a
 356    * farmed deployment. See <code>concurrentActivationTest</code> for details.
 357    *
 358    * @throws Exception
 359    */
 360  2 public void testConcurrentActivationAsync() throws Exception
 361    {
 362  2 concurrentActivationTest(false);
 363    }
 364   
 365    /**
 366    * Starts 5 caches and then concurrently activates the same region under
 367    * all 5, causing each to attempt a partial state transfer from the others.
 368    * As soon as each cache has activated its region, it does a put to a node
 369    * in the region, thus complicating the lives of the other caches trying
 370    * to get partial state.
 371    * <p/>
 372    * Failure condition is if any node sees an exception or if the final state
 373    * of all caches is not consistent.
 374    *
 375    * @param sync use REPL_SYNC or REPL_ASYNC
 376    * @throws Exception
 377    */
 378  4 private void concurrentActivationTest(boolean sync) throws Exception
 379    {
 380  4 String[] names = {"A", "B", "C", "D", "E"};
 381  4 int count = names.length;
 382  4 CacheActivator[] activators = new CacheActivator[count];
 383   
 384   
 385  4 try
 386    {
 387    // Create a semaphore and take all its tickets
 388  4 Semaphore semaphore = new Semaphore(count);
 389  4 for (int i = 0; i < count; i++)
 390    {
 391  20 semaphore.acquire();
 392    }
 393   
 394    // Create activation threads that will block on the semaphore
 395  4 CacheSPI[] caches = new CacheSPI[count];
 396  4 for (int i = 0; i < count; i++)
 397    {
 398  20 activators[i] = new CacheActivator(semaphore, names[i], sync);
 399  20 caches[i] = activators[i].getCacheSPI();
 400  20 activators[i].start();
 401    }
 402   
 403    // Make sure everyone is in sync
 404  4 TestingUtil.blockUntilViewsReceived(caches, 60000);
 405   
 406    // Release the semaphore to allow the threads to start work
 407  4 semaphore.release(count);
 408   
 409    // Sleep to ensure the threads get all the semaphore tickets
 410  4 TestingUtil.sleepThread((long) 1000);
 411   
 412    // Reacquire the semaphore tickets; when we have them all
 413    // we know the threads are done
 414  4 for (int i = 0; i < count; i++)
 415    {
 416  20 boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
 417  20 if (!acquired)
 418    {
 419  0 fail("failed to acquire semaphore " + i);
 420    }
 421    }
 422   
 423    // Sleep to allow any async calls to clear
 424  4 if (!sync)
 425    {
 426  2 TestingUtil.sleepThread(1000);
 427    }
 428   
 429    // Ensure the caches held by the activators see all the values
 430  4 for (int i = 0; i < count; i++)
 431    {
 432  20 Exception aException = activators[i].getException();
 433  20 boolean gotUnexpectedException = aException != null
 434    && !(aException instanceof InactiveRegionException ||
 435    aException.getCause() instanceof InactiveRegionException);
 436  20 if (gotUnexpectedException)
 437    {
 438  0 fail("Activator " + names[i] + " caught an exception " + aException);
 439    }
 440   
 441  20 for (int j = 0; j < count; j++)
 442    {
 443  100 Fqn fqn = new Fqn(A_B, names[j]);
 444  100 assertEquals("Incorrect value for " + fqn + " on activator " + names[i],
 445    "VALUE", activators[i].getCacheValue(fqn));
 446    // System.out.println(names[i] + ":" + fqn + " = " + activators[i].getCacheValue(fqn));
 447    }
 448   
 449    }
 450    }
 451    catch (Exception ex)
 452    {
 453  0 fail(ex.getLocalizedMessage());
 454    }
 455    finally
 456    {
 457  4 for (int i = 0; i < count; i++)
 458    {
 459  20 activators[i].cleanup();
 460    }
 461    }
 462   
 463    }
 464   
 465    /**
 466    * Starts two caches where each cache has N regions. We put some data in each of the regions.
 467    * We run two threads where each thread creates a cache then goes into a loop where it
 468    * activates the N regions, with a 1 sec pause between activations.
 469    * <p/>
 470    * Threads are started with 10 sec difference.
 471    * <p/>
 472    * This test simulates a 10 sec staggered start of 2 servers in a cluster, with each server
 473    * then deploying webapps.
 474    * <p/>
 475    * <p/>
 476    * <p/>
 477    * Failure condition is if any node sees an exception or if the final state
 478    * of all caches is not consistent.
 479    *
 480    * @param sync use REPL_SYNC or REPL_ASYNC
 481    * @throws Exception
 482    */
 483  4 private void concurrentActivationTest2(boolean sync) throws Exception
 484    {
 485  4 String[] names = {"A", "B"};
 486  4 int count = names.length;
 487  4 int regionsToActivate = 15;
 488  4 int sleepTimeBetweenNodeStarts = 10000;
 489  4 StaggeredWebDeployerActivator[] activators = new StaggeredWebDeployerActivator[count];
 490  4 try
 491    {
 492    // Create a semaphore and take all its tickets
 493  4 Semaphore semaphore = new Semaphore(count);
 494  4 for (int i = 0; i < count; i++)
 495    {
 496  8 semaphore.acquire();
 497    }
 498   
 499    // Create activation threads that will block on the semaphore
 500  4 CacheSPI[] caches = new CacheSPI[count];
 501  4 for (int i = 0; i < count; i++)
 502    {
 503  8 activators[i] = new StaggeredWebDeployerActivator(semaphore, names[i], sync, regionsToActivate);
 504  8 caches[i] = activators[i].getCacheSPI();
 505   
 506    // Release the semaphore to allow the thread to start working
 507  8 semaphore.release(1);
 508   
 509  8 activators[i].start();
 510  8 TestingUtil.sleepThread(sleepTimeBetweenNodeStarts);
 511    }
 512   
 513    // Make sure everyone is in sync
 514  4 TestingUtil.blockUntilViewsReceived(caches, 60000);
 515   
 516    // Sleep to ensure the threads get all the semaphore tickets
 517  4 TestingUtil.sleepThread(1000);
 518   
 519    // Reacquire the semaphore tickets; when we have them all
 520    // we know the threads are done
 521  4 for (int i = 0; i < count; i++)
 522    {
 523  8 boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
 524  8 if (!acquired)
 525    {
 526  0 fail("failed to acquire semaphore " + i);
 527    }
 528    }
 529   
 530    // Sleep to allow any async calls to clear
 531  4 if (!sync)
 532    {
 533  2 TestingUtil.sleepThread(1000);
 534    }
 535   
 536    // Ensure the caches held by the activators see all the values
 537  4 for (int i = 0; i < count; i++)
 538    {
 539  8 Exception aException = activators[i].getException();
 540  8 boolean gotUnexpectedException = aException != null
 541    && !(aException instanceof InactiveRegionException ||
 542    aException.getCause() instanceof InactiveRegionException);
 543  8 if (gotUnexpectedException)
 544    {
 545  0 fail("Activator " + names[i] + " caught an exception " + aException);
 546    }
 547   
 548  8 for (int j = 0; j < regionsToActivate; j++)
 549    {
 550  120 Fqn fqn = Fqn.fromString("/a/" + i + "/" + names[i]);
 551  120 assertEquals("Incorrect value for " + fqn + " on activator " + names[i],
 552    "VALUE", activators[i].getCacheValue(fqn));
 553    }
 554    }
 555    }
 556    catch (Exception ex)
 557    {
 558  0 fail(ex.getLocalizedMessage());
 559    }
 560    finally
 561    {
 562  4 for (int i = 0; i < count; i++)
 563    {
 564  8 activators[i].cleanup();
 565    }
 566    }
 567   
 568    }
 569   
 570    /**
 571    * Starts two caches where each cache has N regions. We put some data in each of the regions.
 572    * We run two threads where each thread creates a cache then goes into a loop where it
 573    * activates the N regions, with a 1 sec pause between activations.
 574    * <p/>
 575    * Threads are started with 10 sec difference.
 576    * <p/>
 577    * This test simulates a 10 sec staggered start of 2 servers in a cluster, with each server
 578    * then deploying webapps.
 579    * <p/>
 580    * <p/>
 581    * <p/>
 582    * Failure condition is if any node sees an exception or if the final state
 583    * of all caches is not consistent.
 584    */
 585  2 public void testConcurrentStartupActivationAsync() throws Exception
 586    {
 587  2 concurrentActivationTest2(false);
 588    }
 589   
 590    /**
 591    * Starts two caches where each cache has N regions. We put some data in each of the regions.
 592    * We run two threads where each thread creates a cache then goes into a loop where it
 593    * activates the N regions, with a 1 sec pause between activations.
 594    * <p/>
 595    * Threads are started with 10 sec difference.
 596    * <p/>
 597    * This test simulates a 10 sec staggered start of 2 servers in a cluster, with each server
 598    * then deploying webapps.
 599    * <p/>
 600    * <p/>
 601    * <p/>
 602    * Failure condition is if any node sees an exception or if the final state
 603    * of all caches is not consistent.
 604    */
 605  2 public void testConcurrentStartupActivationSync() throws Exception
 606    {
 607  2 concurrentActivationTest2(true);
 608    }
 609   
 610    /**
 611    * Tests partial state transfer under heavy concurrent load and REPL_SYNC.
 612    * See <code>concurrentUseTest</code> for details.
 613    *
 614    * @throws Exception
 615    */
 616  0 public void testConcurrentUseSync() throws Exception
 617    {
 618  0 concurrentUseTest(true);
 619    }
 620   
 621    /**
 622    * Tests partial state transfer under heavy concurrent load and REPL_ASYNC.
 623    * See <code>concurrentUseTest</code> for details.
 624    *
 625    * @throws Exception
 626    */
 627  0 public void testConcurrentUseAsync() throws Exception
 628    {
 629  0 concurrentUseTest(false);
 630    }
 631   
 632    /**
 633    * Initiates 5 caches, 4 with active trees and one with an inactive tree.
 634    * Each of the active caches begins rapidly generating puts against nodes
 635    * in a subtree for which it is responsible. The 5th cache activates
 636    * each subtree, and at the end confirms no node saw any exceptions and
 637    * that each node has consistent state.
 638    *
 639    * @param sync whether to use REPL_SYNC or REPL_ASYNCE
 640    * @throws Exception
 641    */
 642  0 private void concurrentUseTest(boolean sync) throws Exception
 643    {
 644  0 String[] names = {"B", "C", "D", "E"};
 645  0 int count = names.length;
 646  0 CacheStressor[] stressors = new CacheStressor[count];
 647   
 648  0 try
 649    {
 650   
 651    // The first cache we create is inactivated.
 652  0 CacheSPI cacheA = createCache("cacheA", sync, true, false);
 653   
 654  0 CacheSPI[] caches = new CacheSPI[count + 1];
 655  0 caches[0] = cacheA;
 656   
 657    // Create a semaphore and take all its tickets
 658  0 Semaphore semaphore = new Semaphore(count);
 659  0 for (int i = 0; i < count; i++)
 660    {
 661  0 semaphore.acquire();
 662    }
 663   
 664    // Create stressor threads that will block on the semaphore
 665   
 666  0 for (int i = 0; i < count; i++)
 667    {
 668  0 stressors[i] = new CacheStressor(semaphore, names[i], sync);
 669  0 caches[i + 1] = stressors[i].getCacheSPI();
 670  0 stressors[i].start();
 671    }
 672   
 673    // Make sure everyone's views are in sync
 674  0 TestingUtil.blockUntilViewsReceived(caches, 60000);
 675   
 676    // Repeat the basic test four times
 677    //for (int x = 0; x < 4; x++)
 678  0 for (int x = 0; x < 1; x++)
 679    {
 680  0 if (x > 0)
 681    {
 682    // Reset things by inactivating the region
 683    // and enabling the stressors
 684  0 for (int i = 0; i < count; i++)
 685    {
 686  0 cacheA.getRegion(Fqn.fromString("/" + names[i]), true).deactivate();
 687  0 System.out.println("Run " + x + "-- /" + names[i] + " deactivated on A");
 688  0 stressors[i].startPuts();
 689    }
 690    }
 691   
 692    // Release the semaphore to allow the threads to start work
 693  0 semaphore.release(count);
 694   
 695    // Sleep to ensure the threads get all the semaphore tickets
 696    // and to ensure puts are actively in progress
 697  0 TestingUtil.sleepThread((long) 300);
 698   
 699    // Activate cacheA
 700  0 for (int i = 0; i < count; i++)
 701    {
 702    // System.out.println("Activating /" + names[i] + " on A");
 703  0 cacheA.getRegion(Fqn.fromString("/" + names[i]), true).activate();
 704    // Stop the stressor so we don't pollute cacheA's state
 705    // with too many messages sent after activation -- we want
 706    // to compare transferred state with the sender
 707  0 stressors[i].stopPuts();
 708  0 System.out.println("Run " + x + "-- /" + names[i] + " activated on A");
 709    // Reacquire one semaphore ticket
 710  0 boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
 711  0 if (!acquired)
 712    {
 713  0 fail("failed to acquire semaphore " + i);
 714    }
 715   
 716    // Pause to allow other work to proceed
 717  0 TestingUtil.sleepThread(100);
 718    }
 719   
 720    // Sleep to allow any in transit msgs to clear
 721    // if (!sync)
 722  0 TestingUtil.sleepThread(1000);
 723   
 724    // Ensure the stressors saw no exceptions
 725  0 for (int i = 0; i < count; i++)
 726    {
 727  0 if (stressors[i].getException() != null && !(stressors[i].getException() instanceof InactiveRegionException))
 728    {
 729  0 fail("Stressor " + names[i] + " caught an exception " + stressors[i].getException());
 730    }
 731   
 732    }
 733   
 734  0 TestingUtil.sleepThread(1000);
 735   
 736    // Compare cache contents
 737  0 for (int i = 0; i < count; i++)
 738    {
 739  0 for (int j = 0; j < SUBTREE_SIZE; j++)
 740    {
 741  0 Fqn fqn = Fqn.fromString("/" + names[i] + "/" + j);
 742  0 assertEquals("/A/" + j + " matches " + fqn,
 743    cacheA.get(fqn, "KEY"),
 744    stressors[i].getCacheSPI().get(fqn, "KEY"));
 745    }
 746    }
 747    }
 748   
 749  0 for (int i = 0; i < count; i++)
 750    {
 751  0 stressors[i].stopThread();
 752    }
 753   
 754    }
 755    finally
 756    {
 757  0 for (int i = 0; i < count; i++)
 758    {
 759  0 if (stressors[i] != null)
 760    {
 761  0 stressors[i].cleanup();
 762    }
 763    }
 764    }
 765   
 766    }
 767   
 768    /**
 769    * Test for JBCACHE-913
 770    *
 771    * @throws Exception
 772    */
 773  2 public void testEvictionSeesStateTransfer() throws Exception
 774    {
 775   
 776  2 Configuration c = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
 777  2 Cache cache1 = DefaultCacheFactory.getInstance().createCache(c, false);
 778  2 cache1.start();
 779  2 caches.put("evict1", cache1);
 780   
 781  2 cache1.put(Fqn.fromString("/a/b/c"), "key", "value");
 782   
 783  2 c = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
 784  2 Cache cache2 = DefaultCacheFactory.getInstance().createCache(c, false);
 785  2 cache2.start();
 786  2 caches.put("evict2", cache2);
 787   
 788  2 Region region = cache2.getRegion(Fqn.ROOT, false);
 789    // We expect events for /a, /a/b and /a/b/c
 790  2 assertEquals("Saw the expected number of node events", 3, region.nodeEventQueueSize());
 791    }
 792   
 793    /**
 794    * Further test for JBCACHE-913
 795    *
 796    * @throws Exception
 797    */
 798  2 public void testEvictionAfterStateTransfer() throws Exception
 799    {
 800  2 Configuration c = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
 801  2 Cache cache1 = DefaultCacheFactory.getInstance().createCache(c, false);
 802  2 cache1.start();
 803  2 caches.put("evict1", cache1);
 804   
 805  2 for (int i = 0; i < 25000; i++)
 806    {
 807  50000 cache1.put(Fqn.fromString("/base/" + i), "key", "base" + i);
 808  50000 if (i < 5)
 809    {
 810  10 cache1.put(Fqn.fromString("/org/jboss/test/data/" + i), "key", "data" + i);
 811    }
 812    }
 813   
 814  2 c = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
 815  2 final Cache cache2 = DefaultCacheFactory.getInstance().createCache(c, false);
 816  2 cache2.start();
 817  2 caches.put("evict2", cache2);
 818   
 819  2 Node parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
 820  2 Set children = parent.getChildren();
 821  2 assertEquals("All data children transferred", 5, children.size());
 822  1 parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
 823  1 children = parent.getChildren();
 824  1 assertTrue("Minimum number of base children transferred", children.size() >= 5000);
 825   
 826    // Sleep 2.5 secs so the nodes we are about to create in data won't
 827    // exceed the 4 sec TTL when eviction thread runs
 828  1 TestingUtil.sleepThread(2500);
 829   
 830    class Putter extends Thread
 831    {
 832    Cache cache = null;
 833    boolean stopped = false;
 834    Exception ex = null;
 835   
 836  2 public void run()
 837    {
 838  2 int i = 25000;
 839  2 while (!stopped)
 840    {
 841  331 try
 842    {
 843  331 cache.put(Fqn.fromString("/base/" + i), "key", "base" + i);
 844  331 cache.put(Fqn.fromString("/org/jboss/test/data/" + i), "key", "data" + i);
 845  331 i++;
 846    }
 847    catch (Exception e)
 848    {
 849  0 ex = e;
 850    }
 851    }
 852    }
 853    }
 854  1 Putter p1 = new Putter();
 855  1 p1.cache = cache1;
 856  1 p1.start();
 857  1 Putter p2 = new Putter();
 858  1 p2.cache = cache2;
 859  1 p2.start();
 860   
 861  1 Random rnd = new Random();
 862  1 TestingUtil.sleepThread(rnd.nextInt(200));
 863   
 864  1 int maxCountBase = 0;
 865  1 int maxCountData = 0;
 866  1 boolean sawBaseDecrease = false;
 867  1 boolean sawDataDecrease = false;
 868  1 long start = System.currentTimeMillis();
 869  4 while ((System.currentTimeMillis() - start) < 10000)
 870    {
 871  4 parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
 872  4 children = parent.getChildren();
 873  4 if (children != null)
 874    {
 875  4 int dataCount = children.size();
 876  4 if (dataCount < maxCountData)
 877    {
 878  1 System.out.println("data " + dataCount + " < " + maxCountData + " elapsed = " + (System.currentTimeMillis() - start));
 879  1 sawDataDecrease = true;
 880    }
 881    else
 882    {
 883  3 maxCountData = dataCount;
 884    }
 885    }
 886   
 887  4 parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
 888  4 children = parent.getChildren();
 889  4 if (children != null)
 890    {
 891  4 int baseCount = children.size();
 892  4 if (baseCount < maxCountBase)
 893    {
 894  1 System.out.println("base " + baseCount + " < " + maxCountBase + " elapsed = " + (System.currentTimeMillis() - start));
 895  1 sawBaseDecrease = true;
 896    }
 897    else
 898    {
 899  3 maxCountBase = baseCount;
 900    }
 901    }
 902   
 903  4 if (sawDataDecrease && sawBaseDecrease)
 904    {
 905  1 break;
 906    }
 907   
 908  3 TestingUtil.sleepThread(50);
 909    }
 910   
 911  1 p1.stopped = true;
 912  1 p2.stopped = true;
 913  1 p1.join(1000);
 914  1 p2.join(1000);
 915   
 916  1 assertTrue("Saw data decrease", sawDataDecrease);
 917  1 assertTrue("Saw base decrease", sawBaseDecrease);
 918  1 assertNull("No exceptions in p1", p1.ex);
 919  1 assertNull("No exceptions in p2", p2.ex);
 920   
 921    // Sleep 5.1 secs so we are sure the eviction thread ran
 922  1 TestingUtil.sleepThread(5100);
 923   
 924  1 parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
 925  1 children = parent.getChildren();
 926  1 if (children != null)
 927    {
 928  1 System.out.println(children.size());
 929  1 assertTrue("Excess children evicted", children.size() <= 5);
 930    }
 931  1 parent = cache2.getRoot().getChild(Fqn.fromString("/base"));
 932  1 children = parent.getChildren();
 933  1 if (children != null)
 934    {
 935  1 System.out.println(children.size());
 936  1 assertTrue("Excess children evicted", children.size() <= 25000);
 937    }
 938   
 939    // Sleep more to let the eviction thread run again,
 940    // which will evict all data nodes due to their ttl of 4 secs
 941  1 TestingUtil.sleepThread(8100);
 942   
 943  1 parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
 944  1 children = parent.getChildren();
 945  1 if (children != null)
 946    {
 947  1 assertEquals("All data children evicted", 0, children.size());
 948    }
 949    }
 950   
 951  2 private Object createBen(ClassLoader loader) throws Exception
 952    {
 953  2 Class addrClazz = loader.loadClass(ADDRESS_CLASSNAME);
 954  2 Method setCity = addrClazz.getMethod("setCity", new Class[]{String.class});
 955  2 Method setStreet = addrClazz.getMethod("setStreet", new Class[]{String.class});
 956  2 Method setZip = addrClazz.getMethod("setZip", new Class[]{int.class});
 957  2 Object addr = addrClazz.newInstance();
 958  2 setCity.invoke(addr, new Object[]{"San Jose"});
 959  2 setStreet.invoke(addr, new Object[]{"1007 Home"});
 960  2 setZip.invoke(addr, new Object[]{90210});
 961   
 962  2 Class benClazz = loader.loadClass(PERSON_CLASSNAME);
 963  2 Method setName = benClazz.getMethod("setName", new Class[]{String.class});
 964  2 Method setAddress = benClazz.getMethod("setAddress", new Class[]{addrClazz});
 965  2 Object ben = benClazz.newInstance();
 966  2 setName.invoke(ben, new Object[]{"Ben"});
 967  2 setAddress.invoke(ben, new Object[]{addr});
 968   
 969  2 return ben;
 970    }
 971   
 972    private class CacheActivator extends CacheUser
 973    {
 974   
 975  20 CacheActivator(Semaphore semaphore,
 976    String name,
 977    boolean sync)
 978    throws Exception
 979    {
 980  20 super(semaphore, name, sync, false);
 981    }
 982   
 983  20 void useCache() throws Exception
 984    {
 985  20 TestingUtil.sleepRandom(5000);
 986  20 createAndActivateRegion(cache, A_B);
 987    // System.out.println(name + " activated region" + " " + System.currentTimeMillis());
 988  20 Fqn childFqn = Fqn.fromString("/a/b/" + name);
 989   
 990  20 cache.put(childFqn, "KEY", "VALUE");
 991    // System.out.println(name + " put fqn " + childFqn + " " + System.currentTimeMillis());
 992   
 993    }
 994   
 995  100 public Object getCacheValue(Fqn fqn) throws CacheException
 996    {
 997  100 return cache.get(fqn, "KEY");
 998    }
 999    }
 1000   
 1001    private class StaggeredWebDeployerActivator extends CacheUser
 1002    {
 1003   
 1004    int regionCount = 15;
 1005   
 1006  8 StaggeredWebDeployerActivator(Semaphore semaphore,
 1007    String name,
 1008    boolean sync,
 1009    int regionCount)
 1010    throws Exception
 1011    {
 1012  8 super(semaphore, name, sync, false);
 1013  8 this.regionCount = regionCount;
 1014    }
 1015   
 1016  8 void useCache() throws Exception
 1017    {
 1018  8 for (int i = 0; i < regionCount; i++)
 1019    {
 1020  120 createAndActivateRegion(cache, Fqn.fromString("/a/" + i));
 1021   
 1022  120 Fqn childFqn = Fqn.fromString("/a/" + i + "/" + name);
 1023  120 cache.put(childFqn, "KEY", "VALUE");
 1024   
 1025  120 TestingUtil.sleepThread(1000);
 1026    }
 1027    }
 1028   
 1029  120 public Object getCacheValue(Fqn fqn) throws CacheException
 1030    {
 1031  120 return cache.get(fqn, "KEY");
 1032    }
 1033    }
 1034   
 1035    private class CacheStressor extends CacheUser
 1036    {
 1037    private Random random = new Random(System.currentTimeMillis());
 1038    private boolean putsStopped = false;
 1039    private boolean stopped = false;
 1040   
 1041  0 CacheStressor(Semaphore semaphore,
 1042    String name,
 1043    boolean sync)
 1044    throws Exception
 1045    {
 1046  0 super(semaphore, name, sync, true);
 1047    }
 1048   
 1049  0 void useCache() throws Exception
 1050    {
 1051    // Do continuous puts into the cache. Use our own nodes,
 1052    // as we're not testing conflicts between writer nodes,
 1053    // just whether activation causes problems
 1054  0 int factor = 0;
 1055  0 int i = 0;
 1056  0 Fqn fqn = null;
 1057   
 1058  0 boolean acquired = false;
 1059  0 while (!stopped)
 1060    {
 1061  0 if (i > 0)
 1062    {
 1063  0 acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
 1064  0 if (!acquired)
 1065    {
 1066  0 throw new Exception(name + " cannot acquire semaphore");
 1067    }
 1068    }
 1069   
 1070  0 while (!putsStopped)
 1071    {
 1072  0 factor = random.nextInt(50);
 1073   
 1074  0 fqn = Fqn.fromString("/" + name + "/" + String.valueOf(factor % SUBTREE_SIZE));
 1075  0 Integer value = factor / SUBTREE_SIZE;
 1076  0 cache.put(fqn, "KEY", value);
 1077   
 1078  0 TestingUtil.sleepThread((long) factor);
 1079   
 1080  0 i++;
 1081    }
 1082   
 1083  0 System.out.println(name + ": last put [#" + i + "] -- " + fqn + " = " + (factor / SUBTREE_SIZE));
 1084   
 1085  0 semaphore.release();
 1086   
 1087    // Go to sleep until directed otherwise
 1088  0 while (!stopped && putsStopped)
 1089    {
 1090  0 TestingUtil.sleepThread((long) 100);
 1091    }
 1092    }
 1093    }
 1094   
 1095  0 public void stopPuts()
 1096    {
 1097  0 putsStopped = true;
 1098    }
 1099   
 1100  0 public void startPuts()
 1101    {
 1102  0 putsStopped = false;
 1103    }
 1104   
 1105  0 public void stopThread()
 1106    {
 1107  0 stopped = true;
 1108  0 if (thread.isAlive())
 1109    {
 1110  0 thread.interrupt();
 1111    }
 1112    }
 1113   
 1114   
 1115    }
 1116    }