Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 1,083   Methods: 35
NCLOC: 760   Classes: 4
 
 Source file Conditionals Statements Methods TOTAL
StateTransferAopTestBase.java 32.7% 66.3% 71.4% 60.3%
coverage coverage
 1    /*
 2    * JBoss, the OpenSource J2EE webOS
 3    *
 4    * Distributable under LGPL license.
 5    * See terms of license at gnu.org.
 6    */
 7   
 8    package org.jboss.cache.pojo.statetransfer;
 9   
 10    import junit.framework.TestCase;
 11    import org.apache.commons.logging.Log;
 12    import org.apache.commons.logging.LogFactory;
 13    import org.jboss.cache.Cache;
 14    import org.jboss.cache.CacheException;
 15    import org.jboss.cache.CacheSPI;
 16    import org.jboss.cache.Fqn;
 17    import org.jboss.cache.Region;
 18    import org.jboss.cache.config.CacheLoaderConfig;
 19    import org.jboss.cache.config.Configuration;
 20    import org.jboss.cache.config.Configuration.CacheMode;
 21    import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 22    import org.jboss.cache.factories.XmlConfigurationParser;
 23    import org.jboss.cache.loader.CacheLoader;
 24    import org.jboss.cache.misc.TestingUtil;
 25    import org.jboss.cache.pojo.PojoCache;
 26    import org.jboss.cache.pojo.PojoCacheFactory;
 27    import org.jboss.cache.pojo.test.Address;
 28    import org.jboss.cache.pojo.test.Person;
 29    import org.jboss.cache.xml.XmlHelper;
 30    import org.w3c.dom.Element;
 31   
 32    import javax.transaction.TransactionManager;
 33    import java.io.File;
 34    import java.util.HashMap;
 35    import java.util.HashSet;
 36    import java.util.Map;
 37    import java.util.Random;
 38    import java.util.Set;
 39    import java.util.concurrent.Semaphore;
 40    import java.util.concurrent.TimeUnit;
 41   
 42    /**
 43    * Tests state transfer in PojoCache.
 44    *
 45    * @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
 46    * @version $Revision: 1.6 $
 47    */
 48    public abstract class StateTransferAopTestBase extends TestCase
 49    {
 50    private Map caches;
 51   
 52    public static final String A_B_1 = "/a/b/1";
 53    public static final String A_B_2 = "/a/b/2";
 54    public static final String A_C_1 = "/a/c/1";
 55    public static final String A_C_2 = "/a/c/2";
 56   
 57    public static final Fqn A_B_1_f = Fqn.fromString("/a/b/1");
 58    public static final Fqn A_B_2_f = Fqn.fromString("/a/b/2");
 59    public static final Fqn A_C_1_f = Fqn.fromString("/a/c/1");
 60    public static final Fqn A_C_2_f = Fqn.fromString("/a/c/2");
 61   
 62    private static final int SUBTREE_SIZE = 10;
 63   
 64    private Person joe;
 65    private Person bob;
 66    private Person jane;
 67    private Person jill;
 68    private Address addr1;
 69    private Address addr2;
 70   
 71    public static final Integer TWENTY = 20;
 72    public static final Integer TWENTYFIVE = 25;
 73    public static final Integer FORTY = 40;
 74   
 75    private Log log = LogFactory.getLog(StateTransferAopTestBase.class);
 76   
 77  1 public void testInitialStateTransfer() throws Exception
 78    {
 79  1 log.info("Enter testInitialStateTransfer");
 80   
 81  1 PojoCache cache1 = createCache("cache1", true, false, false);
 82   
 83  1 cache1.attach(A_B_1, joe);
 84  1 cache1.attach(A_B_2, jane);
 85  1 cache1.attach(A_C_1, bob);
 86  1 cache1.attach(A_C_2, jill);
 87   
 88  1 PojoCache cache2 = createCache("cache2", true, false, false);
 89   
 90    // Pause to give caches time to see each other
 91    // TestingUtil.blockUntilViewsReceived(new Cache[]
 92    // {cache1.getCache(), cache2.getCache()}, 60000);
 93   
 94  1 Person ab1 = (Person) cache2.find(A_B_1);
 95  1 Person ab2 = (Person) cache2.find(A_B_2);
 96  1 Person ac1 = (Person) cache2.find(A_C_1);
 97  1 Person ac2 = (Person) cache2.find(A_C_2);
 98  1 assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
 99  1 assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
 100  1 assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
 101  1 assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
 102  1 assertTrue("Joe and Jane have same Address", ab1.getAddress() == ab2.getAddress());
 103  1 assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
 104  1 assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
 105  1 assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
 106  1 assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
 107  1 assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());
 108    }
 109   
 110  1 public void testInitialStateTferWithLoader() throws Exception
 111    {
 112  1 log.info("Enter testInitialStateTferWithLoader");
 113   
 114  1 PojoCache cache1 = createCache("cache1", false, false, true);
 115   
 116  1 cache1.attach(A_B_1, joe);
 117  1 cache1.attach(A_B_2, jane);
 118  1 cache1.attach(A_C_1, bob);
 119  1 cache1.attach(A_C_2, jill);
 120   
 121  1 PojoCache cache2 = createCache("cache2", false, false, true);
 122   
 123    // Pause to give caches time to see each other
 124  1 TestingUtil.blockUntilViewsReceived(new Cache[]
 125    {cache1.getCache(), cache2.getCache()}, 60000);
 126   
 127  1 Person ab1 = (Person) cache2.find(A_B_1);
 128  1 Person ab2 = (Person) cache2.find(A_B_2);
 129  1 Person ac1 = (Person) cache2.find(A_C_1);
 130  1 Person ac2 = (Person) cache2.find(A_C_2);
 131  1 assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
 132  1 assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
 133  1 assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
 134  1 assertEquals("City for /a/b/2 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
 135  1 assertTrue("Joe and Jane have same Address", ab1.getAddress() == ab2.getAddress());
 136  1 assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
 137  1 assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
 138  1 assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
 139  1 assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
 140  1 assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());
 141    }
 142   
 143  8 private void createAndActivateRegion(Cache c, Fqn f)
 144    {
 145  8 Region r = c.getRegion(f, true);
 146  8 r.registerContextClassLoader(getClass().getClassLoader());
 147  8 r.activate();
 148    }
 149   
 150  1 public void testPartialStateTransfer() throws Exception
 151    {
 152  1 log.info("Enter testPartialStateTransfer");
 153   
 154  1 PojoCache cache1 = createCache("cache1", false, true, false);
 155   
 156  1 createAndActivateRegion(cache1.getCache(), Fqn.fromString("/a"));
 157  1 createAndActivateRegion(cache1.getCache(), Fqn.fromString("/__JBossInternal__"));
 158   
 159  1 cache1.attach(A_B_1, joe);
 160  1 cache1.attach(A_B_2, jane);
 161   
 162  1 PojoCache cache2 = createCache("cache2", false, true, false);
 163   
 164    // Pause to give caches time to see each other
 165  1 TestingUtil.blockUntilViewsReceived(new Cache[]
 166    {cache1.getCache(), cache2.getCache()}, 60000);
 167   
 168  1 assertNull("/a/b/1 not transferred per policy", cache2.find(A_B_1));
 169  1 assertNull("/a/b/2 not transferred per policy", cache2.find(A_B_2));
 170   
 171  1 createAndActivateRegion(cache2.getCache(), Fqn.fromString("/a"));
 172  1 createAndActivateRegion(cache2.getCache(), Fqn.fromString("/__JBossInternal__"));
 173   
 174  1 Person ab1 = (Person) cache2.find(A_B_1);
 175  1 Person ab2 = (Person) cache2.find(A_B_2);
 176  1 assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
 177  1 assertEquals("City for /a/b/1 is Anytown", joe.getAddress().getCity(), ab1.getAddress().getCity());
 178  1 assertEquals("Name for /a/b/2 is Jane", jane.getName(), ab2.getName());
 179  1 assertEquals("City for /a/b/2 is Anytown", jane.getAddress().getCity(), ab2.getAddress().getCity());
 180  1 assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());
 181   
 182  1 cache1.attach(A_C_1, bob);
 183  1 cache1.attach(A_C_2, jill);
 184   
 185  1 assertNotNull("/a/c/1 should be transferred per policy", cache2.find(A_C_1));
 186   
 187  1 cache1.getCache().getRegion(Fqn.fromString("/a"), true).deactivate();
 188   
 189  1 cache1.getCache().getRegion(Fqn.fromString("/a"), true).activate();
 190   
 191  1 ab1 = (Person) cache1.find(A_B_1);
 192  1 assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
 193  1 assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
 194  1 ab2 = (Person) cache1.find(A_B_2);
 195  1 assertEquals("Name for /a/b/1 is Jane", jane.getName(), ab2.getName());
 196  1 assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
 197  1 assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());
 198    }
 199   
 200  1 public void testPartialStateTransferWithLoader() throws Exception
 201    {
 202  1 log.info("Enter testPartialStateTransferWithLoader");
 203   
 204  1 PojoCache cache1 = createCache("cache1", false, true, true);
 205  1 createAndActivateRegion(cache1.getCache(), Fqn.fromString("/a"));
 206  1 createAndActivateRegion(cache1.getCache(), Fqn.fromString("/__JBossInternal__"));
 207   
 208  1 cache1.attach(A_B_1, joe);
 209  1 cache1.attach(A_B_2, jane);
 210   
 211  1 PojoCache cache2 = createCache("cache2", false, true, true);
 212   
 213    // Pause to give caches time to see each other
 214  1 TestingUtil.blockUntilViewsReceived(new Cache[]
 215    {cache1.getCache(), cache2.getCache()}, 60000);
 216   
 217  1 CacheLoader loader = ((CacheSPI) cache2.getCache()).getCacheLoaderManager().getCacheLoader();
 218   
 219  1 Map map = loader.get(A_B_1_f);
 220  1 if (map != null)
 221    {
 222  0 assertNull("/a/b/1 name not transferred per policy", map.get("name"));
 223  0 assertNull("/a/b/1 age not transferred per policy", map.get("age"));
 224    }
 225  1 map = loader.get(A_B_2_f);
 226  1 if (map != null)
 227    {
 228  0 assertNull("/a/b/1 name not transferred per policy", map.get("name"));
 229  0 assertNull("/a/b/1 age not transferred per policy", map.get("age"));
 230    }
 231  1 assertNull("/a/b/1 not transferred per policy", cache2.find(A_B_1));
 232  1 assertNull("/a/b/2 not transferred per policy", cache2.find(A_B_2));
 233   
 234  1 createAndActivateRegion(cache2.getCache(), Fqn.fromString("/a"));
 235  1 createAndActivateRegion(cache2.getCache(), Fqn.fromString("/__JBossInternal__"));
 236   
 237    // assertEquals("Correct name from loader for /a/b/1", joe.getName(), loader.get(A_B_1_f).get("name"));
 238    // assertEquals("Correct age from loader for /a/b/1", TWENTY, loader.get(A_B_1_f).get("age"));
 239    // assertEquals("Correct name from loader for /a/b/2", jane.getName(), loader.get(A_B_2_f).get("name"));
 240    // assertEquals("Correct age from loader for /a/b/2", TWENTYFIVE, loader.get(A_B_2_f).get("age"));
 241   
 242   
 243  1 Person ab1 = (Person) cache2.find(A_B_1);
 244  1 Person ab2 = (Person) cache2.find(A_B_2);
 245  1 assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
 246  1 assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
 247  1 assertEquals("Name for /a/b/1 is Jane", jane.getName(), ab2.getName());
 248  1 assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
 249  1 assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());
 250   
 251  1 cache1.attach(A_C_1, bob);
 252  1 cache1.attach(A_C_2, jill);
 253  1 Thread.sleep(200);
 254   
 255  1 assertNotNull("/a/c/1 transferred per policy", cache2.find(A_C_1));
 256  1 assertNotNull("/a/c/1 transferred per policy", cache2.find(A_C_2));
 257   
 258  1 Person ac1 = (Person) cache2.find(A_C_1);
 259  1 Person ac2 = (Person) cache2.find(A_C_2);
 260  1 assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
 261  1 assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
 262  1 assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
 263  1 assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
 264  1 assertTrue("Bob and Jill have same Address", ac1.getAddress() == ac2.getAddress());
 265   
 266  1 cache1.getCache().getRegion(Fqn.fromString("/a"), true).deactivate();
 267   
 268  1 ab1 = (Person) cache1.find(A_B_1);
 269  1 assertEquals("Name for /a/b/1 is Joe", joe.getName(), ab1.getName());
 270  1 assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab1.getAddress().getCity());
 271  1 ab2 = (Person) cache1.find(A_B_2);
 272  1 assertEquals("Name for /a/b/1 is Jane", jane.getName(), ab2.getName());
 273  1 assertEquals("City for /a/b/1 is Anytown", addr1.getCity(), ab2.getAddress().getCity());
 274  1 assertTrue("Address for Joe and Jane is the same object", ab1.getAddress() == ab2.getAddress());
 275  1 ac1 = (Person) cache1.find(A_C_1);
 276  1 assertEquals("Name for /a/c/1 is Bob", bob.getName(), ac1.getName());
 277  1 assertEquals("City for /a/c/1 is Fremont", addr2.getCity(), ac1.getAddress().getCity());
 278  1 ac2 = (Person) cache1.find(A_C_2);
 279  1 assertEquals("Name for /a/c/2 is Jill", jill.getName(), ac2.getName());
 280  1 assertEquals("City for /a/c/2 is Fremont", addr2.getCity(), ac2.getAddress().getCity());
 281  1 assertTrue("Address for Bob and Jill is the same object", ac1.getAddress() == ac2.getAddress());
 282    }
 283   
 284   
 285    /**
 286    * Tests concurrent activation of the same subtree by multiple nodes in a
 287    * REPL_SYNC environment. The idea is to see what would happen with a
 288    * farmed deployment. See <code>concurrentActivationTest</code> for details.
 289    *
 290    * @throws Exception
 291    */
 292  1 public void testConcurrentActivationSync() throws Exception
 293    {
 294  1 log.info("Enter testConcurrentActivationSync");
 295   
 296  1 concurrentActivationTest(true);
 297    }
 298   
 299    /**
 300    * Tests concurrent activation of the same subtree by multiple nodes in a
 301    * REPL_ASYNC environment. The idea is to see what would happen with a
 302    * farmed deployment. See <code>concurrentActivationTest</code> for details.
 303    *
 304    * @throws Exception
 305    */
 306  1 public void testConcurrentActivationAsync() throws Exception
 307    {
 308  1 log.info("Enter testConcurrentActivationAsync");
 309   
 310  1 concurrentActivationTest(false);
 311    }
 312   
 313    /**
 314    * Starts 5 caches and then concurrently activates the same region under
 315    * all 5, causing each to attempt a partial state transfer from the others.
 316    * As soon as each cache has activated its region, it does a put to a node
 317    * in the region, thus complicating the lives of the other caches trying
 318    * to get partial state.
 319    * <p/>
 320    * Failure condition is if any node sees an exception or if the final state
 321    * of all caches is not consistent.
 322    *
 323    * @param sync use REPL_SYNC or REPL_ASYNC
 324    * @throws Exception
 325    */
 326  2 private void concurrentActivationTest(boolean sync) throws Exception
 327    {
 328  2 String[] names = {"A", "B", "C", "D", "E"};
 329  2 int count = names.length;
 330  2 CacheActivator[] activators = new CacheActivator[count];
 331   
 332   
 333  2 try
 334    {
 335    // Create a semaphore and take all its tickets
 336  2 Semaphore semaphore = new Semaphore(count);
 337  2 for (int i = 0; i < count; i++)
 338    {
 339  10 semaphore.acquire();
 340    }
 341   
 342    // Create activation threads that will block on the semaphore
 343  2 Cache[] caches = new Cache[count];
 344  2 for (int i = 0; i < count; i++)
 345    {
 346  10 activators[i] = new CacheActivator(semaphore, names[i], sync);
 347  10 caches[i] = activators[i].getCache();
 348  10 activators[i].start();
 349    }
 350   
 351    // Make sure everyone is in sync
 352  2 TestingUtil.blockUntilViewsReceived(caches, 60000);
 353   
 354    // Release the semaphore to allow the threads to start work
 355  1 semaphore.release(count);
 356   
 357    // Sleep to ensure the threads get all the semaphore tickets
 358  1 TestingUtil.sleepThread(1000);
 359   
 360    // Reacquire the semaphore tickets; when we have them all
 361    // we know the threads are done
 362  1 for (int i = 0; i < count; i++)
 363    {
 364  1 boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
 365  1 if (!acquired)
 366  1 fail("failed to acquire semaphore " + i);
 367    }
 368   
 369    // Sleep to allow any async calls to clear
 370  0 if (!sync)
 371  0 TestingUtil.sleepThread(500);
 372   
 373    // Ensure the caches held by the activators see all the values
 374  0 for (int i = 0; i < count; i++)
 375    {
 376  0 assertNull("Activator " + names[i] + " caught an exception",
 377    activators[i].getException());
 378   
 379  0 for (int j = 0; j < count; j++)
 380    {
 381  0 String fqn = "/a/b/" + names[j];
 382  0 Person p = (Person) activators[i].getCacheValue(fqn);
 383  0 assertNotNull(names[i] + ":" + fqn + " is not null", p);
 384  0 assertEquals("Correct name for " + names[i] + ":" + fqn,
 385    "Person " + names[j], p.getName());
 386  0 assertEquals("Correct street for " + names[i] + ":" + fqn,
 387    names[j] + " Test Street", p.getAddress().getStreet());
 388    // System.out.println(names[i] + ":" + fqn + " = " + activators[i].getCacheValue(fqn));
 389    }
 390   
 391    }
 392    }
 393    catch (Exception ex)
 394    {
 395  1 fail(ex.getLocalizedMessage());
 396    }
 397    finally
 398    {
 399  2 for (int i = 0; i < count; i++)
 400  10 activators[i].cleanup();
 401    }
 402   
 403    }
 404   
 405    /**
 406    * Tests partial state transfer under heavy concurrent load and REPL_SYNC.
 407    * See <code>concurrentUseTest</code> for details.
 408    *
 409    * @throws Exception
 410    */
 411  0 public void testConcurrentUseSync() throws Exception
 412    {
 413  0 log.info("Enter testConcurrentUseSync");
 414   
 415    // concurrentUseTest(true);
 416    }
 417   
 418    /**
 419    * Tests partial state transfer under heavy concurrent load and REPL_ASYNC.
 420    * See <code>concurrentUseTest</code> for details.
 421    *
 422    * @throws Exception
 423    */
 424  0 public void testConcurrentUseAsync() throws Exception
 425    {
 426  0 log.info("Enter testConcurrentUseAsync");
 427   
 428    // concurrentUseTest(false);
 429    }
 430   
 431    /**
 432    * Initiates 5 caches, 4 with active trees and one with an inactive tree.
 433    * Each of the active caches begins rapidly generating puts against nodes
 434    * in a subtree for which it is responsible. The 5th cache activates
 435    * each subtree, and at the end confirms no node saw any exceptions and
 436    * that each node has consistent state.
 437    *
 438    * @param sync whether to use REPL_SYNC or REPL_ASYNCE
 439    * @throws Exception
 440    */
 441  0 private void XconcurrentUseTest(boolean sync) throws Exception
 442    {
 443  0 String[] names = {"B", "C", "D", "E"};
 444  0 int count = names.length;
 445  0 CacheStressor[] stressors = new CacheStressor[count];
 446   
 447  0 try
 448    {
 449   
 450  0 PojoCache cacheA = createCache("cacheA", sync, true, false, false);
 451   
 452  0 Cache[] caches = new Cache[count + 1];
 453  0 caches[0] = cacheA.getCache();
 454   
 455    // Create a semaphore and take all its tickets
 456  0 Semaphore semaphore = new Semaphore(count);
 457  0 for (int i = 0; i < count; i++)
 458    {
 459  0 semaphore.acquire();
 460    }
 461   
 462    // Create stressor threads that will block on the semaphore
 463   
 464  0 for (int i = 0; i < count; i++)
 465    {
 466  0 stressors[i] = new CacheStressor(semaphore, names[i], sync);
 467  0 caches[i + 1] = stressors[i].getCache();
 468  0 stressors[i].start();
 469    // Give each one a chance to stabilize
 470  0 TestingUtil.sleepThread(100);
 471    }
 472   
 473    // Make sure everyone's views are in sync
 474  0 TestingUtil.blockUntilViewsReceived(caches, 60000);
 475   
 476    // Repeat the basic test two times in order to involve inactivation
 477  0 for (int x = 0; x < 2; x++)
 478    {
 479    // if (x > 0)
 480    // {
 481    // Reset things by inactivating the region
 482    // and enabling the stressors
 483  0 for (int i = 0; i < count; i++)
 484    {
 485  0 cacheA.getCache().getRegion(Fqn.fromString("/" + names[i]), true).deactivate();
 486  0 log.info("TEST: Run " + x + "-- /" + names[i] + " inactivated on A");
 487  0 stressors[i].startPuts();
 488    }
 489    // }
 490   
 491    // Release the semaphore to allow the threads to start work
 492  0 semaphore.release(count);
 493   
 494    // Sleep to ensure the threads get all the semaphore tickets
 495    // and to ensure puts are actively in progress
 496  0 TestingUtil.sleepThread(300);
 497   
 498    // Activate cacheA
 499  0 for (int i = 0; i < count; i++)
 500    {
 501  0 log.info("TEST: Activating /" + names[i] + " on A");
 502  0 cacheA.getCache().getRegion(Fqn.fromString("/" + names[i]), true).activate();
 503    // Stop the stressor so we don't pollute cacheA's state
 504    // with too many messages sent after activation -- we want
 505    // to compare transferred state with the sender
 506  0 stressors[i].stopPuts();
 507  0 log.info("TEST: Run " + x + "-- /" + names[i] + " activated on A");
 508    // Reacquire one semaphore ticket
 509  0 boolean acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
 510  0 if (!acquired)
 511  0 fail("failed to acquire semaphore " + names[i]);
 512  0 log.info("TEST: Run " + x + "-- acquired semaphore from " + names[i]);
 513   
 514    // Pause to allow other work to proceed
 515  0 TestingUtil.sleepThread(100);
 516    }
 517   
 518    // Sleep to allow any in transit msgs to clear
 519  0 if (!sync)
 520  0 TestingUtil.sleepThread(2000);
 521   
 522    // Ensure the stressors saw no exceptions
 523  0 for (int i = 0; i < count; i++)
 524    {
 525  0 Exception e = stressors[i].getException();
 526  0 if (e != null)
 527    {
 528  0 log.error("Stressor " + names[i] + " caught an exception",
 529    e);
 530  0 throw e;
 531    }
 532    }
 533   
 534    // log.info("Cache A details:\n" + cacheA.printDetails());
 535   
 536    // Compare cache contents
 537  0 Person p1 = null;
 538  0 Person p2 = null;
 539  0 for (int i = 0; i < count; i++)
 540    {
 541    // log.info("Cache " + names[i] + " details:\n" +
 542    // stressors[i].getTreeCache().printDetails());
 543   
 544  0 for (int j = 0; j < SUBTREE_SIZE; j++)
 545    {
 546   
 547  0 String fqn = "/" + names[i] + "/" + j;
 548  0 log.info("TEST: Getting A:" + fqn);
 549  0 p1 = (Person) cacheA.find(fqn);
 550  0 boolean p1Null = p1 == null;
 551  0 log.info("TEST: Getting " + names[i] + ":" + fqn);
 552    // p2 = (Person) stressors[i].getCache().find(fqn);
 553  0 boolean p2Null = p2 == null;
 554  0 assertEquals("Run " + x + ": " + fqn +
 555    " null status matches", p1Null, p2Null);
 556  0 if (!p1Null)
 557    {
 558  0 assertEquals("Run " + x + ": A:" + fqn + " age matches " + names[i] + ":" + fqn,
 559    p1.getAge(), p2.getAge());
 560  0 assertEquals("Run " + x + ": A:" + fqn + " name matches " + names[i] + ":" + fqn,
 561    p1.getName(), p2.getName());
 562  0 assertEquals("Run " + x + ": A:" + fqn + " address matches " + names[i] + ":" + fqn,
 563    p1.getAddress().getStreet(),
 564    p2.getAddress().getStreet());
 565    }
 566    }
 567    }
 568    }
 569   
 570  0 for (int i = 0; i < count; i++)
 571  0 stressors[i].stopThread();
 572   
 573    }
 574    finally
 575    {
 576  0 for (int i = 0; i < count; i++)
 577    {
 578  0 if (stressors[i] != null)
 579  0 stressors[i].cleanup();
 580    }
 581    }
 582   
 583    }
 584   
 585  8 protected PojoCache createCache(String cacheID, boolean sync, boolean useMarshalling, boolean useCacheLoader)
 586    throws Exception
 587    {
 588  8 return createCache(cacheID, sync, useMarshalling, useCacheLoader, true);
 589    }
 590   
 591  18 protected PojoCache createCache(String cacheID, boolean sync,
 592    boolean useMarshalling,
 593    boolean useCacheLoader,
 594    boolean inactiveOnStartup)
 595    throws Exception
 596    {
 597  18 if (caches.get(cacheID) != null)
 598  0 throw new IllegalStateException(cacheID + " already created");
 599   
 600  18 CacheMode mode = sync ? CacheMode.REPL_SYNC : CacheMode.REPL_ASYNC;
 601  18 Configuration c = UnitTestCacheConfigurationFactory.createConfiguration(mode);
 602  18 c.setClusterName("StateTransferTestBase");
 603  18 c.setReplVersionString(getReplicationVersion());
 604    // Use a long timeout to facilitate setting debugger breakpoints
 605  18 c.setStateRetrievalTimeout(60000);
 606  18 c.setLockParentForChildInsertRemove(true);
 607  18 if (useMarshalling)
 608    {
 609  14 c.setUseRegionBasedMarshalling(true);
 610  14 c.setInactiveOnStartup(inactiveOnStartup);
 611    }
 612  18 if (useCacheLoader)
 613    {
 614  4 configureCacheLoader(c, cacheID);
 615    }
 616   
 617  18 PojoCache cache = PojoCacheFactory.createCache(c, true);
 618    // Put the cache in the map before starting, so if it fails in
 619    // start it can still be destroyed later
 620  18 caches.put(cacheID, cache);
 621   
 622  18 return cache;
 623    }
 624   
 625  4 protected void configureCacheLoader(Configuration c, String cacheID) throws Exception
 626    {
 627  4 String tmp_location = getTempLocation(cacheID);
 628   
 629    // Do cleanup in case it failed before
 630  4 File file = new File(tmp_location);
 631  4 cleanFile(file);
 632  4 file.mkdir();
 633  4 tmp_location = escapeWindowsPath(tmp_location);
 634  4 c.setCacheLoaderConfig(getCacheLoaderConfig("org.jboss.cache.loader.FileCacheLoader", tmp_location));
 635    }
 636   
 637   
 638  4 protected CacheLoaderConfig getCacheLoaderConfig(String cl, String loc) throws Exception
 639    {
 640  4 String xml = " <config>\n" +
 641    " \n" +
 642    " <passivation>false</passivation>\n" +
 643    " <preload></preload>\n" +
 644    "\n" +
 645    " <cacheloader>\n" +
 646    " <class>" + cl + "</class>\n" +
 647    " <properties>\n" +
 648    " location=" + loc + "\n" +
 649    " </properties>\n" +
 650    " <async>false</async>\n" +
 651    " <fetchPersistentState>true</fetchPersistentState>\n" +
 652    " <ignoreModifications>false</ignoreModifications>\n" +
 653    " </cacheloader>\n" +
 654    " \n" +
 655    " </config>";
 656  4 Element element = XmlHelper.stringToElement(xml);
 657  4 return XmlConfigurationParser.parseCacheLoaderConfig(element);
 658    }
 659   
 660  12 protected String getTempLocation(String cacheID)
 661    {
 662  12 String tmp_location = System.getProperty("java.io.tmpdir", "c:\\tmp");
 663  12 File file = new File(tmp_location);
 664  12 file = new File(file, cacheID);
 665  12 return file.getAbsolutePath();
 666    }
 667   
 668  4 protected String escapeWindowsPath(String path)
 669    {
 670  4 if ('/' == File.separatorChar)
 671  4 return path;
 672   
 673  0 char[] chars = path.toCharArray();
 674  0 StringBuffer sb = new StringBuffer();
 675  0 for (int i = 0; i < chars.length; i++)
 676    {
 677  0 if (chars[i] == '\\')
 678  0 sb.append('\\');
 679  0 sb.append(chars[i]);
 680    }
 681  0 return sb.toString();
 682    }
 683   
 684    protected abstract String getReplicationVersion();
 685   
 686  6 protected void setUp() throws Exception
 687    {
 688  6 super.setUp();
 689   
 690  6 caches = new HashMap();
 691   
 692  6 addr1 = new Address();
 693  6 addr1.setStreet("101 Oakview Dr");
 694  6 addr1.setCity("Anytown");
 695  6 addr1.setZip(11111);
 696   
 697  6 addr2 = new Address();
 698  6 addr2.setStreet("222 Happy Dr");
 699  6 addr2.setCity("Fremont");
 700  6 addr2.setZip(22222);
 701   
 702  6 joe = new Person();
 703  6 joe.setName("Joe");
 704  6 joe.setAge(TWENTY);
 705  6 joe.setAddress(addr1);
 706  6 Set skills = new HashSet();
 707  6 skills.add("TENNIS");
 708  6 skills.add("CARPENTRY");
 709  6 joe.setSkills(skills);
 710   
 711  6 jane = new Person();
 712  6 jane.setName("Jane");
 713  6 jane.setAge(TWENTYFIVE);
 714  6 jane.setAddress(addr1);
 715  6 skills = new HashSet();
 716  6 skills.add("JUJITSU");
 717  6 skills.add("MACRAME");
 718  6 jane.setSkills(skills);
 719   
 720  6 bob = new Person();
 721  6 bob.setName("Bob");
 722  6 bob.setAge(FORTY);
 723  6 bob.setAddress(addr2);
 724  6 skills = new HashSet();
 725  6 skills.add("LANGUAGES");
 726  6 skills.add("LAWN BOWLING");
 727  6 bob.setSkills(skills);
 728   
 729  6 jill = new Person();
 730  6 jill.setName("Jill");
 731  6 jill.setAge(TWENTYFIVE);
 732  6 jill.setAddress(addr2);
 733  6 skills = new HashSet();
 734  6 skills.add("FORTRAN");
 735  6 skills.add("COBOL");
 736  6 jane.setSkills(skills);
 737    }
 738   
 739  6 protected void tearDown() throws Exception
 740    {
 741  6 super.tearDown();
 742   
 743  6 Set keys = caches.keySet();
 744  6 if (!keys.isEmpty())
 745    {
 746  6 String[] cacheIDs = new String[keys.size()];
 747  6 cacheIDs = (String[]) keys.toArray(cacheIDs);
 748  6 PojoCache cache = (PojoCache) caches.get(cacheIDs[0]);
 749  6 cache.getCache().removeNode(new Fqn("/"));
 750  4 Thread.sleep(200);
 751   
 752  4 for (int i = 0; i < cacheIDs.length; i++)
 753    {
 754  8 stopCache((PojoCache) caches.get(cacheIDs[i]));
 755  8 File file = new File(getTempLocation(cacheIDs[i]));
 756  8 cleanFile(file);
 757    }
 758    }
 759    }
 760   
 761  8 protected void stopCache(PojoCache cache)
 762    {
 763  8 if (cache != null)
 764    {
 765  8 try
 766    {
 767  8 cache.stop();
 768  8 cache.destroy();
 769    }
 770    catch (Exception e)
 771    {
 772  0 log.error("Exception stopping cache " + e.getMessage(), e);
 773    }
 774    }
 775    }
 776   
 777  300 protected void cleanFile(File file)
 778    {
 779  300 File[] children = file.listFiles();
 780  300 if (children != null)
 781    {
 782  163 for (int i = 0; i < children.length; i++)
 783    {
 784  288 cleanFile(children[i]);
 785    }
 786    }
 787   
 788  300 if (file.exists())
 789  292 file.delete();
 790  300 if (file.exists())
 791  0 file.deleteOnExit();
 792    }
 793   
 794    private class CacheActivator extends CacheUser
 795    {
 796   
 797  10 CacheActivator(Semaphore semaphore,
 798    String name,
 799    boolean sync)
 800    throws Exception
 801    {
 802  10 super(semaphore, name, sync, false);
 803    }
 804   
 805  5 void useCache() throws Exception
 806    {
 807  5 Region region = cache.getCache().getRegion(Fqn.fromString("/a/b"), true);
 808  5 region.registerContextClassLoader(getClass().getClassLoader());
 809  5 region.activate();
 810  1 log.info("TEST: " + name + " activated region" + " " + System.currentTimeMillis());
 811  1 String childFqn = "/a/b/" + name;
 812   
 813  1 Person p = new Person();
 814  1 p.setName("Person " + name);
 815   
 816  1 Address addr = new Address();
 817  1 addr.setStreet(name + " Test Street");
 818  1 addr.setCity(name + ", CA");
 819  1 p.setAddress(addr);
 820   
 821  1 TestingUtil.sleepThread(1);
 822   
 823    // tm.begin();
 824    // try
 825    // {
 826  1 cache.attach(childFqn, p);
 827  1 log.info("TEST: " + name + " put fqn " + childFqn + " " + System.currentTimeMillis());
 828    // }
 829    // catch (Exception e)
 830    // {
 831    // tm.setRollbackOnly();
 832    // throw e;
 833    // }
 834    // finally
 835    // {
 836    // tm.commit();
 837    // }
 838   
 839    }
 840   
 841  0 public Object getCacheValue(String fqn) throws CacheException
 842    {
 843  0 return cache.find(fqn);
 844    }
 845    }
 846   
 847    private class CacheStressor extends CacheUser
 848    {
 849    private Random random;
 850    private boolean putsStopped = false;
 851    private boolean stopped = false;
 852   
 853  0 CacheStressor(Semaphore semaphore,
 854    String name,
 855    boolean sync)
 856    throws Exception
 857    {
 858  0 super(semaphore, name, sync, true);
 859   
 860  0 random = new Random(System.currentTimeMillis() + name.hashCode());
 861    }
 862   
 863  0 void useCache() throws Exception
 864    {
 865    // Do lots of puts into the cache. Use our own nodes,
 866    // as we're not testing conflicts between writer nodes,
 867    // just whether activation causes problems
 868  0 int factor = 0;
 869  0 int i = 0;
 870  0 String fqn = null;
 871   
 872  0 Address addr1 = new Address();
 873  0 addr1.setStreet("1 Test Street");
 874  0 addr1.setCity("TestOne, CA");
 875   
 876  0 Address addr2 = new Address();
 877  0 addr2.setStreet("2 Test Street");
 878  0 addr2.setCity("TestTwo, CA");
 879   
 880  0 Person[] people = new Person[SUBTREE_SIZE];
 881  0 boolean[] loaded = new boolean[SUBTREE_SIZE];
 882  0 for (int j = 0; j < SUBTREE_SIZE; j++)
 883    {
 884  0 Person p = new Person();
 885  0 p.setName("Person " + j);
 886  0 p.setAge(j);
 887  0 p.setAddress((j % 2 == 0) ? addr1 : addr2);
 888  0 people[j] = p;
 889    }
 890   
 891  0 boolean acquired = true;
 892  0 try
 893    {
 894  0 while (!stopped)
 895    {
 896  0 if (i > 0)
 897    {
 898  0 acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
 899  0 if (!acquired)
 900  0 throw new Exception(name + " cannot acquire semaphore");
 901  0 log.info("TEST: " + name + " reacquired semaphore");
 902  0 System.out.println("TEST: " + name + " reacquired semaphore");
 903    }
 904   
 905  0 int lastIndex = -1;
 906  0 int index = -1;
 907  0 while (!putsStopped)
 908    {
 909    // Ensure we don't operate on the same address twice in a row
 910    // otherwise deadlock detection sometimes causes
 911    // the _put for the second call to precede the commit
 912    // for the first, leading to deadlock. This seems like a
 913    // JGroups bug, but the purpose of this test isn't to expose it
 914  0 while (index % 2 == lastIndex % 2)
 915    {
 916  0 factor = random.nextInt(50);
 917  0 index = factor % SUBTREE_SIZE;
 918    }
 919   
 920  0 lastIndex = index;
 921   
 922  0 TestingUtil.sleepThread(factor);
 923   
 924  0 fqn = "/" + name + "/" + String.valueOf(index);
 925   
 926    // tm.begin();
 927    // try
 928    // {
 929  0 if (loaded[index] == false)
 930    {
 931  0 cache.attach(fqn, people[index]);
 932  0 loaded[index] = true;
 933  0 log.info("TEST: " + name + " put Person at " + fqn);
 934    }
 935  0 else if (i % 2 == 0)
 936    {
 937  0 int newAge = factor / SUBTREE_SIZE;
 938  0 people[index].setAge(newAge);
 939    }
 940    else
 941    {
 942  0 people[index].getAddress().setStreet(factor + " Test Street");
 943    }
 944    // }
 945    // catch (Exception e)
 946    // {
 947    // tm.setRollbackOnly();
 948    // throw e;
 949    // }
 950    // finally
 951    // {
 952    // tm.commit();
 953    // }
 954   
 955  0 i++;
 956    }
 957   
 958  0 log.info("TEST: " + name + ": last put [#" + i + "] -- " + fqn + " = " + (factor / SUBTREE_SIZE));
 959   
 960  0 semaphore.release();
 961  0 acquired = false;
 962   
 963    // Go to sleep until directed otherwise
 964  0 while (!stopped && putsStopped)
 965  0 TestingUtil.sleepThread(100);
 966    }
 967    }
 968    finally
 969    {
 970  0 if (acquired)
 971  0 semaphore.release();
 972    }
 973    }
 974   
 975    // public void start() throws Exception
 976    // {
 977    // super.start();
 978    // cache.activateRegion("/" + name);
 979    // }
 980   
 981  0 public void stopPuts()
 982    {
 983  0 putsStopped = true;
 984  0 log.info("TEST: " + name + " putsStopped");
 985    }
 986   
 987  0 public void startPuts()
 988    {
 989  0 putsStopped = false;
 990    }
 991   
 992  0 public void stopThread()
 993    {
 994  0 stopped = true;
 995  0 if (thread.isAlive())
 996  0 thread.interrupt();
 997    }
 998   
 999   
 1000    }
 1001   
 1002    private abstract class CacheUser implements Runnable
 1003    {
 1004    protected Semaphore semaphore;
 1005    protected PojoCache cache;
 1006    protected TransactionManager tm;
 1007    protected String name;
 1008    protected Exception exception;
 1009    protected Thread thread;
 1010   
 1011  10 CacheUser(Semaphore semaphore,
 1012    String name,
 1013    boolean sync,
 1014    boolean activateRoot)
 1015    throws Exception
 1016    {
 1017  10 this.cache = createCache(name, sync, true, false, !activateRoot);
 1018  10 tm = ((CacheSPI) cache.getCache()).getTransactionManager();
 1019  10 if (tm == null)
 1020  0 throw new IllegalStateException("TransactionManager required");
 1021  10 this.semaphore = semaphore;
 1022  10 this.name = name;
 1023   
 1024  10 log.info("TEST: Cache " + name + " started");
 1025  10 System.out.println("TEST: Cache " + name + " started");
 1026    }
 1027   
 1028  10 public void run()
 1029    {
 1030  10 log.info("TEST: " + name + " started");
 1031  10 System.out.println("TEST: " + name + " started");
 1032   
 1033  10 boolean acquired = false;
 1034  10 try
 1035    {
 1036  10 acquired = semaphore.tryAcquire(60, TimeUnit.SECONDS);
 1037  7 if (!acquired)
 1038  2 throw new Exception(name + " cannot acquire semaphore");
 1039  5 log.info("TEST: " + name + " acquired semaphore");
 1040  5 System.out.println("TEST: " + name + " acquired semaphore");
 1041  5 useCache();
 1042   
 1043    }
 1044    catch (Exception e)
 1045    {
 1046  5 log.error("TEST: " + name + ": " + e.getLocalizedMessage(), e);
 1047   
 1048    // Save it for the test to check
 1049  5 exception = e;
 1050    }
 1051    finally
 1052    {
 1053  6 if (acquired)
 1054  1 semaphore.release();
 1055    }
 1056   
 1057    }
 1058   
 1059    abstract void useCache() throws Exception;
 1060   
 1061  0 public Exception getException()
 1062    {
 1063  0 return exception;
 1064    }
 1065   
 1066  10 public Cache getCache()
 1067    {
 1068  10 return cache.getCache();
 1069    }
 1070   
 1071  10 public void start() throws Exception
 1072    {
 1073  10 thread = new Thread(this);
 1074  10 thread.start();
 1075    }
 1076   
 1077  10 public void cleanup()
 1078    {
 1079  10 if (thread != null && thread.isAlive())
 1080  8 thread.interrupt();
 1081    }
 1082    }
 1083    }