Clover coverage report -
Coverage timestamp: Wed Jan 31 2007 15:38:53 EST
file stats: LOC: 787   Methods: 46
NCLOC: 434   Classes: 8
 
 Source file Conditionals Statements Methods TOTAL
ForcedStateTransferTest.java 78.9% 90.7% 97.8% 90.2%
coverage coverage
 1    /*
 2    * JBoss, Home of Professional Open Source
 3    * Copyright 2005, JBoss Inc., and individual contributors as indicated
 4    * by the @authors tag. See the copyright.txt in the distribution for a
 5    * full listing of individual contributors.
 6    *
 7    * This is free software; you can redistribute it and/or modify it
 8    * under the terms of the GNU Lesser General Public License as
 9    * published by the Free Software Foundation; either version 2.1 of
 10    * the License, or (at your option) any later version.
 11    *
 12    * This software is distributed in the hope that it will be useful,
 13    * but WITHOUT ANY WARRANTY; without even the implied warranty of
 14    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 15    * Lesser General Public License for more details.
 16    *
 17    * You should have received a copy of the GNU Lesser General Public
 18    * License along with this software; if not, write to the Free
 19    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 20    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 21    */
 22   
 23    package org.jboss.cache.statetransfer;
 24   
 25    import org.jboss.cache.AbstractCacheListener;
 26    import org.jboss.cache.CacheException;
 27    import org.jboss.cache.CacheSPI;
 28    import org.jboss.cache.Fqn;
 29    import org.jboss.cache.Version;
 30    import org.jboss.cache.misc.TestingUtil;
 31   
 32    import javax.transaction.Synchronization;
 33    import javax.transaction.Transaction;
 34    import javax.transaction.TransactionManager;
 35    import java.util.Map;
 36   
 37   
 38    /**
 39    * Tests the ability to force a state transfer in the presence of
 40    * transactional and non-transactional threads that are hung holding
 41    * locks in the cache.
 42    *
 43    * @author Brian Stansberry
 44    * @version $Revision$
 45    */
 46    public class ForcedStateTransferTest extends StateTransferTestBase
 47    {
 48    /**
 49    * Starts a cache in a separate thread, allowing the main thread
 50    * to abort if state transfer is taking too long.
 51    */
 52    static class CacheStarter extends Thread
 53    {
 54    CacheSPI cache;
 55    boolean useMarshalling;
 56    Exception failure;
 57   
 58  8 CacheStarter(CacheSPI cache, boolean useMarshalling)
 59    {
 60  8 this.cache = cache;
 61  8 this.useMarshalling = useMarshalling;
 62    }
 63   
 64  8 public void run()
 65    {
 66  8 try
 67    {
 68  8 cache.start();
 69   
 70  1 if (useMarshalling)
 71    {
 72    // If we don't do initial state transfer, there is
 73    // no guarantee of start() blocking until the view is received
 74    // so we need to do it ourself
 75  1 TestingUtil.blockUntilViewReceived(cache, 2, 60000);
 76  1 cache.getRegion(Fqn.ROOT, true).activate();
 77    }
 78    }
 79    catch (Exception e)
 80    {
 81  7 failure = e;
 82    }
 83    }
 84    }
 85   
 86    /**
 87    * Generic superclass of classes that perform some operation on the
 88    * cache that is intended to hang with a lock held on certain nodes.
 89    */
 90    static abstract class TaskRunner extends Thread
 91    {
 92    CacheSPI cache;
 93    Fqn fqn;
 94    String value;
 95    Exception failure;
 96    boolean asleep = false;
 97   
 98  62 TaskRunner(CacheSPI cache, String rootFqn, String value)
 99    {
 100  62 this.cache = cache;
 101  62 this.value = value;
 102  62 this.fqn = new Fqn(Fqn.fromString(rootFqn), value);
 103    }
 104   
 105  62 public void run()
 106    {
 107  62 try
 108    {
 109    // do whatever the task is
 110  62 executeTask();
 111    }
 112    catch (Exception e)
 113    {
 114  4 if (!isDone())
 115  1 failure = e;
 116    }
 117    finally
 118    {
 119  25 asleep = false;
 120    // hook to allow final processing
 121  25 finalCleanup();
 122    }
 123    }
 124   
 125    abstract void executeTask() throws Exception;
 126   
 127    abstract boolean isDone();
 128   
 129  10 void finalCleanup()
 130    {
 131    }
 132   
 133  665983 boolean isAsleep()
 134    {
 135  665983 return asleep;
 136    }
 137    }
 138   
 139    /**
 140    * Hangs with an active or rollback-only transaction holding locks.
 141    */
 142    static class TxRunner extends TaskRunner
 143    {
 144    Transaction tx = null;
 145    boolean rollback = false;
 146    boolean done = true;
 147   
 148  24 TxRunner(CacheSPI cache, String rootFqn, String value, boolean rollback)
 149    {
 150  24 super(cache, rootFqn, value);
 151  24 this.rollback = rollback;
 152    }
 153   
 154  24 void executeTask() throws Exception
 155    {
 156  24 TransactionManager tm = cache.getTransactionManager();
 157  24 tm.begin();
 158  24 tx = tm.getTransaction();
 159   
 160  24 cache.put(fqn, "KEY", value);
 161   
 162  24 if (rollback)
 163  12 tx.setRollbackOnly();
 164   
 165  24 asleep = true;
 166  24 TestingUtil.sleepThread((long) 25000);
 167  15 done = true;
 168    }
 169   
 170  15 void finalCleanup()
 171    {
 172  15 if (tx != null)
 173    {
 174  15 try { tx.commit(); } catch (Exception ignore) {}
 175    }
 176    }
 177   
 178  0 boolean isDone()
 179    {
 180  0 return done;
 181    }
 182    }
 183   
 184    /**
 185    * TreeCacheListener that hangs the thread in nodeModified().
 186    */
 187    static class HangThreadListener extends AbstractCacheListener
 188    {
 189    boolean asleep;
 190    Fqn toHang;
 191    boolean alreadyHung;
 192    boolean done;
 193   
 194  12 HangThreadListener(Fqn toHang)
 195    {
 196  12 this.toHang = toHang;
 197    }
 198   
 199  48 public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map data)
 200    {
 201  24 if (!pre) hangThread(fqn);
 202    }
 203   
 204  24 private void hangThread(Fqn fqn)
 205    {
 206  24 if (!alreadyHung && toHang.equals(fqn))
 207    {
 208  12 asleep = true;
 209    //System.out.println("Hanging thread changing node " + fqn);
 210  12 alreadyHung = true;
 211  12 TestingUtil.sleepThread((long) 30000);
 212  3 done = true;
 213  3 asleep = false;
 214    }
 215    }
 216    }
 217   
 218    /**
 219    * Hangs with a non-transactional thread holding locks.
 220    */
 221    static class HangThreadRunner extends TaskRunner
 222    {
 223    HangThreadListener listener;
 224   
 225  12 HangThreadRunner(CacheSPI cache, String rootFqn, String value)
 226    {
 227  12 super(cache, rootFqn, value);
 228  12 listener = new HangThreadListener(fqn);
 229  12 cache.addCacheListener(listener);
 230    }
 231   
 232  12 void executeTask() throws Exception
 233    {
 234    // Just do a put and the listener will hang the thread
 235  12 cache.put(fqn, "KEY", value);
 236    }
 237   
 238  268907 boolean isAsleep()
 239    {
 240  268907 return listener.asleep;
 241    }
 242   
 243  3 boolean isDone()
 244    {
 245  3 return listener.done;
 246    }
 247    }
 248   
 249    /**
 250    * Synchronization that hangs the thread either in
 251    * beforeCompletion() or afterCompletion().
 252    */
 253    static class HangThreadSynchronization implements Synchronization
 254    {
 255    boolean asleep;
 256    boolean hangBefore;
 257    boolean done;
 258   
 259  26 HangThreadSynchronization(boolean hangBefore)
 260    {
 261  26 this.hangBefore = hangBefore;
 262    }
 263   
 264  25 public void beforeCompletion()
 265    {
 266  25 if (hangBefore)
 267    {
 268  12 hang();
 269    }
 270    }
 271   
 272  16 public void afterCompletion(int status)
 273    {
 274  16 if (!hangBefore)
 275    {
 276  13 hang();
 277    }
 278    }
 279   
 280  25 void hang()
 281    {
 282  25 asleep = true;
 283  25 TestingUtil.sleepThread((long) 30000);
 284  6 done = true;
 285    }
 286   
 287    }
 288   
 289    /**
 290    * Hangs with a transactional thread either in the beforeCompletion()
 291    * or afterCompletion() phase holding locks.
 292    */
 293    static class SynchronizationTxRunner extends TaskRunner
 294    {
 295    Transaction tx = null;
 296    HangThreadSynchronization sync;
 297   
 298  26 SynchronizationTxRunner(CacheSPI cache, String rootFqn, String value, boolean hangBefore)
 299    {
 300  26 super(cache, rootFqn, value);
 301  26 this.sync = new HangThreadSynchronization(hangBefore);
 302    }
 303   
 304  26 void executeTask() throws Exception
 305    {
 306  26 TransactionManager tm = cache.getTransactionManager();
 307  26 tm.begin();
 308  26 tx = tm.getTransaction();
 309  26 tx.registerSynchronization(sync);
 310   
 311  26 cache.put(fqn, "KEY", value);
 312   
 313    // Committing the tx will hang the thread
 314  25 tx.commit();
 315    }
 316   
 317  725687 boolean isAsleep()
 318    {
 319  725687 return sync.asleep;
 320    }
 321   
 322  1 boolean isDone()
 323    {
 324  1 return sync.done;
 325    }
 326    }
 327   
 328    /**
 329    * Tests the ability to force a state transfer in the presence
 330    * of active transactions on the sending cache.
 331    *
 332    * @throws Exception
 333    */
 334  1 public void testActiveTransaction() throws Exception
 335    {
 336  1 String[] values = {"A", "B", "C"};
 337  1 transactionTest(values, false, "REPEATABLE_READ");
 338    }
 339   
 340    /**
 341    * Tests the ability to force a state transfer in the presence
 342    * of a transaction marked rollback-only on the sending cache.
 343    *
 344    * @throws Exception
 345    */
 346  1 public void testRollbackOnlyTransaction() throws Exception
 347    {
 348  1 String[] values = {"A", "B", "C"};
 349  1 transactionTest(values, true, "REPEATABLE_READ");
 350    }
 351   
 352    /**
 353    * Run a basic test with transactional threads doing puts and then
 354    * hanging before committing.
 355    *
 356    * @param values node names under which puts should be done
 357    * @param rollback should the transactions be marked rollback-only
 358    * before hanging
 359    * @param isolationLevel cache's isolation level
 360    * @throws Exception
 361    */
 362  2 private void transactionTest(String[] values,
 363    boolean rollback,
 364    String isolationLevel) throws Exception
 365    {
 366    // Create the cache from which state will be requested
 367  2 CacheSPI sender = initializeSender(isolationLevel, false, false);
 368   
 369    // Start threads that will do operations on the cache and then hang
 370  2 TxRunner[] runners =
 371    initializeTransactionRunners(values, sender, "/LOCK", rollback);
 372   
 373    // Create and start the cache that requests a state transfer
 374  2 CacheSPI receiver = startReceiver(isolationLevel, false, false);
 375   
 376    // Confirm the receiver got the expected state and the threads are OK
 377  0 checkResults(receiver, runners, false);
 378    }
 379   
 380    /**
 381    * Creates and starts a CacheSPI from which another cache will request
 382    * state. Also adds value "X" under key "KEY" in node "/OK". This node
 383    * should be present in the transferred state in any test.
 384    *
 385    * @param isolationLevel cache's isolation level
 386    * @param replSync is cache REPL_SYNC?
 387    * @param useMarshalling is the activateRegion() API to be used?
 388    * @return the cache
 389    * @throws Exception
 390    */
 391  9 private CacheSPI initializeSender(String isolationLevel,
 392    boolean replSync,
 393    boolean useMarshalling) throws Exception
 394    {
 395  9 CacheSPI sender = createCache("sender", isolationLevel, replSync, useMarshalling, true);
 396   
 397  9 if (useMarshalling)
 398  1 sender.getRegion(Fqn.ROOT, true).activate();
 399   
 400  9 sender.put(Fqn.fromString("/OK"), "KEY", "X");
 401   
 402  9 return sender;
 403    }
 404   
 405    /**
 406    * Start a set of TaskRunner threads that do a transactional put on the cache
 407    * and then go to sleep with the transaction uncommitted.
 408    *
 409    * @param values the name of the node that should be put under
 410    * rootFqn, and the value that shoud be put in its map
 411    * @param sender the cache on which the put should be done
 412    * @param rootFqn Fqn under which the new node should be inserted -- the
 413    * Fqn of the new node will be /rootFqn/value
 414    * @param rollback <code>true</code> if the tx should be marked
 415    * rollback-only before the thread goes to sleep
 416    * @return the TaskRunner threads
 417    */
 418  8 private TxRunner[] initializeTransactionRunners(String[] values,
 419    CacheSPI sender,
 420    String rootFqn,
 421    boolean rollback)
 422    {
 423  8 TxRunner[] runners = new TxRunner[values.length];
 424  8 for (int i = 0; i < values.length; i++)
 425    {
 426  24 runners[i] = new TxRunner(sender, rootFqn, values[i], rollback);
 427  24 initializeRunner(runners[i]);
 428    }
 429   
 430  8 return runners;
 431    }
 432   
 433    /**
 434    * Starts the runner and waits up to 1 second until it is asleep, confirming
 435    * that it is alive.
 436    *
 437    * @param runner
 438    */
 439  62 private void initializeRunner(TaskRunner runner)
 440    {
 441  62 runner.start();
 442   
 443    // Loop until it executes its put and goes to sleep (i.e. hangs)
 444  62 long start = System.currentTimeMillis();
 445  62 while (!(runner.isAsleep()))
 446    {
 447  1660516 assertTrue(runner.getClass().getName() + " " + runner.value +
 448    " is alive", runner.isAlive());
 449    // Avoid hanging test fixture by only waiting 1 sec before failing
 450  1660516 assertFalse(runner.getClass().getName() + " " + runner.value +
 451    " has not timed out",
 452    (System.currentTimeMillis() - start) > 1000);
 453    }
 454    }
 455   
 456    /**
 457    * Checks whether the receiver cache has the expected state and whether
 458    * the runners ran cleanly. Also terminates the runners.
 459    *
 460    * @param receiver the cache that received state
 461    * @param runners the task runners
 462    * @param allowValues true if the runners' values are expected to
 463    * be in the cache state; false otherwise
 464    * @throws CacheException
 465    */
 466  1 private void checkResults(CacheSPI receiver,
 467    TaskRunner[] runners,
 468    boolean allowValues) throws CacheException
 469    {
 470    // Check that the runners are alive and kill them
 471  1 boolean[] aliveStates = new boolean[runners.length];
 472  1 for (int i = 0; i < runners.length; i++)
 473    {
 474  3 aliveStates[i] = runners[i].isAlive();
 475  3 if (aliveStates[i])
 476  3 runners[i].interrupt();
 477    }
 478   
 479    // Confirm we got the "non-hung" state
 480  1 assertEquals("OK value correct", "X", receiver.get(Fqn.fromString("/OK"), "KEY"));
 481   
 482  0 for (int i = 0; i < runners.length; i++)
 483    {
 484  0 assertTrue("Runner " + runners[i].value + " was alive", aliveStates[i]);
 485  0 assertNull("Runner " + runners[i].value + " ran cleanly", runners[i].failure);
 486  0 if (allowValues)
 487    {
 488  0 assertEquals("Correct value in " + runners[i].fqn,
 489    runners[i].value, receiver.get(runners[i].fqn, "KEY"));
 490    }
 491    else
 492    {
 493  0 assertNull("No value in " + runners[i].fqn,
 494    receiver.get(runners[i].fqn, "KEY"));
 495    }
 496    }
 497    }
 498   
 499    /**
 500    * Tests the ability to force a state transfer in the presence of
 501    * a hung thread holding a lock on the sending cache.
 502    *
 503    * @throws Exception
 504    */
 505  1 public void testHungThread() throws Exception
 506    {
 507    // Create the cache from which state will be requested
 508  1 CacheSPI sender = initializeSender("REPEATABLE_READ", false, false);
 509   
 510    // Start threads that will do operations on the cache and then hang
 511  1 String[] values = {"A", "B", "C"};
 512  1 HangThreadRunner[] runners = initializeHangThreadRunners(values, sender, "/LOCK");
 513   
 514    // Create and start the cache that requests a state transfer
 515  1 CacheSPI receiver = startReceiver("REPEATABLE_READ", false, false);
 516   
 517    // Confirm the receiver got the expected state and the threads are OK
 518  0 checkResults(receiver, runners, true);
 519    }
 520   
 521    /**
 522    * Start a set of TaskRunner threads that do a non-transactional put on the
 523    * cache and then go to sleep with the thread hung in a
 524    * TreeCacheListener and locks unreleased
 525    *
 526    * @param values the name of the node that should be put under
 527    * rootFqn, and the value that shoud be put in its map
 528    * @param sender the cache on which the put should be done
 529    * @param rootFqn Fqn under which the new node should be inserted -- the
 530    * Fqn of the new node will be /rootFqn/value
 531    * @return the TaskRunner threads
 532    */
 533  4 private HangThreadRunner[] initializeHangThreadRunners(String[] values,
 534    CacheSPI sender,
 535    String rootFqn)
 536    {
 537  4 HangThreadRunner[] runners = new HangThreadRunner[values.length];
 538  4 for (int i = 0; i < values.length; i++)
 539    {
 540  12 runners[i] = new HangThreadRunner(sender, rootFqn, values[i]);
 541  12 initializeRunner(runners[i]);
 542    }
 543   
 544  4 return runners;
 545    }
 546   
 547    /**
 548    * Tests the ability to force a state transfer in the presence
 549    * of a transaction that is hung in a
 550    * Synchronization.beforeCompletion() call.
 551    *
 552    * @throws Exception
 553    */
 554  1 public void testBeforeCompletionLock() throws Exception
 555    {
 556  1 synchronizationTest(true);
 557    }
 558   
 559    /**
 560    * Tests the ability to force a state transfer in the presence
 561    * of a transaction that is hung in a
 562    * Synchronization.beforeCompletion() call.
 563    *
 564    * @throws Exception
 565    */
 566  1 public void testAfterCompletionLock() throws Exception
 567    {
 568  1 synchronizationTest(false);
 569    }
 570   
 571    /**
 572    * Tests the ability to force a state transfer in the presence
 573    * of a transaction that is hung either in a
 574    * Synchronization.beforeCompletion() or Synchronization.afterCompletion()
 575    * call.
 576    *
 577    * @param hangBefore <code>true</code> if the thread should hang in
 578    * <code>beforeCompletion()</code>, <code>false</code>
 579    * if it should hang in <code>afterCompletion</code>
 580    * @throws Exception
 581    */
 582  2 private void synchronizationTest(boolean hangBefore) throws Exception
 583    {
 584  2 CacheSPI sender = initializeSender("REPEATABLE_READ", false, false);
 585   
 586  2 String[] values = {"A", "B", "C"};
 587  2 SynchronizationTxRunner[] runners =
 588    initializeSynchronizationTxRunners(values, sender, "/LOCK", hangBefore);
 589   
 590  2 CacheSPI receiver = startReceiver("REPEATABLE_READ", false, false);
 591   
 592  0 checkResults(receiver, runners, !hangBefore);
 593    }
 594   
 595   
 596    /**
 597    * Start a set of TaskRunner threads that do a transactional put on the
 598    * cache and then go to sleep with the thread hung in a
 599    * transaction Synchronization call and locks unreleased
 600    *
 601    * @param values the name of the node that should be put under
 602    * rootFqn, and the value that shoud be put in its map
 603    * @param sender the cache on which the put should be done
 604    * @param rootFqn Fqn under which the new node should be inserted -- the
 605    * Fqn of the new node will be /rootFqn/value
 606    * @param hangBefore <code>true</code> if the thread should hang in
 607    * <code>beforeCompletion()</code>, <code>false</code>
 608    * if it should hang in <code>afterCompletion</code>
 609    * @return the TaskRunner threads
 610    */
 611  9 private SynchronizationTxRunner[] initializeSynchronizationTxRunners(String[] values,
 612    CacheSPI sender,
 613    String rootFqn,
 614    boolean hangBefore)
 615    {
 616  9 SynchronizationTxRunner[] runners =
 617    new SynchronizationTxRunner[values.length];
 618  9 for (int i = 0; i < values.length; i++)
 619    {
 620  26 runners[i] = new SynchronizationTxRunner(sender, rootFqn, values[i], hangBefore);
 621  26 initializeRunner(runners[i]);
 622    }
 623  8 return runners;
 624    }
 625   
 626    /**
 627    * Tests the ability to force a state transfer in the presence
 628    * of multiple issues on the sending cache (active transactions,
 629    * rollback-only transactions, transactions hung in beforeCompletion() and
 630    * afterCompletion() calls, as well as hung threads).
 631    *
 632    * @throws Exception
 633    */
 634  1 public void testMultipleProblems() throws Exception
 635    {
 636  1 multipleProblemTest("REPEATABLE_READ", "/LOCK", false, false);
 637    }
 638   
 639    /**
 640    * Tests the ability to force a state transfer in the presence
 641    * of an active transaction in the sending cache
 642    * and isolation level SERIALIZABLE.
 643    *
 644    * @throws Exception
 645    */
 646  1 public void testSerializableIsolation() throws Exception
 647    {
 648  1 multipleProblemTest("SERIALIZABLE", "/", false, false);
 649    }
 650   
 651    /**
 652    * Tests the ability to force a partial state transfer with multiple
 653    * "problem" actors holding locks on the sending node. Same test as
 654    * {@link #testMultipleProblems()} except the partial state transfer API is
 655    * used instead of an initial state transfer.
 656    *
 657    * @throws Exception
 658    */
 659  1 public void testPartialStateTransfer() throws Exception
 660    {
 661  1 multipleProblemTest("REPEATABLE_READ", "/LOCK", false, true);
 662    }
 663   
 664    /**
 665    * Tests the ability to force a partial state transfer with multiple
 666    * "problem" actors holding locks on the sending node and cache mode
 667    * REPL_SYNC. Same test as {@link #testMultipleProblems()} except the
 668    * cache is configured for REPL_SYNC.
 669    *
 670    * @throws Exception
 671    */
 672  1 public void testReplSync() throws Exception
 673    {
 674  1 multipleProblemTest("REPEATABLE_READ", "/LOCK", true, false);
 675    }
 676   
 677    /**
 678    * Tests the ability to force a partial state transfer with multiple
 679    * "problem" actors holding locks on the sending node.
 680    *
 681    * @throws Exception
 682    */
 683  4 private void multipleProblemTest(String isolationLevel,
 684    String rootFqn,
 685    boolean replSync,
 686    boolean useMarshalling) throws Exception
 687    {
 688  4 CacheSPI sender = initializeSender(isolationLevel, replSync, useMarshalling);
 689   
 690    // Do the "after" nodes first, otherwise if there is a /LOCK parent
 691    // node, the rollback of a tx will remove it causing the test to fail
 692    // since the child node created by it will be gone as well.
 693    // This is really a REPEATABLE_READ bug that this test isn't intended
 694    // to catch; will create a separate locking test that shows it
 695  4 String[] val1 = {"A", "B", "C"};
 696  4 SynchronizationTxRunner[] after =
 697    initializeSynchronizationTxRunners(val1, sender, rootFqn, false);
 698   
 699  3 String[] val2 = {"D", "E", "F"};
 700  3 SynchronizationTxRunner[] before =
 701    initializeSynchronizationTxRunners(val2, sender, rootFqn, true);
 702   
 703  3 String[] val3 = {"G", "H", "I"};
 704  3 TxRunner[] active =
 705    initializeTransactionRunners(val3, sender, rootFqn, false);
 706   
 707  3 String[] val4 = {"J", "K", "L"};
 708  3 TxRunner[] rollback =
 709    initializeTransactionRunners(val4, sender, rootFqn, true);
 710   
 711  3 String[] val5 = {"M", "N", "O"};
 712  3 HangThreadRunner[] threads =
 713    initializeHangThreadRunners(val5, sender, rootFqn);
 714   
 715  3 CacheSPI receiver = startReceiver(isolationLevel, replSync, useMarshalling);
 716   
 717  1 checkResults(receiver, active, false);
 718  0 checkResults(receiver, rollback, false);
 719  0 checkResults(receiver, before, false);
 720  0 checkResults(receiver, after, true);
 721  0 checkResults(receiver, threads, true);
 722    }
 723   
 724  17 protected String getReplicationVersion()
 725    {
 726  17 return Version.version;
 727    }
 728   
 729    /**
 730    * Starts a cache that requests state from another cache. Confirms
 731    * that the receiver cache starts properly.
 732    *
 733    * @param isolationLevel
 734    * @param replSync
 735    * @param useMarshalling
 736    * @return the receiver cache
 737    * @throws Exception
 738    */
 739  8 private CacheSPI startReceiver(String isolationLevel,
 740    boolean replSync,
 741    boolean useMarshalling) throws Exception
 742    {
 743  8 CacheSPI receiver = createCache("receiver", isolationLevel, replSync, useMarshalling, false);
 744   
 745    // Start the cache in a separate thread so we can kill the
 746    // thread if the cache doesn't start properly
 747  8 CacheStarter starter = new CacheStarter(receiver, useMarshalling);
 748   
 749  8 starter.start();
 750   
 751  8 starter.join(20000);
 752   
 753  8 boolean alive = starter.isAlive();
 754  8 if (alive)
 755  0 starter.interrupt();
 756  8 assertFalse("Starter finished", alive);
 757   
 758  8 assertNull("No exceptions in starter", starter.failure);
 759   
 760  1 return receiver;
 761    }
 762   
 763    /**
 764    * Override the superclass version to set an unlimited state transfer timeout
 765    * and a 1 sec lock acquisition timeout.
 766    */
 767  17 private CacheSPI createCache(String cacheID,
 768    String isolationLevel,
 769    boolean replSync,
 770    boolean useMarshalling,
 771    boolean startCache)
 772    throws Exception
 773    {
 774  17 CacheSPI result = super.createCache(cacheID, replSync,
 775    useMarshalling, false, false, false);
 776  17 result.getConfiguration().setInitialStateRetrievalTimeout(0);
 777  17 result.getConfiguration().setLockAcquisitionTimeout(1000);
 778  17 result.getConfiguration().setIsolationLevel(isolationLevel);
 779   
 780  17 if (startCache)
 781  9 result.start();
 782   
 783  17 return result;
 784    }
 785   
 786   
 787    }