1 |
| |
2 |
| |
3 |
| |
4 |
| |
5 |
| |
6 |
| |
7 |
| |
8 |
| package org.jboss.cache.replicated; |
9 |
| |
10 |
| import junit.framework.Test; |
11 |
| import junit.framework.TestCase; |
12 |
| import junit.framework.TestSuite; |
13 |
| import org.apache.commons.logging.Log; |
14 |
| import org.apache.commons.logging.LogFactory; |
15 |
| import org.jboss.cache.AbstractCacheListener; |
16 |
| import org.jboss.cache.Cache; |
17 |
| import org.jboss.cache.CacheException; |
18 |
| import org.jboss.cache.CacheImpl; |
19 |
| import org.jboss.cache.DefaultCacheFactory; |
20 |
| import org.jboss.cache.Fqn; |
21 |
| import org.jboss.cache.config.Configuration; |
22 |
| import org.jboss.cache.lock.IsolationLevel; |
23 |
| import org.jboss.cache.lock.TimeoutException; |
24 |
| import org.jboss.cache.misc.TestingUtil; |
25 |
| import org.jboss.cache.transaction.DummyTransactionManager; |
26 |
| |
27 |
| import javax.naming.Context; |
28 |
| import javax.transaction.NotSupportedException; |
29 |
| import javax.transaction.RollbackException; |
30 |
| import javax.transaction.Status; |
31 |
| import javax.transaction.Synchronization; |
32 |
| import javax.transaction.SystemException; |
33 |
| import javax.transaction.Transaction; |
34 |
| import javax.transaction.TransactionManager; |
35 |
| import java.util.ArrayList; |
36 |
| import java.util.List; |
37 |
| import java.util.Map; |
38 |
| import java.util.concurrent.Semaphore; |
39 |
| |
40 |
| |
41 |
| |
42 |
| |
43 |
| |
44 |
| |
45 |
| |
46 |
| |
47 |
| public class SyncReplTxTest extends TestCase |
48 |
| { |
49 |
| private static Log log = LogFactory.getLog(SyncReplTxTest.class); |
50 |
| private CacheImpl cache1; |
51 |
| private CacheImpl cache2; |
52 |
| |
53 |
| private String old_factory = null; |
54 |
| private final String FACTORY = "org.jboss.cache.transaction.DummyContextFactory"; |
55 |
| private Semaphore lock = new Semaphore(1); |
56 |
| private Throwable t1_ex; |
57 |
| private Throwable t2_ex; |
58 |
| |
59 |
| |
/**
 * Creates the test case.
 *
 * @param name test name, passed straight to the superclass constructor
 */
public SyncReplTxTest(final String name)
{
   super(name);
}
64 |
| |
65 |
36
| public void setUp() throws Exception
|
66 |
| { |
67 |
36
| super.setUp();
|
68 |
36
| old_factory = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
|
69 |
36
| System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
|
70 |
36
| t1_ex = t2_ex = null;
|
71 |
| } |
72 |
| |
73 |
36
| public void tearDown() throws Exception
|
74 |
| { |
75 |
36
| super.tearDown();
|
76 |
36
| DummyTransactionManager.destroy();
|
77 |
36
| destroyCaches();
|
78 |
36
| if (old_factory != null)
|
79 |
| { |
80 |
34
| System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
|
81 |
34
| old_factory = null;
|
82 |
| } |
83 |
| } |
84 |
| |
85 |
52
| private Transaction beginTransaction() throws SystemException, NotSupportedException
|
86 |
| { |
87 |
52
| DummyTransactionManager mgr = DummyTransactionManager.getInstance();
|
88 |
52
| mgr.begin();
|
89 |
52
| return mgr.getTransaction();
|
90 |
| } |
91 |
| |
92 |
32
| private void initCaches(Configuration.CacheMode caching_mode) throws Exception
|
93 |
| { |
94 |
32
| cache1 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
|
95 |
32
| cache2 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
|
96 |
32
| cache1.getConfiguration().setCacheMode(caching_mode);
|
97 |
32
| cache2.getConfiguration().setCacheMode(caching_mode);
|
98 |
32
| cache1.getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE);
|
99 |
32
| cache2.getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE);
|
100 |
| |
101 |
32
| cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
|
102 |
32
| cache2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
|
103 |
32
| cache1.getConfiguration().setLockAcquisitionTimeout(5000);
|
104 |
32
| cache2.getConfiguration().setLockAcquisitionTimeout(5000);
|
105 |
| |
106 |
32
| configureMultiplexer(cache1);
|
107 |
32
| configureMultiplexer(cache2);
|
108 |
| |
109 |
32
| cache1.start();
|
110 |
32
| cache2.start();
|
111 |
| |
112 |
32
| validateMultiplexer(cache1);
|
113 |
32
| validateMultiplexer(cache2);
|
114 |
| } |
115 |
| |
116 |
| |
117 |
| |
118 |
| |
119 |
| |
120 |
| |
121 |
| |
122 |
| |
123 |
64
| protected void configureMultiplexer(Cache cache) throws Exception
|
124 |
| { |
125 |
| |
126 |
| } |
127 |
| |
128 |
| |
129 |
| |
130 |
| |
131 |
| |
132 |
| |
133 |
| |
134 |
| |
135 |
64
| protected void validateMultiplexer(Cache cache)
|
136 |
| { |
137 |
64
| assertFalse("Cache is not using multiplexer", cache.getConfiguration().isUsingMultiplexer());
|
138 |
| } |
139 |
| |
140 |
36
| private void destroyCaches()
|
141 |
| { |
142 |
36
| if (cache1 != null)
|
143 |
| { |
144 |
32
| cache1.stop();
|
145 |
| } |
146 |
36
| if (cache2 != null)
|
147 |
| { |
148 |
32
| cache2.stop();
|
149 |
| } |
150 |
36
| cache1 = null;
|
151 |
36
| cache2 = null;
|
152 |
| } |
153 |
| |
154 |
2
| public void testLockRemoval() throws Exception
|
155 |
| { |
156 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
157 |
2
| cache1.getConfiguration().setSyncCommitPhase(true);
|
158 |
2
| cache1.releaseAllLocks("/");
|
159 |
2
| Transaction tx = beginTransaction();
|
160 |
2
| cache1.put("/bela/ban", "name", "Bela Ban");
|
161 |
2
| assertEquals(3, cache1.getNumberOfLocksHeld());
|
162 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
163 |
2
| tx.commit();
|
164 |
2
| assertEquals(0, cache1.getNumberOfLocksHeld());
|
165 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
166 |
| } |
167 |
| |
168 |
| |
169 |
2
| public void testSyncRepl() throws Exception
|
170 |
| { |
171 |
2
| Integer age;
|
172 |
2
| Transaction tx;
|
173 |
| |
174 |
2
| try
|
175 |
| { |
176 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
177 |
2
| cache1.getConfiguration().setSyncCommitPhase(true);
|
178 |
2
| cache2.getConfiguration().setSyncCommitPhase(true);
|
179 |
| |
180 |
| |
181 |
| |
182 |
2
| tx = beginTransaction();
|
183 |
2
| cache1.put("/a/b/c", "age", 38);
|
184 |
2
| TransactionManager mgr = cache1.getTransactionManager();
|
185 |
2
| tx = mgr.suspend();
|
186 |
2
| assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
|
187 |
2
| log.debug("cache1: locks held before commit: " + cache1.printLockInfo());
|
188 |
2
| log.debug("cache2: locks held before commit: " + cache2.printLockInfo());
|
189 |
2
| mgr.resume(tx);
|
190 |
2
| tx.commit();
|
191 |
2
| log.debug("cache1: locks held after commit: " + cache1.printLockInfo());
|
192 |
2
| log.debug("cache2: locks held after commit: " + cache2.printLockInfo());
|
193 |
| |
194 |
| |
195 |
2
| age = (Integer) cache2.get("/a/b/c", "age");
|
196 |
2
| assertNotNull("\"age\" obtained from cache2 must be non-null ", age);
|
197 |
2
| assertTrue("\"age\" must be 38", age == 38);
|
198 |
| } |
199 |
| catch (Exception e) |
200 |
| { |
201 |
0
| fail(e.toString());
|
202 |
| } |
203 |
| } |
204 |
| |
205 |
| |
206 |
| |
207 |
| |
208 |
2
| public void testSimplePut() throws Exception
|
209 |
| { |
210 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
211 |
| |
212 |
2
| cache1.put("/JSESSION/localhost/192.168.1.10:32882/Courses/0", "Instructor", "Ben Wang");
|
213 |
| |
214 |
2
| cache1.put("/JSESSION/localhost/192.168.1.10:32882/1", "Number", 10);
|
215 |
| } |
216 |
| |
217 |
| |
218 |
2
| public void testSimpleTxPut() throws Exception
|
219 |
| { |
220 |
2
| Transaction tx;
|
221 |
2
| final Fqn NODE1 = Fqn.fromString("/one/two/three");
|
222 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
223 |
| |
224 |
2
| tx = beginTransaction();
|
225 |
2
| cache1.put(NODE1, "age", 38);
|
226 |
2
| System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
|
227 |
2
| tx.commit();
|
228 |
| |
229 |
| |
230 |
| |
231 |
| |
232 |
| |
233 |
| |
234 |
| |
235 |
| |
236 |
| |
237 |
| |
238 |
| |
239 |
| |
240 |
| |
241 |
| |
242 |
| |
243 |
| |
244 |
| |
245 |
| |
246 |
| |
247 |
| |
248 |
| } |
249 |
| |
250 |
2
| public void testSyncReplWithModficationsOnBothCaches() throws Exception
|
251 |
| { |
252 |
2
| Integer age;
|
253 |
2
| Transaction tx;
|
254 |
2
| final Fqn NODE1 = Fqn.fromString("/one/two/three");
|
255 |
2
| final Fqn NODE2 = Fqn.fromString("/eins/zwei/drei");
|
256 |
| |
257 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
258 |
| |
259 |
| |
260 |
2
| cache1.put("/one/two", null);
|
261 |
2
| cache2.put("/eins/zwei", null);
|
262 |
| |
263 |
2
| cache1.getConfiguration().setSyncCommitPhase(true);
|
264 |
2
| cache2.getConfiguration().setSyncCommitPhase(true);
|
265 |
| |
266 |
2
| tx = beginTransaction();
|
267 |
2
| cache1.put(NODE1, "age", 38);
|
268 |
2
| System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
|
269 |
| |
270 |
2
| cache2.put(NODE2, "age", 39);
|
271 |
2
| System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));
|
272 |
| |
273 |
2
| System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
|
274 |
2
| System.out.println("cache2 before commit:\n" + cache2.printLockInfo());
|
275 |
| |
276 |
2
| try
|
277 |
| { |
278 |
2
| tx.commit();
|
279 |
0
| fail("Should not succeed with SERIALIZABLE semantics");
|
280 |
| } |
281 |
| catch (Exception e) |
282 |
| { |
283 |
| |
284 |
| } |
285 |
| |
286 |
2
| System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
|
287 |
2
| System.out.println("cache2 after commit:\n" + cache2.printLockInfo());
|
288 |
| |
289 |
| |
290 |
| |
291 |
| |
292 |
| |
293 |
| |
294 |
| |
295 |
| |
296 |
| |
297 |
| |
298 |
| |
299 |
| |
300 |
| |
301 |
| |
302 |
| |
303 |
| |
304 |
| |
305 |
| |
306 |
| |
307 |
| |
308 |
| |
309 |
| |
310 |
| |
311 |
| |
312 |
2
| assertEquals(0, cache1.getNumberOfLocksHeld());
|
313 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
314 |
2
| System.out.println("TransactionTable for cache1:\n" + cache1.getTransactionTable().toString(true));
|
315 |
2
| System.out.println("TransactionTable for cache2:\n" + cache2.getTransactionTable().toString(true));
|
316 |
| } |
317 |
| |
318 |
2
| public void testSyncReplWithModficationsOnBothCachesSameData() throws Exception
|
319 |
| { |
320 |
2
| Transaction tx;
|
321 |
2
| final Fqn NODE = Fqn.fromString("/one/two/three");
|
322 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
323 |
2
| tx = beginTransaction();
|
324 |
2
| cache1.put(NODE, "age", 38);
|
325 |
2
| System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
|
326 |
| |
327 |
2
| cache2.put(NODE, "age", 39);
|
328 |
2
| System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));
|
329 |
| |
330 |
2
| System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
|
331 |
2
| System.out.println("cache2 before commit:\n" + cache2.printLockInfo());
|
332 |
| |
333 |
2
| try
|
334 |
| { |
335 |
2
| tx.commit();
|
336 |
0
| fail("commit should throw a RollbackException, we should not get here");
|
337 |
| } |
338 |
| catch (RollbackException rollback) |
339 |
| { |
340 |
2
| System.out.println("Transaction was rolled back, this is correct");
|
341 |
| } |
342 |
| |
343 |
2
| System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
|
344 |
2
| System.out.println("cache2 after commit:\n" + cache2.printLockInfo());
|
345 |
| |
346 |
2
| assertEquals(0, cache1.getNumberOfLocksHeld());
|
347 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
348 |
| |
349 |
2
| assertEquals(0, cache1.getNumberOfNodes());
|
350 |
2
| assertEquals(0, cache2.getNumberOfNodes());
|
351 |
| } |
352 |
| |
353 |
| |
354 |
2
| public void testSyncReplWithModficationsOnBothCachesWithRollback() throws Exception
|
355 |
| { |
356 |
2
| Transaction tx;
|
357 |
2
| final Fqn NODE1 = Fqn.fromString("/one/two/three");
|
358 |
2
| final Fqn NODE2 = Fqn.fromString("/eins/zwei/drei");
|
359 |
| |
360 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
361 |
| |
362 |
2
| cache1.getConfiguration().setSyncRollbackPhase(true);
|
363 |
2
| cache2.getConfiguration().setSyncRollbackPhase(true);
|
364 |
| |
365 |
2
| tx = beginTransaction();
|
366 |
2
| cache1.put(NODE1, "age", 38);
|
367 |
2
| cache2.put(NODE2, "age", 39);
|
368 |
| |
369 |
2
| System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
|
370 |
2
| System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
|
371 |
| |
372 |
| |
373 |
2
| tx.registerSynchronization(new TransactionAborter(tx));
|
374 |
| |
375 |
2
| try
|
376 |
| { |
377 |
2
| tx.commit();
|
378 |
0
| fail("commit should throw a RollbackException, we should not get here");
|
379 |
| } |
380 |
| catch (RollbackException rollback) |
381 |
| { |
382 |
2
| System.out.println("Transaction was rolled back, this is correct");
|
383 |
| } |
384 |
| |
385 |
2
| System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
|
386 |
2
| System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
|
387 |
| |
388 |
2
| assertEquals(0, cache1.getNumberOfLocksHeld());
|
389 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
390 |
| |
391 |
2
| assertEquals(0, cache1.getNumberOfNodes());
|
392 |
2
| assertEquals(0, cache2.getNumberOfNodes());
|
393 |
| } |
394 |
| |
395 |
| |
396 |
| |
397 |
| |
398 |
| |
399 |
| |
400 |
| |
401 |
| |
402 |
2
| public void testSyncReplWithRollbackAndListener() throws Exception
|
403 |
| { |
404 |
2
| Transaction tx;
|
405 |
2
| final Fqn NODE1 = Fqn.fromString("/one/two/three");
|
406 |
| |
407 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
408 |
| |
409 |
2
| cache1.getConfiguration().setSyncRollbackPhase(true);
|
410 |
2
| cache2.getConfiguration().setSyncRollbackPhase(true);
|
411 |
| |
412 |
| |
413 |
| |
414 |
2
| CallbackListener cbl1 = new CallbackListener(cache1, "age");
|
415 |
2
| CallbackListener cbl2 = new CallbackListener(cache2, "age");
|
416 |
| |
417 |
2
| tx = beginTransaction();
|
418 |
2
| cache1.put(NODE1, "age", 38);
|
419 |
| |
420 |
2
| System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
|
421 |
2
| System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
|
422 |
| |
423 |
| |
424 |
2
| tx.registerSynchronization(new TransactionAborter(tx));
|
425 |
| |
426 |
2
| try
|
427 |
| { |
428 |
2
| tx.commit();
|
429 |
0
| fail("commit should throw a RollbackException, we should not get here");
|
430 |
| } |
431 |
| catch (RollbackException rollback) |
432 |
| { |
433 |
2
| rollback.printStackTrace();
|
434 |
2
| System.out.println("Transaction was rolled back, this is correct");
|
435 |
| } |
436 |
| |
437 |
| |
438 |
2
| TestingUtil.sleepThread(1000);
|
439 |
| |
440 |
2
| System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
|
441 |
2
| System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
|
442 |
| |
443 |
2
| assertNull(cbl1.getCallbackException());
|
444 |
2
| assertNull(cbl2.getCallbackException());
|
445 |
| |
446 |
2
| assertEquals(0, cache1.getNumberOfLocksHeld());
|
447 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
448 |
| |
449 |
2
| assertEquals(0, cache1.getNumberOfNodes());
|
450 |
2
| assertEquals(0, cache2.getNumberOfNodes());
|
451 |
| |
452 |
| |
453 |
| |
454 |
2
| cache2.getNotifier().removeCacheListener(cbl2);
|
455 |
| |
456 |
2
| cbl2 = new TransactionAborterCallbackListener(cache2, "age");
|
457 |
| |
458 |
2
| tx = beginTransaction();
|
459 |
2
| cache1.put(NODE1, "age", 38);
|
460 |
| |
461 |
2
| System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
|
462 |
2
| System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
|
463 |
| |
464 |
2
| tx.commit();
|
465 |
| |
466 |
| |
467 |
2
| TestingUtil.sleepThread(1000);
|
468 |
| |
469 |
2
| System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
|
470 |
2
| System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
|
471 |
| |
472 |
2
| assertNull(cbl1.getCallbackException());
|
473 |
2
| assertNull(cbl2.getCallbackException());
|
474 |
| |
475 |
2
| assertEquals(0, cache1.getNumberOfLocksHeld());
|
476 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
477 |
| |
478 |
| |
479 |
2
| assertEquals(3, cache1.getNumberOfNodes());
|
480 |
2
| assertEquals(0, cache2.getNumberOfNodes());
|
481 |
| |
482 |
| } |
483 |
| |
484 |
| |
485 |
| |
486 |
| |
487 |
| |
488 |
| |
489 |
| |
490 |
| |
491 |
2
| public void testSyncReplWithRemoteRollback() throws Exception
|
492 |
| { |
493 |
2
| Transaction tx;
|
494 |
2
| final Fqn NODE1 = Fqn.fromString("/one/two/three");
|
495 |
| |
496 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
497 |
| |
498 |
2
| cache1.getConfiguration().setSyncRollbackPhase(true);
|
499 |
2
| cache2.getConfiguration().setSyncRollbackPhase(true);
|
500 |
| |
501 |
| |
502 |
| |
503 |
| |
504 |
2
| TransactionAborterListener tal = new TransactionAborterListener(cache2);
|
505 |
| |
506 |
2
| tx = beginTransaction();
|
507 |
2
| cache1.put(NODE1, "age", 38);
|
508 |
| |
509 |
2
| System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
|
510 |
2
| System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
|
511 |
| |
512 |
2
| try
|
513 |
| { |
514 |
2
| tx.commit();
|
515 |
0
| fail("commit should throw a RollbackException, we should not get here");
|
516 |
| } |
517 |
| catch (RollbackException rollback) |
518 |
| { |
519 |
2
| System.out.println("Transaction was rolled back, this is correct");
|
520 |
| } |
521 |
| |
522 |
| |
523 |
2
| TestingUtil.sleepThread(1000);
|
524 |
| |
525 |
2
| System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
|
526 |
2
| System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
|
527 |
| |
528 |
2
| assertNull(tal.getCallbackException());
|
529 |
| |
530 |
2
| assertEquals(0, cache1.getNumberOfLocksHeld());
|
531 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
532 |
| |
533 |
2
| assertEquals(0, cache1.getNumberOfNodes());
|
534 |
2
| assertEquals(0, cache2.getNumberOfNodes());
|
535 |
| |
536 |
| } |
537 |
| |
538 |
| |
539 |
2
| public void testASyncRepl() throws Exception
|
540 |
| { |
541 |
2
| Integer age;
|
542 |
2
| Transaction tx;
|
543 |
| |
544 |
2
| initCaches(Configuration.CacheMode.REPL_ASYNC);
|
545 |
| |
546 |
2
| tx = beginTransaction();
|
547 |
2
| cache1.put("/a/b/c", "age", 38);
|
548 |
2
| Thread.sleep(1000);
|
549 |
2
| assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
|
550 |
2
| tx.commit();
|
551 |
2
| Thread.sleep(1000);
|
552 |
| |
553 |
| |
554 |
2
| age = (Integer) cache2.get("/a/b/c", "age");
|
555 |
2
| assertNotNull("\"age\" obtained from cache2 is null ", age);
|
556 |
2
| assertTrue("\"age\" must be 38", age == 38);
|
557 |
| |
558 |
| } |
559 |
| |
560 |
| |
561 |
| |
562 |
| |
563 |
| |
564 |
| |
565 |
| |
566 |
| |
567 |
| |
568 |
| |
569 |
| |
570 |
| |
571 |
| |
572 |
| |
573 |
| |
574 |
| |
575 |
| |
576 |
| |
577 |
| |
578 |
| |
579 |
| |
580 |
| |
581 |
| |
582 |
| |
583 |
| |
584 |
| |
585 |
| |
586 |
| |
587 |
| |
588 |
| |
589 |
| |
590 |
| |
591 |
| |
592 |
| |
593 |
| |
594 |
| |
595 |
| |
596 |
2
| public void testConcurrentPuts() throws Exception
|
597 |
| { |
598 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
599 |
2
| cache1.getConfiguration().setSyncCommitPhase(true);
|
600 |
| |
601 |
2
| Thread t1 = new Thread("Thread1")
|
602 |
| { |
603 |
| Transaction tx; |
604 |
| |
605 |
2
| public void run()
|
606 |
| { |
607 |
2
| try
|
608 |
| { |
609 |
2
| tx = beginTransaction();
|
610 |
2
| cache1.put("/bela/ban", "name", "Bela Ban");
|
611 |
2
| TestingUtil.sleepThread(2000);
|
612 |
2
| tx.commit();
|
613 |
2
| System.out.println("[Thread1] ** LOCK INFO cache1: " + cache1.printLockInfo());
|
614 |
2
| System.out.println("[Thread1] ** LOCK INFO cache2: " + cache2.printLockInfo());
|
615 |
| } |
616 |
| catch (Throwable ex) |
617 |
| { |
618 |
0
| ex.printStackTrace();
|
619 |
0
| t1_ex = ex;
|
620 |
| } |
621 |
| } |
622 |
| }; |
623 |
| |
624 |
2
| Thread t2 = new Thread("Thread2")
|
625 |
| { |
626 |
| Transaction tx; |
627 |
| |
628 |
2
| public void run()
|
629 |
| { |
630 |
2
| try
|
631 |
| { |
632 |
2
| TestingUtil.sleepThread(1000);
|
633 |
2
| tx = beginTransaction();
|
634 |
2
| System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
|
635 |
2
| System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
|
636 |
2
| cache1.put("/bela/ban", "name", "Michelle Ban");
|
637 |
2
| System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
|
638 |
2
| System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
|
639 |
2
| tx.commit();
|
640 |
2
| System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
|
641 |
2
| System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
|
642 |
| } |
643 |
| catch (Throwable ex) |
644 |
| { |
645 |
0
| ex.printStackTrace();
|
646 |
0
| t2_ex = ex;
|
647 |
| } |
648 |
| } |
649 |
| }; |
650 |
| |
651 |
| |
652 |
2
| t1.start();
|
653 |
2
| t2.start();
|
654 |
| |
655 |
| |
656 |
2
| t1.join();
|
657 |
2
| t2.join();
|
658 |
| |
659 |
2
| if (t1_ex != null)
|
660 |
| { |
661 |
0
| fail("Thread1 failed: " + t1_ex);
|
662 |
| } |
663 |
2
| if (t2_ex != null)
|
664 |
| { |
665 |
0
| fail("Thread2 failed: " + t2_ex);
|
666 |
| } |
667 |
| |
668 |
2
| assertEquals("Michelle Ban", cache1.get("/bela/ban", "name"));
|
669 |
| } |
670 |
| |
671 |
| |
672 |
| |
673 |
| |
674 |
| |
675 |
2
| public void testConcurrentCommitsWith1Thread() throws Exception
|
676 |
| { |
677 |
2
| _testConcurrentCommits(1);
|
678 |
| } |
679 |
| |
680 |
| |
681 |
| |
682 |
| |
683 |
2
| public void testConcurrentCommitsWith5Threads() throws Exception
|
684 |
| { |
685 |
2
| _testConcurrentCommits(5);
|
686 |
| } |
687 |
| |
688 |
| |
689 |
| |
690 |
| |
691 |
4
| private void _testConcurrentCommits(int num_threads) throws Exception
|
692 |
| { |
693 |
4
| Object myMutex = new Object();
|
694 |
| |
695 |
4
| final CacheImpl c1 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
|
696 |
4
| final CacheImpl c2 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
|
697 |
4
| c1.getConfiguration().setClusterName("TempCluster");
|
698 |
4
| c2.getConfiguration().setClusterName("TempCluster");
|
699 |
4
| c1.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
|
700 |
4
| c2.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
|
701 |
4
| c1.getConfiguration().setSyncCommitPhase(true);
|
702 |
4
| c2.getConfiguration().setSyncCommitPhase(true);
|
703 |
4
| c1.getConfiguration().setSyncRollbackPhase(true);
|
704 |
4
| c2.getConfiguration().setSyncRollbackPhase(true);
|
705 |
4
| c1.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
|
706 |
4
| c2.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
|
707 |
4
| c1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
|
708 |
4
| c2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
|
709 |
4
| c1.getConfiguration().setLockAcquisitionTimeout(5000);
|
710 |
4
| c2.getConfiguration().setLockAcquisitionTimeout(5000);
|
711 |
4
| c1.start();
|
712 |
4
| c2.start();
|
713 |
4
| final List<Exception> exceptions = new ArrayList<Exception>();
|
714 |
| |
715 |
| class MyThread extends Thread |
716 |
| { |
717 |
| Object mutex; |
718 |
| |
719 |
12
| public MyThread(String name, Object mutex)
|
720 |
| { |
721 |
12
| super(name);
|
722 |
12
| this.mutex = mutex;
|
723 |
| } |
724 |
| |
725 |
12
| public void run()
|
726 |
| { |
727 |
12
| Transaction tx = null;
|
728 |
| |
729 |
12
| try
|
730 |
| { |
731 |
12
| tx = beginTransaction();
|
732 |
12
| c1.put("/thread/" + getName(), null);
|
733 |
12
| System.out.println("Thread " + getName() + " after put(): " + c1.toString());
|
734 |
12
| System.out.println("Thread " + getName() + " waiting on mutex");
|
735 |
12
| synchronized (mutex)
|
736 |
| { |
737 |
12
| mutex.wait();
|
738 |
| } |
739 |
12
| System.out.println("Thread " + getName() + " committing");
|
740 |
12
| tx.commit();
|
741 |
12
| System.out.println("Thread " + getName() + " committed successfully");
|
742 |
| } |
743 |
| catch (Exception e) |
744 |
| { |
745 |
0
| exceptions.add(e);
|
746 |
| } |
747 |
| finally |
748 |
| { |
749 |
12
| try
|
750 |
| { |
751 |
12
| if (tx != null) tx.rollback();
|
752 |
| } |
753 |
| catch (Exception e) |
754 |
| { |
755 |
| } |
756 |
| } |
757 |
| } |
758 |
| } |
759 |
| |
760 |
4
| MyThread[] threads = new MyThread[num_threads];
|
761 |
4
| for (int i = 0; i < threads.length; i++)
|
762 |
| { |
763 |
12
| threads[i] = new MyThread("#" + i, myMutex);
|
764 |
| } |
765 |
4
| for (int i = 0; i < threads.length; i++)
|
766 |
| { |
767 |
12
| MyThread thread = threads[i];
|
768 |
12
| System.out.println("starting thread #" + i);
|
769 |
12
| thread.start();
|
770 |
| } |
771 |
| |
772 |
4
| TestingUtil.sleepThread(6000);
|
773 |
4
| synchronized (myMutex)
|
774 |
| { |
775 |
4
| System.out.println("cache is " + c1.printLockInfo());
|
776 |
4
| System.out.println("******************* SIGNALLING THREADS ********************");
|
777 |
4
| myMutex.notifyAll();
|
778 |
| } |
779 |
| |
780 |
4
| for (MyThread thread : threads)
|
781 |
| { |
782 |
12
| try
|
783 |
| { |
784 |
12
| thread.join();
|
785 |
12
| System.out.println("Joined thread " + thread.getName());
|
786 |
| } |
787 |
| catch (InterruptedException e) |
788 |
| { |
789 |
0
| e.printStackTrace();
|
790 |
| } |
791 |
| } |
792 |
| |
793 |
4
| System.out.println("FINAL c1:\n" + c1.printDetails() + "\nlocks:\n" + c1.printLockInfo());
|
794 |
| |
795 |
4
| assertEquals(0, c1.getNumberOfLocksHeld());
|
796 |
4
| assertEquals(0, c2.getNumberOfLocksHeld());
|
797 |
| |
798 |
4
| c1.stop();
|
799 |
4
| c2.stop();
|
800 |
| |
801 |
| |
802 |
| |
803 |
| |
804 |
| |
805 |
| |
806 |
| |
807 |
| |
808 |
| |
809 |
| |
810 |
| |
811 |
0
| for (Exception exception : exceptions) assertEquals(TimeoutException.class, exception.getClass());
|
812 |
| } |
813 |
| |
814 |
| |
815 |
| |
816 |
| |
817 |
| |
818 |
2
| public void testConcurrentPutsOnTwoInstances() throws Exception
|
819 |
| { |
820 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
821 |
2
| final CacheImpl c1 = this.cache1;
|
822 |
2
| final CacheImpl c2 = this.cache2;
|
823 |
| |
824 |
2
| Thread t1 = new Thread()
|
825 |
| { |
826 |
| Transaction tx; |
827 |
| |
828 |
2
| public void run()
|
829 |
| { |
830 |
2
| try
|
831 |
| { |
832 |
2
| tx = beginTransaction();
|
833 |
2
| c1.put("/ben/wang", "name", "Ben Wang");
|
834 |
2
| TestingUtil.sleepThread(8000);
|
835 |
2
| tx.commit();
|
836 |
| } |
837 |
| catch (Throwable ex) |
838 |
| { |
839 |
0
| ex.printStackTrace();
|
840 |
0
| t1_ex = ex;
|
841 |
| } |
842 |
| } |
843 |
| }; |
844 |
| |
845 |
2
| Thread t2 = new Thread()
|
846 |
| { |
847 |
| Transaction tx; |
848 |
| |
849 |
2
| public void run()
|
850 |
| { |
851 |
2
| try
|
852 |
| { |
853 |
2
| TestingUtil.sleepThread(1000);
|
854 |
2
| tx = beginTransaction();
|
855 |
2
| c2.put("/ben/wang", "name", "Ben Jr.");
|
856 |
2
| tx.commit();
|
857 |
| } |
858 |
| catch (RollbackException rollback_ex) |
859 |
| { |
860 |
2
| System.out.println("received rollback exception as expected");
|
861 |
| } |
862 |
| catch (Throwable ex) |
863 |
| { |
864 |
0
| ex.printStackTrace();
|
865 |
0
| t2_ex = ex;
|
866 |
| } |
867 |
| } |
868 |
| }; |
869 |
| |
870 |
| |
871 |
2
| t1.start();
|
872 |
2
| t2.start();
|
873 |
| |
874 |
| |
875 |
2
| t1.join();
|
876 |
2
| t2.join();
|
877 |
| |
878 |
2
| if (t1_ex != null)
|
879 |
| { |
880 |
0
| fail("Thread1 failed: " + t1_ex);
|
881 |
| } |
882 |
2
| if (t2_ex != null)
|
883 |
| { |
884 |
0
| fail("Thread2 failed: " + t2_ex);
|
885 |
| } |
886 |
2
| assertEquals("Ben Wang", c1.get("/ben/wang", "name"));
|
887 |
| } |
888 |
| |
889 |
| |
890 |
2
| public void testPut() throws Exception
|
891 |
| { |
892 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
893 |
2
| final CacheImpl c1 = this.cache1;
|
894 |
| |
895 |
| |
896 |
2
| Thread t1 = new Thread()
|
897 |
| { |
898 |
2
| public void run()
|
899 |
| { |
900 |
2
| try
|
901 |
| { |
902 |
2
| lock.acquire();
|
903 |
2
| System.out.println("-- t1 has lock");
|
904 |
2
| c1.put("/a/b/c", "age", 38);
|
905 |
2
| System.out.println("[Thread1] set value to 38");
|
906 |
| |
907 |
2
| System.out.println("-- t1 releases lock");
|
908 |
2
| lock.release();
|
909 |
2
| TestingUtil.sleepThread(300);
|
910 |
2
| Thread.yield();
|
911 |
| |
912 |
2
| lock.acquire();
|
913 |
2
| System.out.println("-- t1 has lock");
|
914 |
2
| c1.put("/a/b/c", "age", 39);
|
915 |
2
| System.out.println("[Thread1] set value to 39");
|
916 |
| |
917 |
2
| System.out.println("-- t1 releases lock");
|
918 |
2
| lock.release();
|
919 |
2
| assertEquals(39, c1.get("/a/b/c", "age"));
|
920 |
| } |
921 |
| catch (Throwable ex) |
922 |
| { |
923 |
0
| ex.printStackTrace();
|
924 |
0
| t1_ex = ex;
|
925 |
| } |
926 |
| finally |
927 |
| { |
928 |
2
| lock.release();
|
929 |
| } |
930 |
| } |
931 |
| }; |
932 |
| |
933 |
2
| Thread t2 = new Thread()
|
934 |
| { |
935 |
2
| public void run()
|
936 |
| { |
937 |
2
| try
|
938 |
| { |
939 |
2
| TestingUtil.sleepThread(100);
|
940 |
2
| Thread.yield();
|
941 |
2
| lock.acquire();
|
942 |
2
| System.out.println("-- t2 has lock");
|
943 |
| |
944 |
2
| Integer val = (Integer) cache2.get("/a/b/c", "age");
|
945 |
2
| System.out.println("[Thread2] value is " + val);
|
946 |
2
| assertEquals(new Integer(38), val);
|
947 |
2
| System.out.println("-- t2 releases lock");
|
948 |
2
| lock.release();
|
949 |
2
| TestingUtil.sleepThread(300);
|
950 |
2
| Thread.yield();
|
951 |
2
| TestingUtil.sleepThread(500);
|
952 |
2
| lock.acquire();
|
953 |
2
| System.out.println("-- t2 has lock");
|
954 |
2
| val = (Integer) cache2.get("/a/b/c", "age");
|
955 |
2
| System.out.println("-- t2 releases lock");
|
956 |
2
| lock.release();
|
957 |
2
| assertEquals(new Integer(39), val);
|
958 |
| } |
959 |
| catch (Throwable ex) |
960 |
| { |
961 |
0
| ex.printStackTrace();
|
962 |
0
| t2_ex = ex;
|
963 |
| } |
964 |
| finally |
965 |
| { |
966 |
2
| lock.release();
|
967 |
| } |
968 |
| } |
969 |
| }; |
970 |
| |
971 |
| |
972 |
2
| t1.start();
|
973 |
2
| t2.start();
|
974 |
| |
975 |
| |
976 |
2
| t1.join();
|
977 |
2
| t2.join();
|
978 |
2
| if (t1_ex != null)
|
979 |
| { |
980 |
0
| fail("Thread1 failed: " + t1_ex);
|
981 |
| } |
982 |
2
| if (t2_ex != null)
|
983 |
| { |
984 |
0
| fail("Thread2 failed: " + t2_ex);
|
985 |
| } |
986 |
| } |
987 |
| |
988 |
| |
989 |
| |
990 |
| |
991 |
| |
992 |
| |
993 |
| |
994 |
| |
995 |
| |
996 |
| |
997 |
2
| public void testPutTx() throws Exception
|
998 |
| { |
999 |
2
| Transaction tx = null;
|
1000 |
| |
1001 |
2
| try
|
1002 |
| { |
1003 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
1004 |
2
| cache1.getConfiguration().setSyncCommitPhase(true);
|
1005 |
2
| cache2.getConfiguration().setSyncCommitPhase(true);
|
1006 |
2
| tx = beginTransaction();
|
1007 |
2
| cache1.put("/a/b/c", "age", 38);
|
1008 |
2
| cache1.put("/a/b/c", "age", 39);
|
1009 |
2
| Object val = cache2.get("/a/b/c", "age");
|
1010 |
2
| assertNull(val);
|
1011 |
2
| tx.commit();
|
1012 |
| |
1013 |
0
| tx = beginTransaction();
|
1014 |
0
| assertEquals(39, cache2.get("/a/b/c", "age"));
|
1015 |
0
| tx.commit();
|
1016 |
| } |
1017 |
| catch (Throwable t) |
1018 |
| { |
1019 |
2
| t.printStackTrace();
|
1020 |
2
| t1_ex = t;
|
1021 |
| } |
1022 |
| finally |
1023 |
| { |
1024 |
2
| lock.release();
|
1025 |
| } |
1026 |
| } |
1027 |
| |
1028 |
| |
1029 |
| |
1030 |
| |
1031 |
| |
1032 |
| |
1033 |
| |
1034 |
| |
1035 |
2
| public void testPutTx1() throws Exception
|
1036 |
| { |
1037 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
1038 |
2
| final CacheImpl c1 = this.cache1;
|
1039 |
2
| Thread t1 = new Thread()
|
1040 |
| { |
1041 |
2
| public void run()
|
1042 |
| { |
1043 |
2
| Transaction tx = null;
|
1044 |
| |
1045 |
2
| try
|
1046 |
| { |
1047 |
2
| lock.acquire();
|
1048 |
2
| tx = beginTransaction();
|
1049 |
2
| c1.put("/a/b/c", "age", 38);
|
1050 |
2
| c1.put("/a/b/c", "age", 39);
|
1051 |
2
| lock.release();
|
1052 |
| |
1053 |
2
| TestingUtil.sleepThread(300);
|
1054 |
2
| lock.acquire();
|
1055 |
2
| try
|
1056 |
| { |
1057 |
2
| tx.commit();
|
1058 |
| } |
1059 |
| catch (RollbackException ex) |
1060 |
| { |
1061 |
2
| System.out.println("[Thread1] received RollbackException, as expected. Rolling back changes");
|
1062 |
| } |
1063 |
| finally |
1064 |
| { |
1065 |
2
| lock.release();
|
1066 |
| } |
1067 |
| } |
1068 |
| catch (Throwable ex) |
1069 |
| { |
1070 |
0
| ex.printStackTrace();
|
1071 |
0
| t1_ex = ex;
|
1072 |
| } |
1073 |
| finally |
1074 |
| { |
1075 |
2
| lock.release();
|
1076 |
| } |
1077 |
| } |
1078 |
| }; |
1079 |
| |
1080 |
2
| Thread t2 = new Thread()
|
1081 |
| { |
1082 |
2
| public void run()
|
1083 |
| { |
1084 |
2
| Transaction tx = null;
|
1085 |
| |
1086 |
2
| try
|
1087 |
| { |
1088 |
2
| sleep(200);
|
1089 |
2
| Thread.yield();
|
1090 |
2
| lock.acquire();
|
1091 |
2
| tx = beginTransaction();
|
1092 |
2
| assertNull(cache2.get("/a/b/c", "age"));
|
1093 |
2
| cache2.put("/a/b/c", "age", 40);
|
1094 |
2
| lock.release();
|
1095 |
| |
1096 |
2
| TestingUtil.sleepThread(300);
|
1097 |
2
| lock.acquire();
|
1098 |
2
| assertEquals(40, cache2.get("/a/b/c", "age"));
|
1099 |
2
| tx.commit();
|
1100 |
2
| lock.release();
|
1101 |
| |
1102 |
2
| TestingUtil.sleepThread(1000);
|
1103 |
2
| tx = beginTransaction();
|
1104 |
2
| assertEquals("After cache2 commit", 40, cache2.get("/a/b/c", "age"));
|
1105 |
2
| tx.commit();
|
1106 |
| } |
1107 |
| catch (Throwable ex) |
1108 |
| { |
1109 |
0
| ex.printStackTrace();
|
1110 |
0
| t2_ex = ex;
|
1111 |
| } |
1112 |
| finally |
1113 |
| { |
1114 |
2
| lock.release();
|
1115 |
| } |
1116 |
| } |
1117 |
| }; |
1118 |
| |
1119 |
| |
1120 |
2
| t1.start();
|
1121 |
2
| t2.start();
|
1122 |
| |
1123 |
2
| t1.join();
|
1124 |
2
| t2.join();
|
1125 |
| |
1126 |
2
| if (t1_ex != null)
|
1127 |
| { |
1128 |
0
| fail("Thread1 failed: " + t1_ex);
|
1129 |
| } |
1130 |
2
| if (t2_ex != null)
|
1131 |
| { |
1132 |
0
| fail("Thread2 failed: " + t2_ex);
|
1133 |
| } |
1134 |
| } |
1135 |
| |
1136 |
| |
1137 |
2
| public void testPutTxWithRollback() throws Exception
|
1138 |
| { |
1139 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
1140 |
2
| final CacheImpl c2 = this.cache1;
|
1141 |
2
| Thread t1 = new Thread()
|
1142 |
| { |
1143 |
2
| public void run()
|
1144 |
| { |
1145 |
2
| Transaction tx = null;
|
1146 |
| |
1147 |
2
| try
|
1148 |
| { |
1149 |
2
| lock.acquire();
|
1150 |
2
| tx = beginTransaction();
|
1151 |
2
| c2.put("/a/b/c", "age", 38);
|
1152 |
2
| c2.put("/a/b/c", "age", 39);
|
1153 |
2
| lock.release();
|
1154 |
| |
1155 |
2
| TestingUtil.sleepThread(100);
|
1156 |
2
| lock.acquire();
|
1157 |
2
| tx.rollback();
|
1158 |
2
| lock.release();
|
1159 |
| } |
1160 |
| catch (Throwable ex) |
1161 |
| { |
1162 |
0
| ex.printStackTrace();
|
1163 |
0
| t1_ex = ex;
|
1164 |
| } |
1165 |
| finally |
1166 |
| { |
1167 |
2
| lock.release();
|
1168 |
| } |
1169 |
| } |
1170 |
| }; |
1171 |
| |
1172 |
2
| Thread t2 = new Thread()
|
1173 |
| { |
1174 |
2
| public void run()
|
1175 |
| { |
1176 |
2
| Transaction tx = null;
|
1177 |
| |
1178 |
2
| try
|
1179 |
| { |
1180 |
2
| sleep(200);
|
1181 |
2
| Thread.yield();
|
1182 |
2
| lock.acquire();
|
1183 |
2
| tx = beginTransaction();
|
1184 |
2
| assertNull(cache2.get("/a/b/c", "age"));
|
1185 |
2
| lock.release();
|
1186 |
| |
1187 |
2
| TestingUtil.sleepThread(100);
|
1188 |
2
| lock.acquire();
|
1189 |
2
| assertNull(cache2.get("/a/b/c", "age"));
|
1190 |
2
| tx.commit();
|
1191 |
2
| lock.release();
|
1192 |
| } |
1193 |
| catch (Throwable ex) |
1194 |
| { |
1195 |
0
| ex.printStackTrace();
|
1196 |
0
| t2_ex = ex;
|
1197 |
| } |
1198 |
| finally |
1199 |
| { |
1200 |
2
| lock.release();
|
1201 |
| } |
1202 |
| } |
1203 |
| }; |
1204 |
| |
1205 |
| |
1206 |
2
| t1.start();
|
1207 |
2
| t2.start();
|
1208 |
| |
1209 |
| |
1210 |
2
| t1.join();
|
1211 |
2
| t2.join();
|
1212 |
2
| if (t1_ex != null)
|
1213 |
| { |
1214 |
0
| fail("Thread1 failed: " + t1_ex);
|
1215 |
| } |
1216 |
2
| if (t2_ex != null)
|
1217 |
| { |
1218 |
0
| fail("Thread2 failed: " + t2_ex);
|
1219 |
| } |
1220 |
| } |
1221 |
| |
1222 |
| |
1223 |
| static class TransactionAborter implements Synchronization |
1224 |
| { |
1225 |
| Transaction ltx = null; |
1226 |
| |
1227 |
6
| public TransactionAborter(Transaction ltx)
|
1228 |
| { |
1229 |
6
| this.ltx = ltx;
|
1230 |
| } |
1231 |
| |
1232 |
6
| public void beforeCompletion()
|
1233 |
| { |
1234 |
6
| try
|
1235 |
| { |
1236 |
6
| ltx.setRollbackOnly();
|
1237 |
| } |
1238 |
| catch (SystemException e) |
1239 |
| { |
1240 |
| |
1241 |
| } |
1242 |
| } |
1243 |
| |
1244 |
6
| public void afterCompletion(int status)
|
1245 |
| { |
1246 |
| } |
1247 |
| } |
1248 |
| |
1249 |
| static class CallbackListener extends AbstractCacheListener |
1250 |
| { |
1251 |
| |
1252 |
| CacheImpl callbackCache; |
1253 |
| Object callbackKey; |
1254 |
| Exception ex; |
1255 |
| Object mutex = new Object(); |
1256 |
| |
1257 |
6
| CallbackListener(CacheImpl cache, Object callbackKey)
|
1258 |
| { |
1259 |
6
| this.callbackCache = cache;
|
1260 |
6
| this.callbackKey = callbackKey;
|
1261 |
6
| cache.getNotifier().addCacheListener(this);
|
1262 |
| } |
1263 |
| |
1264 |
22
| public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map data)
|
1265 |
| { |
1266 |
22
| if (!pre)
|
1267 |
| { |
1268 |
| |
1269 |
| |
1270 |
12
| synchronized (mutex)
|
1271 |
| { |
1272 |
12
| try
|
1273 |
| { |
1274 |
12
| callbackCache.get(fqn, callbackKey);
|
1275 |
| } |
1276 |
| catch (CacheException e) |
1277 |
| { |
1278 |
0
| e.printStackTrace();
|
1279 |
0
| ex = e;
|
1280 |
| } |
1281 |
| } |
1282 |
| } |
1283 |
| } |
1284 |
| |
1285 |
8
| Exception getCallbackException()
|
1286 |
| { |
1287 |
8
| synchronized (mutex)
|
1288 |
| { |
1289 |
8
| return ex;
|
1290 |
| } |
1291 |
| } |
1292 |
| |
1293 |
| } |
1294 |
| |
1295 |
| static class TransactionAborterCallbackListener extends CallbackListener |
1296 |
| { |
1297 |
| |
1298 |
| TransactionManager callbackTM; |
1299 |
| |
1300 |
2
| TransactionAborterCallbackListener(CacheImpl cache, Object callbackKey)
|
1301 |
| { |
1302 |
2
| super(cache, callbackKey);
|
1303 |
2
| callbackTM = callbackCache.getTransactionManager();
|
1304 |
| } |
1305 |
| |
1306 |
8
| public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map data)
|
1307 |
| { |
1308 |
8
| if (!pre)
|
1309 |
| { |
1310 |
4
| try
|
1311 |
| { |
1312 |
4
| Transaction tx = callbackTM.getTransaction();
|
1313 |
4
| if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE)
|
1314 |
| { |
1315 |
| |
1316 |
2
| tx.registerSynchronization(new TransactionAborter(tx));
|
1317 |
| } |
1318 |
| else |
1319 |
| { |
1320 |
2
| super.nodeModified(fqn, pre, isLocal, modType, data);
|
1321 |
| } |
1322 |
| |
1323 |
| } |
1324 |
| catch (Exception e) |
1325 |
| { |
1326 |
0
| e.printStackTrace();
|
1327 |
0
| if (ex == null)
|
1328 |
| { |
1329 |
0
| ex = e;
|
1330 |
| } |
1331 |
| } |
1332 |
| } |
1333 |
| } |
1334 |
| |
1335 |
| } |
1336 |
| |
1337 |
| static class TransactionAborterListener extends AbstractCacheListener |
1338 |
| { |
1339 |
| |
1340 |
| TransactionManager callbackTM; |
1341 |
| Object mutex = new Object(); |
1342 |
| Exception ex; |
1343 |
| |
1344 |
2
| TransactionAborterListener(CacheImpl cache)
|
1345 |
| { |
1346 |
2
| callbackTM = cache.getTransactionManager();
|
1347 |
2
| cache.getNotifier().addCacheListener(this);
|
1348 |
| } |
1349 |
| |
1350 |
8
| public void nodeModified(Fqn fqn, boolean pre, boolean isLocal, ModificationType modType, Map data)
|
1351 |
| { |
1352 |
8
| if (!pre)
|
1353 |
| { |
1354 |
4
| synchronized (mutex)
|
1355 |
| { |
1356 |
4
| try
|
1357 |
| { |
1358 |
4
| Transaction tx = callbackTM.getTransaction();
|
1359 |
4
| if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE)
|
1360 |
| { |
1361 |
| |
1362 |
2
| tx.setRollbackOnly();
|
1363 |
| } |
1364 |
| } |
1365 |
| catch (Exception e) |
1366 |
| { |
1367 |
0
| e.printStackTrace();
|
1368 |
0
| if (ex == null)
|
1369 |
| { |
1370 |
0
| ex = e;
|
1371 |
| } |
1372 |
| } |
1373 |
| } |
1374 |
| } |
1375 |
| } |
1376 |
| |
1377 |
2
| Exception getCallbackException()
|
1378 |
| { |
1379 |
2
| synchronized (mutex)
|
1380 |
| { |
1381 |
2
| return ex;
|
1382 |
| } |
1383 |
| } |
1384 |
| |
1385 |
| } |
1386 |
| |
1387 |
2
| public static Test suite()
|
1388 |
| { |
1389 |
| |
1390 |
2
| return new TestSuite(SyncReplTxTest.class);
|
1391 |
| } |
1392 |
| |
1393 |
| |
1394 |
| } |