Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 796   Methods: 46
NCLOC: 443   Classes: 8
 
 Source file Conditionals Statements Methods TOTAL
ForcedStateTransferTest.java 52.6% 74.7% 84.8% 73.2%
coverage coverage
 1    /*
 2    * JBoss, Home of Professional Open Source
 3    * Copyright 2005, JBoss Inc., and individual contributors as indicated
 4    * by the @authors tag. See the copyright.txt in the distribution for a
 5    * full listing of individual contributors.
 6    *
 7    * This is free software; you can redistribute it and/or modify it
 8    * under the terms of the GNU Lesser General Public License as
 9    * published by the Free Software Foundation; either version 2.1 of
 10    * the License, or (at your option) any later version.
 11    *
 12    * This software is distributed in the hope that it will be useful,
 13    * but WITHOUT ANY WARRANTY; without even the implied warranty of
 14    * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 15    * Lesser General Public License for more details.
 16    *
 17    * You should have received a copy of the GNU Lesser General Public
 18    * License along with this software; if not, write to the Free
 19    * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 20    * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 21    */
 22   
 23    package org.jboss.cache.statetransfer;
 24   
 25    import org.jboss.cache.CacheException;
 26    import org.jboss.cache.CacheSPI;
 27    import org.jboss.cache.Fqn;
 28    import org.jboss.cache.Version;
 29    import org.jboss.cache.misc.TestingUtil;
 30    import org.jboss.cache.notifications.annotation.CacheListener;
 31    import org.jboss.cache.notifications.annotation.NodeModified;
 32    import org.jboss.cache.notifications.event.NodeEvent;
 33   
 34    import javax.transaction.Synchronization;
 35    import javax.transaction.Transaction;
 36    import javax.transaction.TransactionManager;
 37   
 38   
 39    /**
 40    * Tests the ability to force a state transfer in the presence of
 41    * transactional and non-transactional threads that are hung holding
 42    * locks in the cache.
 43    *
 44    * @author Brian Stansberry
 45    * @version $Revision$
 46    */
 47    public class ForcedStateTransferTest extends StateTransferTestBase
 48    {
 49    /**
 50    * Starts a cache in a separate thread, allowing the main thread
 51    * to abort if state transfer is taking too long.
 52    */
 53    static class CacheStarter extends Thread
 54    {
 55    CacheSPI cache;
 56    boolean useMarshalling;
 57    Exception failure;
 58   
 59  4 CacheStarter(CacheSPI cache, boolean useMarshalling)
 60    {
 61  4 this.cache = cache;
 62  4 this.useMarshalling = useMarshalling;
 63    }
 64   
 65  4 public void run()
 66    {
 67  4 try
 68    {
 69  4 cache.start();
 70   
 71  0 if (useMarshalling)
 72    {
 73    // If we don't do initial state transfer, there is
 74    // no guarantee of start() blocking until the view is received
 75    // so we need to do it ourself
 76  0 TestingUtil.blockUntilViewReceived(cache, 2, 60000);
 77  0 cache.getRegion(Fqn.ROOT, true).activate();
 78    }
 79    }
 80    catch (Exception e)
 81    {
 82  4 failure = e;
 83    }
 84    }
 85    }
 86   
 87    /**
 88    * Generic superclass of classes that perform some operation on the
 89    * cache that is intended to hang with a lock held on certain nodes.
 90    */
 91    static abstract class TaskRunner extends Thread
 92    {
 93    CacheSPI cache;
 94    Fqn fqn;
 95    String value;
 96    Exception failure;
 97    boolean asleep = false;
 98   
 99  42 TaskRunner(CacheSPI cache, String rootFqn, String value)
 100    {
 101  42 this.cache = cache;
 102  42 this.value = value;
 103  42 this.fqn = new Fqn(Fqn.fromString(rootFqn), value);
 104    }
 105   
 106  39 public void run()
 107    {
 108  39 try
 109    {
 110    // do whatever the task is
 111  39 executeTask();
 112    }
 113    catch (Exception e)
 114    {
 115  1 if (!isDone())
 116  1 failure = e;
 117    }
 118    finally
 119    {
 120  10 asleep = false;
 121    // hook to allow final processing
 122  10 finalCleanup();
 123    }
 124    }
 125   
 126    abstract void executeTask() throws Exception;
 127   
 128    abstract boolean isDone();
 129   
 130  4 void finalCleanup()
 131    {
 132    }
 133   
 134  1284357 boolean isAsleep()
 135    {
 136  1284357 return asleep;
 137    }
 138    }
 139   
 140    /**
 141    * Hangs with an active or rollback-only transaction holding locks.
 142    */
 143    static class TxRunner extends TaskRunner
 144    {
 145    Transaction tx = null;
 146    boolean rollback = false;
 147    boolean done = true;
 148   
 149  18 TxRunner(CacheSPI cache, String rootFqn, String value, boolean rollback)
 150    {
 151  18 super(cache, rootFqn, value);
 152  18 this.rollback = rollback;
 153    }
 154   
 155  18 void executeTask() throws Exception
 156    {
 157  18 TransactionManager tm = cache.getTransactionManager();
 158  18 tm.begin();
 159  18 tx = tm.getTransaction();
 160   
 161  18 cache.put(fqn, "KEY", value);
 162   
 163  18 if (rollback)
 164  9 tx.setRollbackOnly();
 165   
 166  18 asleep = true;
 167  18 TestingUtil.sleepThread((long) 25000);
 168  6 done = true;
 169    }
 170   
 171  6 void finalCleanup()
 172    {
 173  6 if (tx != null)
 174    {
 175  6 try
 176    {
 177  6 tx.commit();
 178    }
 179    catch (Exception ignore)
 180    {
 181    }
 182    }
 183    }
 184   
 185  0 boolean isDone()
 186    {
 187  0 return done;
 188    }
 189    }
 190   
 191    /**
 192    * TreeCacheListener that hangs the thread in nodeModified().
 193    */
 194    @CacheListener
 195    static class HangThreadListener
 196    {
 197    boolean asleep;
 198    Fqn toHang;
 199    boolean alreadyHung;
 200    boolean done;
 201   
 202  3 HangThreadListener(Fqn toHang)
 203    {
 204  3 this.toHang = toHang;
 205    }
 206   
 207  0 @NodeModified
 208    public void nodeModified(NodeEvent e)
 209    {
 210  0 if (!e.isPre()) hangThread(e.getFqn());
 211    }
 212   
 213  0 private void hangThread(Fqn fqn)
 214    {
 215  0 if (!alreadyHung && toHang.equals(fqn))
 216    {
 217  0 asleep = true;
 218    //System.out.println("Hanging thread changing node " + fqn);
 219  0 alreadyHung = true;
 220  0 TestingUtil.sleepThread((long) 30000);
 221  0 done = true;
 222  0 asleep = false;
 223    }
 224    }
 225    }
 226   
 227    /**
 228    * Hangs with a non-transactional thread holding locks.
 229    */
 230    static class HangThreadRunner extends TaskRunner
 231    {
 232    HangThreadListener listener;
 233   
 234  3 HangThreadRunner(CacheSPI cache, String rootFqn, String value)
 235    {
 236  3 super(cache, rootFqn, value);
 237  3 listener = new HangThreadListener(fqn);
 238  3 cache.addCacheListener(listener);
 239    }
 240   
 241  0 void executeTask() throws Exception
 242    {
 243    // Just do a put and the listener will hang the thread
 244  0 cache.put(fqn, "KEY", value);
 245    }
 246   
 247  0 boolean isAsleep()
 248    {
 249  0 return listener.asleep;
 250    }
 251   
 252  0 boolean isDone()
 253    {
 254  0 return listener.done;
 255    }
 256    }
 257   
 258    /**
 259    * Synchronization that hangs the thread either in
 260    * beforeCompletion() or afterCompletion().
 261    */
 262    static class HangThreadSynchronization implements Synchronization
 263    {
 264    boolean asleep;
 265    boolean hangBefore;
 266    boolean done;
 267   
 268  21 HangThreadSynchronization(boolean hangBefore)
 269    {
 270  21 this.hangBefore = hangBefore;
 271    }
 272   
 273  20 public void beforeCompletion()
 274    {
 275  20 if (hangBefore)
 276    {
 277  9 hang();
 278    }
 279    }
 280   
 281  14 public void afterCompletion(int status)
 282    {
 283  14 if (!hangBefore)
 284    {
 285  11 hang();
 286    }
 287    }
 288   
 289  20 void hang()
 290    {
 291  20 asleep = true;
 292  20 TestingUtil.sleepThread((long) 30000);
 293  3 done = true;
 294    }
 295   
 296    }
 297   
 298    /**
 299    * Hangs with a transactional thread either in the beforeCompletion()
 300    * or afterCompletion() phase holding locks.
 301    */
 302    static class SynchronizationTxRunner extends TaskRunner
 303    {
 304    Transaction tx = null;
 305    HangThreadSynchronization sync;
 306   
 307  21 SynchronizationTxRunner(CacheSPI cache, String rootFqn, String value, boolean hangBefore)
 308    {
 309  21 super(cache, rootFqn, value);
 310  21 this.sync = new HangThreadSynchronization(hangBefore);
 311    }
 312   
 313  21 void executeTask() throws Exception
 314    {
 315  21 TransactionManager tm = cache.getTransactionManager();
 316  21 tm.begin();
 317  21 tx = tm.getTransaction();
 318  21 tx.registerSynchronization(sync);
 319   
 320  21 cache.put(fqn, "KEY", value);
 321   
 322    // Committing the tx will hang the thread
 323  20 tx.commit();
 324    }
 325   
 326  1142997 boolean isAsleep()
 327    {
 328  1142997 return sync.asleep;
 329    }
 330   
 331  1 boolean isDone()
 332    {
 333  1 return sync.done;
 334    }
 335    }
 336   
 337    /**
 338    * Tests the ability to force a state transfer in the presence
 339    * of active transactions on the sending cache.
 340    *
 341    * @throws Exception
 342    */
 343  1 public void testActiveTransaction() throws Exception
 344    {
 345  1 String[] values = {"A", "B", "C"};
 346  1 transactionTest(values, false, "REPEATABLE_READ");
 347    }
 348   
 349    /**
 350    * Tests the ability to force a state transfer in the presence
 351    * of a transaction marked rollback-only on the sending cache.
 352    *
 353    * @throws Exception
 354    */
 355  1 public void testRollbackOnlyTransaction() throws Exception
 356    {
 357  1 String[] values = {"A", "B", "C"};
 358  1 transactionTest(values, true, "REPEATABLE_READ");
 359    }
 360   
 361    /**
 362    * Run a basic test with transactional threads doing puts and then
 363    * hanging before committing.
 364    *
 365    * @param values node names under which puts should be done
 366    * @param rollback should the transactions be marked rollback-only
 367    * before hanging
 368    * @param isolationLevel cache's isolation level
 369    * @throws Exception
 370    */
 371  2 private void transactionTest(String[] values,
 372    boolean rollback,
 373    String isolationLevel) throws Exception
 374    {
 375    // Create the cache from which state will be requested
 376  2 CacheSPI sender = initializeSender(isolationLevel, false, false);
 377   
 378    // Start threads that will do operations on the cache and then hang
 379  2 TxRunner[] runners =
 380    initializeTransactionRunners(values, sender, "/LOCK", rollback);
 381   
 382    // Create and start the cache that requests a state transfer
 383  2 CacheSPI receiver = startReceiver(isolationLevel, false, false);
 384   
 385    // Confirm the receiver got the expected state and the threads are OK
 386  0 checkResults(receiver, runners, false);
 387    }
 388   
 389    /**
 390    * Creates and starts a CacheSPI from which another cache will request
 391    * state. Also adds value "X" under key "KEY" in node "/OK". This node
 392    * should be present in the transferred state in any test.
 393    *
 394    * @param isolationLevel cache's isolation level
 395    * @param replSync is cache REPL_SYNC?
 396    * @param useMarshalling is the activateRegion() API to be used?
 397    * @return the cache
 398    * @throws Exception
 399    */
 400  9 private CacheSPI initializeSender(String isolationLevel,
 401    boolean replSync,
 402    boolean useMarshalling) throws Exception
 403    {
 404  9 CacheSPI sender = createCache("sender", isolationLevel, replSync, useMarshalling, true);
 405   
 406  9 if (useMarshalling)
 407  1 sender.getRegion(Fqn.ROOT, true).activate();
 408   
 409  9 sender.put(Fqn.fromString("/OK"), "KEY", "X");
 410   
 411  9 return sender;
 412    }
 413   
 414    /**
 415    * Start a set of TaskRunner threads that do a transactional put on the cache
 416    * and then go to sleep with the transaction uncommitted.
 417    *
 418    * @param values the name of the node that should be put under
 419    * rootFqn, and the value that shoud be put in its map
 420    * @param sender the cache on which the put should be done
 421    * @param rootFqn Fqn under which the new node should be inserted -- the
 422    * Fqn of the new node will be /rootFqn/value
 423    * @param rollback <code>true</code> if the tx should be marked
 424    * rollback-only before the thread goes to sleep
 425    * @return the TaskRunner threads
 426    */
 427  6 private TxRunner[] initializeTransactionRunners(String[] values,
 428    CacheSPI sender,
 429    String rootFqn,
 430    boolean rollback)
 431    {
 432  6 TxRunner[] runners = new TxRunner[values.length];
 433  6 for (int i = 0; i < values.length; i++)
 434    {
 435  18 runners[i] = new TxRunner(sender, rootFqn, values[i], rollback);
 436  18 initializeRunner(runners[i]);
 437    }
 438   
 439  6 return runners;
 440    }
 441   
 442    /**
 443    * Starts the runner and waits up to 1 second until it is asleep, confirming
 444    * that it is alive.
 445    *
 446    * @param runner
 447    */
 448  39 private void initializeRunner(TaskRunner runner)
 449    {
 450  39 runner.start();
 451   
 452    // Loop until it executes its put and goes to sleep (i.e. hangs)
 453  39 long start = System.currentTimeMillis();
 454  39 while (!(runner.isAsleep()))
 455    {
 456  2427317 assertTrue(runner.getClass().getName() + " " + runner.value +
 457    " is alive", runner.isAlive());
 458    // Avoid hanging test fixture by only waiting 1 sec before failing
 459  2427317 assertFalse(runner.getClass().getName() + " " + runner.value +
 460    " has not timed out",
 461    (System.currentTimeMillis() - start) > 1000);
 462    }
 463    }
 464   
 465    /**
 466    * Checks whether the receiver cache has the expected state and whether
 467    * the runners ran cleanly. Also terminates the runners.
 468    *
 469    * @param receiver the cache that received state
 470    * @param runners the task runners
 471    * @param allowValues true if the runners' values are expected to
 472    * be in the cache state; false otherwise
 473    * @throws CacheException
 474    */
 475  0 private void checkResults(CacheSPI receiver,
 476    TaskRunner[] runners,
 477    boolean allowValues) throws CacheException
 478    {
 479    // Check that the runners are alive and kill them
 480  0 boolean[] aliveStates = new boolean[runners.length];
 481  0 for (int i = 0; i < runners.length; i++)
 482    {
 483  0 aliveStates[i] = runners[i].isAlive();
 484  0 if (aliveStates[i])
 485  0 runners[i].interrupt();
 486    }
 487   
 488    // Confirm we got the "non-hung" state
 489  0 assertEquals("OK value correct", "X", receiver.get(Fqn.fromString("/OK"), "KEY"));
 490   
 491  0 for (int i = 0; i < runners.length; i++)
 492    {
 493  0 assertTrue("Runner " + runners[i].value + " was alive", aliveStates[i]);
 494  0 assertNull("Runner " + runners[i].value + " ran cleanly", runners[i].failure);
 495  0 if (allowValues)
 496    {
 497  0 assertEquals("Correct value in " + runners[i].fqn,
 498    runners[i].value, receiver.get(runners[i].fqn, "KEY"));
 499    }
 500    else
 501    {
 502  0 assertNull("No value in " + runners[i].fqn,
 503    receiver.get(runners[i].fqn, "KEY"));
 504    }
 505    }
 506    }
 507   
 508    /**
 509    * Tests the ability to force a state transfer in the presence of
 510    * a hung thread holding a lock on the sending cache.
 511    *
 512    * @throws Exception
 513    */
 514  1 public void testHungThread() throws Exception
 515    {
 516    // Create the cache from which state will be requested
 517  1 CacheSPI sender = initializeSender("REPEATABLE_READ", false, false);
 518   
 519    // Start threads that will do operations on the cache and then hang
 520  1 String[] values = {"A", "B", "C"};
 521  1 HangThreadRunner[] runners = initializeHangThreadRunners(values, sender, "/LOCK");
 522   
 523    // Create and start the cache that requests a state transfer
 524  0 CacheSPI receiver = startReceiver("REPEATABLE_READ", false, false);
 525   
 526    // Confirm the receiver got the expected state and the threads are OK
 527  0 checkResults(receiver, runners, true);
 528    }
 529   
 530    /**
 531    * Start a set of TaskRunner threads that do a non-transactional put on the
 532    * cache and then go to sleep with the thread hung in a
 533    * TreeCacheListener and locks unreleased
 534    *
 535    * @param values the name of the node that should be put under
 536    * rootFqn, and the value that shoud be put in its map
 537    * @param sender the cache on which the put should be done
 538    * @param rootFqn Fqn under which the new node should be inserted -- the
 539    * Fqn of the new node will be /rootFqn/value
 540    * @return the TaskRunner threads
 541    */
 542  3 private HangThreadRunner[] initializeHangThreadRunners(String[] values,
 543    CacheSPI sender,
 544    String rootFqn)
 545    {
 546  3 HangThreadRunner[] runners = new HangThreadRunner[values.length];
 547  3 for (int i = 0; i < values.length; i++)
 548    {
 549  3 runners[i] = new HangThreadRunner(sender, rootFqn, values[i]);
 550  0 initializeRunner(runners[i]);
 551    }
 552   
 553  0 return runners;
 554    }
 555   
 556    /**
 557    * Tests the ability to force a state transfer in the presence
 558    * of a transaction that is hung in a
 559    * Synchronization.beforeCompletion() call.
 560    *
 561    * @throws Exception
 562    */
 563  1 public void testBeforeCompletionLock() throws Exception
 564    {
 565  1 synchronizationTest(true);
 566    }
 567   
 568    /**
 569    * Tests the ability to force a state transfer in the presence
 570    * of a transaction that is hung in a
 571    * Synchronization.beforeCompletion() call.
 572    *
 573    * @throws Exception
 574    */
 575  1 public void testAfterCompletionLock() throws Exception
 576    {
 577  1 synchronizationTest(false);
 578    }
 579   
 580    /**
 581    * Tests the ability to force a state transfer in the presence
 582    * of a transaction that is hung either in a
 583    * Synchronization.beforeCompletion() or Synchronization.afterCompletion()
 584    * call.
 585    *
 586    * @param hangBefore <code>true</code> if the thread should hang in
 587    * <code>beforeCompletion()</code>, <code>false</code>
 588    * if it should hang in <code>afterCompletion</code>
 589    * @throws Exception
 590    */
 591  2 private void synchronizationTest(boolean hangBefore) throws Exception
 592    {
 593  2 CacheSPI sender = initializeSender("REPEATABLE_READ", false, false);
 594   
 595  2 String[] values = {"A", "B", "C"};
 596  2 SynchronizationTxRunner[] runners =
 597    initializeSynchronizationTxRunners(values, sender, "/LOCK", hangBefore);
 598   
 599  2 CacheSPI receiver = startReceiver("REPEATABLE_READ", false, false);
 600   
 601  0 checkResults(receiver, runners, !hangBefore);
 602    }
 603   
 604   
 605    /**
 606    * Start a set of TaskRunner threads that do a transactional put on the
 607    * cache and then go to sleep with the thread hung in a
 608    * transaction Synchronization call and locks unreleased
 609    *
 610    * @param values the name of the node that should be put under
 611    * rootFqn, and the value that shoud be put in its map
 612    * @param sender the cache on which the put should be done
 613    * @param rootFqn Fqn under which the new node should be inserted -- the
 614    * Fqn of the new node will be /rootFqn/value
 615    * @param hangBefore <code>true</code> if the thread should hang in
 616    * <code>beforeCompletion()</code>, <code>false</code>
 617    * if it should hang in <code>afterCompletion</code>
 618    * @return the TaskRunner threads
 619    */
 620  8 private SynchronizationTxRunner[] initializeSynchronizationTxRunners(String[] values,
 621    CacheSPI sender,
 622    String rootFqn,
 623    boolean hangBefore)
 624    {
 625  8 SynchronizationTxRunner[] runners =
 626    new SynchronizationTxRunner[values.length];
 627  8 for (int i = 0; i < values.length; i++)
 628    {
 629  21 runners[i] = new SynchronizationTxRunner(sender, rootFqn, values[i], hangBefore);
 630  21 initializeRunner(runners[i]);
 631    }
 632  6 return runners;
 633    }
 634   
 635    /**
 636    * Tests the ability to force a state transfer in the presence
 637    * of multiple issues on the sending cache (active transactions,
 638    * rollback-only transactions, transactions hung in beforeCompletion() and
 639    * afterCompletion() calls, as well as hung threads).
 640    *
 641    * @throws Exception
 642    */
 643  1 public void testMultipleProblems() throws Exception
 644    {
 645  1 multipleProblemTest("REPEATABLE_READ", "/LOCK", false, false);
 646    }
 647   
 648    /**
 649    * Tests the ability to force a state transfer in the presence
 650    * of an active transaction in the sending cache
 651    * and isolation level SERIALIZABLE.
 652    *
 653    * @throws Exception
 654    */
 655  1 public void testSerializableIsolation() throws Exception
 656    {
 657  1 multipleProblemTest("SERIALIZABLE", "/", false, false);
 658    }
 659   
 660    /**
 661    * Tests the ability to force a partial state transfer with multiple
 662    * "problem" actors holding locks on the sending node. Same test as
 663    * {@link #testMultipleProblems()} except the partial state transfer API is
 664    * used instead of an initial state transfer.
 665    *
 666    * @throws Exception
 667    */
 668  1 public void testPartialStateTransfer() throws Exception
 669    {
 670  1 multipleProblemTest("REPEATABLE_READ", "/LOCK", false, true);
 671    }
 672   
 673    /**
 674    * Tests the ability to force a partial state transfer with multiple
 675    * "problem" actors holding locks on the sending node and cache mode
 676    * REPL_SYNC. Same test as {@link #testMultipleProblems()} except the
 677    * cache is configured for REPL_SYNC.
 678    *
 679    * @throws Exception
 680    */
 681  1 public void testReplSync() throws Exception
 682    {
 683  1 multipleProblemTest("REPEATABLE_READ", "/LOCK", true, false);
 684    }
 685   
 686    /**
 687    * Tests the ability to force a partial state transfer with multiple
 688    * "problem" actors holding locks on the sending node.
 689    *
 690    * @throws Exception
 691    */
 692  4 private void multipleProblemTest(String isolationLevel,
 693    String rootFqn,
 694    boolean replSync,
 695    boolean useMarshalling) throws Exception
 696    {
 697  4 CacheSPI sender = initializeSender(isolationLevel, replSync, useMarshalling);
 698   
 699    // Do the "after" nodes first, otherwise if there is a /LOCK parent
 700    // node, the rollback of a tx will remove it causing the test to fail
 701    // since the child node created by it will be gone as well.
 702    // This is really a REPEATABLE_READ bug that this test isn't intended
 703    // to catch; will create a separate locking test that shows it
 704  4 String[] val1 = {"A", "B", "C"};
 705  4 SynchronizationTxRunner[] after =
 706    initializeSynchronizationTxRunners(val1, sender, rootFqn, false);
 707   
 708  2 String[] val2 = {"D", "E", "F"};
 709  2 SynchronizationTxRunner[] before =
 710    initializeSynchronizationTxRunners(val2, sender, rootFqn, true);
 711   
 712  2 String[] val3 = {"G", "H", "I"};
 713  2 TxRunner[] active =
 714    initializeTransactionRunners(val3, sender, rootFqn, false);
 715   
 716  2 String[] val4 = {"J", "K", "L"};
 717  2 TxRunner[] rollback =
 718    initializeTransactionRunners(val4, sender, rootFqn, true);
 719   
 720  2 String[] val5 = {"M", "N", "O"};
 721  2 HangThreadRunner[] threads =
 722    initializeHangThreadRunners(val5, sender, rootFqn);
 723   
 724  0 CacheSPI receiver = startReceiver(isolationLevel, replSync, useMarshalling);
 725   
 726  0 checkResults(receiver, active, false);
 727  0 checkResults(receiver, rollback, false);
 728  0 checkResults(receiver, before, false);
 729  0 checkResults(receiver, after, true);
 730  0 checkResults(receiver, threads, true);
 731    }
 732   
 733  13 protected String getReplicationVersion()
 734    {
 735  13 return Version.version;
 736    }
 737   
 738    /**
 739    * Starts a cache that requests state from another cache. Confirms
 740    * that the receiver cache starts properly.
 741    *
 742    * @param isolationLevel
 743    * @param replSync
 744    * @param useMarshalling
 745    * @return the receiver cache
 746    * @throws Exception
 747    */
 748  4 private CacheSPI startReceiver(String isolationLevel,
 749    boolean replSync,
 750    boolean useMarshalling) throws Exception
 751    {
 752  4 CacheSPI receiver = createCache("receiver", isolationLevel, replSync, useMarshalling, false);
 753   
 754    // Start the cache in a separate thread so we can kill the
 755    // thread if the cache doesn't start properly
 756  4 CacheStarter starter = new CacheStarter(receiver, useMarshalling);
 757   
 758  4 starter.start();
 759   
 760  4 starter.join(20000);
 761   
 762  4 boolean alive = starter.isAlive();
 763  4 if (alive)
 764  0 starter.interrupt();
 765  4 assertFalse("Starter finished", alive);
 766   
 767  4 assertNull("No exceptions in starter", starter.failure);
 768   
 769  0 return receiver;
 770    }
 771   
 772    /**
 773    * Override the superclass version to set an unlimited state transfer timeout
 774    * and a 1 sec lock acquisition timeout.
 775    */
 776  13 private CacheSPI createCache(String cacheID,
 777    String isolationLevel,
 778    boolean replSync,
 779    boolean useMarshalling,
 780    boolean startCache)
 781    throws Exception
 782    {
 783  13 CacheSPI result = super.createCache(cacheID, replSync,
 784    useMarshalling, false, false, false);
 785  13 result.getConfiguration().setStateRetrievalTimeout(0);
 786  13 result.getConfiguration().setLockAcquisitionTimeout(1000);
 787  13 result.getConfiguration().setIsolationLevel(isolationLevel);
 788   
 789  13 if (startCache)
 790  9 result.start();
 791   
 792  13 return result;
 793    }
 794   
 795   
 796    }