1 |
| |
2 |
| |
3 |
| |
4 |
| |
5 |
| |
6 |
| |
7 |
| |
8 |
| package org.jboss.cache.replicated; |
9 |
| |
10 |
| import junit.framework.Test; |
11 |
| import junit.framework.TestCase; |
12 |
| import junit.framework.TestSuite; |
13 |
| import org.apache.commons.logging.Log; |
14 |
| import org.apache.commons.logging.LogFactory; |
15 |
| import org.jboss.cache.Cache; |
16 |
| import org.jboss.cache.CacheException; |
17 |
| import org.jboss.cache.CacheImpl; |
18 |
| import org.jboss.cache.DefaultCacheFactory; |
19 |
| import org.jboss.cache.Fqn; |
20 |
| import org.jboss.cache.config.Configuration; |
21 |
| import org.jboss.cache.lock.IsolationLevel; |
22 |
| import org.jboss.cache.lock.TimeoutException; |
23 |
| import org.jboss.cache.misc.TestingUtil; |
24 |
| import org.jboss.cache.notifications.annotation.CacheListener; |
25 |
| import org.jboss.cache.notifications.annotation.NodeModified; |
26 |
| import org.jboss.cache.notifications.event.NodeEvent; |
27 |
| import org.jboss.cache.transaction.DummyTransactionManager; |
28 |
| |
29 |
| import javax.naming.Context; |
30 |
| import javax.transaction.NotSupportedException; |
31 |
| import javax.transaction.RollbackException; |
32 |
| import javax.transaction.Status; |
33 |
| import javax.transaction.Synchronization; |
34 |
| import javax.transaction.SystemException; |
35 |
| import javax.transaction.Transaction; |
36 |
| import javax.transaction.TransactionManager; |
37 |
| import java.util.ArrayList; |
38 |
| import java.util.List; |
39 |
| import java.util.concurrent.Semaphore; |
40 |
| |
41 |
| |
42 |
| |
43 |
| |
44 |
| |
45 |
| |
46 |
| |
47 |
| |
48 |
| public class SyncReplTxTest extends TestCase |
49 |
| { |
50 |
| private static Log log = LogFactory.getLog(SyncReplTxTest.class); |
51 |
| private CacheImpl cache1; |
52 |
| private CacheImpl cache2; |
53 |
| |
54 |
| private String old_factory = null; |
55 |
| private final String FACTORY = "org.jboss.cache.transaction.DummyContextFactory"; |
56 |
| private Semaphore lock = new Semaphore(1); |
57 |
| private Throwable t1_ex; |
58 |
| private Throwable t2_ex; |
59 |
| |
60 |
| |
61 |
34
/**
 * Creates the test case.
 *
 * @param name test name, forwarded unchanged to the TestCase constructor
 */
public SyncReplTxTest(String name)
{
   super(name);
}
65 |
| |
66 |
34
| public void setUp() throws Exception
|
67 |
| { |
68 |
34
| super.setUp();
|
69 |
34
| old_factory = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
|
70 |
34
| System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
|
71 |
34
| t1_ex = t2_ex = null;
|
72 |
| } |
73 |
| |
74 |
34
| public void tearDown() throws Exception
|
75 |
| { |
76 |
34
| super.tearDown();
|
77 |
34
| DummyTransactionManager.destroy();
|
78 |
34
| destroyCaches();
|
79 |
34
| if (old_factory != null)
|
80 |
| { |
81 |
32
| System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
|
82 |
32
| old_factory = null;
|
83 |
| } |
84 |
| } |
85 |
| |
86 |
48
| private Transaction beginTransaction() throws SystemException, NotSupportedException
|
87 |
| { |
88 |
48
| DummyTransactionManager mgr = DummyTransactionManager.getInstance();
|
89 |
48
| mgr.begin();
|
90 |
48
| return mgr.getTransaction();
|
91 |
| } |
92 |
| |
93 |
30
| private void initCaches(Configuration.CacheMode caching_mode) throws Exception
|
94 |
| { |
95 |
30
| cache1 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
|
96 |
30
| cache2 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
|
97 |
30
| cache1.getConfiguration().setCacheMode(caching_mode);
|
98 |
30
| cache2.getConfiguration().setCacheMode(caching_mode);
|
99 |
30
| cache1.getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE);
|
100 |
30
| cache2.getConfiguration().setIsolationLevel(IsolationLevel.SERIALIZABLE);
|
101 |
| |
102 |
30
| cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
|
103 |
30
| cache2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
|
104 |
30
| cache1.getConfiguration().setLockAcquisitionTimeout(5000);
|
105 |
30
| cache2.getConfiguration().setLockAcquisitionTimeout(5000);
|
106 |
| |
107 |
30
| configureMultiplexer(cache1);
|
108 |
30
| configureMultiplexer(cache2);
|
109 |
| |
110 |
30
| cache1.start();
|
111 |
30
| cache2.start();
|
112 |
| |
113 |
30
| validateMultiplexer(cache1);
|
114 |
30
| validateMultiplexer(cache2);
|
115 |
| } |
116 |
| |
117 |
| |
118 |
| |
119 |
| |
120 |
| |
121 |
| |
122 |
| |
123 |
| |
124 |
60
| protected void configureMultiplexer(Cache cache) throws Exception
|
125 |
| { |
126 |
| |
127 |
| } |
128 |
| |
129 |
| |
130 |
| |
131 |
| |
132 |
| |
133 |
| |
134 |
| |
135 |
| |
136 |
60
| protected void validateMultiplexer(Cache cache)
|
137 |
| { |
138 |
60
| assertFalse("Cache is not using multiplexer", cache.getConfiguration().isUsingMultiplexer());
|
139 |
| } |
140 |
| |
141 |
34
| private void destroyCaches()
|
142 |
| { |
143 |
34
| if (cache1 != null)
|
144 |
| { |
145 |
30
| cache1.stop();
|
146 |
| } |
147 |
34
| if (cache2 != null)
|
148 |
| { |
149 |
30
| cache2.stop();
|
150 |
| } |
151 |
34
| cache1 = null;
|
152 |
34
| cache2 = null;
|
153 |
| } |
154 |
| |
155 |
2
| public void testLockRemoval() throws Exception
|
156 |
| { |
157 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
158 |
2
| cache1.getConfiguration().setSyncCommitPhase(true);
|
159 |
2
| cache1.releaseAllLocks("/");
|
160 |
2
| Transaction tx = beginTransaction();
|
161 |
2
| cache1.put("/bela/ban", "name", "Bela Ban");
|
162 |
2
| assertEquals(3, cache1.getNumberOfLocksHeld());
|
163 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
164 |
2
| tx.commit();
|
165 |
2
| assertEquals(0, cache1.getNumberOfLocksHeld());
|
166 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
167 |
| } |
168 |
| |
169 |
| |
170 |
2
| public void testSyncRepl() throws Exception
|
171 |
| { |
172 |
2
| Integer age;
|
173 |
2
| Transaction tx;
|
174 |
| |
175 |
2
| try
|
176 |
| { |
177 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
178 |
2
| cache1.getConfiguration().setSyncCommitPhase(true);
|
179 |
2
| cache2.getConfiguration().setSyncCommitPhase(true);
|
180 |
| |
181 |
| |
182 |
| |
183 |
2
| tx = beginTransaction();
|
184 |
2
| cache1.put("/a/b/c", "age", 38);
|
185 |
2
| TransactionManager mgr = cache1.getTransactionManager();
|
186 |
2
| tx = mgr.suspend();
|
187 |
2
| assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
|
188 |
2
| log.debug("cache1: locks held before commit: " + cache1.printLockInfo());
|
189 |
2
| log.debug("cache2: locks held before commit: " + cache2.printLockInfo());
|
190 |
2
| mgr.resume(tx);
|
191 |
2
| tx.commit();
|
192 |
2
| log.debug("cache1: locks held after commit: " + cache1.printLockInfo());
|
193 |
2
| log.debug("cache2: locks held after commit: " + cache2.printLockInfo());
|
194 |
| |
195 |
| |
196 |
2
| age = (Integer) cache2.get("/a/b/c", "age");
|
197 |
2
| assertNotNull("\"age\" obtained from cache2 must be non-null ", age);
|
198 |
2
| assertTrue("\"age\" must be 38", age == 38);
|
199 |
| } |
200 |
| catch (Exception e) |
201 |
| { |
202 |
0
| fail(e.toString());
|
203 |
| } |
204 |
| } |
205 |
| |
206 |
| |
207 |
| |
208 |
| |
209 |
2
| public void testSimplePut() throws Exception
|
210 |
| { |
211 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
212 |
| |
213 |
2
| cache1.put("/JSESSION/localhost/192.168.1.10:32882/Courses/0", "Instructor", "Ben Wang");
|
214 |
| |
215 |
2
| cache1.put("/JSESSION/localhost/192.168.1.10:32882/1", "Number", 10);
|
216 |
| } |
217 |
| |
218 |
| |
219 |
2
| public void testSimpleTxPut() throws Exception
|
220 |
| { |
221 |
2
| Transaction tx;
|
222 |
2
| final Fqn NODE1 = Fqn.fromString("/one/two/three");
|
223 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
224 |
| |
225 |
2
| tx = beginTransaction();
|
226 |
2
| cache1.put(NODE1, "age", 38);
|
227 |
2
| System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
|
228 |
2
| tx.commit();
|
229 |
| |
230 |
| |
231 |
| |
232 |
| |
233 |
| |
234 |
| |
235 |
| |
236 |
| |
237 |
| |
238 |
| |
239 |
| |
240 |
| |
241 |
| |
242 |
| |
243 |
| |
244 |
| |
245 |
| |
246 |
| |
247 |
| |
248 |
| |
249 |
| } |
250 |
| |
251 |
2
| public void testSyncReplWithModficationsOnBothCaches() throws Exception
|
252 |
| { |
253 |
2
| Integer age;
|
254 |
2
| Transaction tx;
|
255 |
2
| final Fqn NODE1 = Fqn.fromString("/one/two/three");
|
256 |
2
| final Fqn NODE2 = Fqn.fromString("/eins/zwei/drei");
|
257 |
| |
258 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
259 |
| |
260 |
| |
261 |
2
| cache1.put("/one/two", null);
|
262 |
2
| cache2.put("/eins/zwei", null);
|
263 |
| |
264 |
2
| cache1.getConfiguration().setSyncCommitPhase(true);
|
265 |
2
| cache2.getConfiguration().setSyncCommitPhase(true);
|
266 |
| |
267 |
2
| tx = beginTransaction();
|
268 |
2
| cache1.put(NODE1, "age", 38);
|
269 |
2
| System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
|
270 |
| |
271 |
2
| cache2.put(NODE2, "age", 39);
|
272 |
2
| System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));
|
273 |
| |
274 |
2
| System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
|
275 |
2
| System.out.println("cache2 before commit:\n" + cache2.printLockInfo());
|
276 |
| |
277 |
2
| try
|
278 |
| { |
279 |
2
| tx.commit();
|
280 |
0
| fail("Should not succeed with SERIALIZABLE semantics");
|
281 |
| } |
282 |
| catch (Exception e) |
283 |
| { |
284 |
| |
285 |
| } |
286 |
| |
287 |
2
| System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
|
288 |
2
| System.out.println("cache2 after commit:\n" + cache2.printLockInfo());
|
289 |
| |
290 |
| |
291 |
| |
292 |
| |
293 |
| |
294 |
| |
295 |
| |
296 |
| |
297 |
| |
298 |
| |
299 |
| |
300 |
| |
301 |
| |
302 |
| |
303 |
| |
304 |
| |
305 |
| |
306 |
| |
307 |
| |
308 |
| |
309 |
| |
310 |
| |
311 |
| |
312 |
| |
313 |
2
| assertEquals(0, cache1.getNumberOfLocksHeld());
|
314 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
315 |
2
| System.out.println("TransactionTable for cache1:\n" + cache1.getTransactionTable().toString(true));
|
316 |
2
| System.out.println("TransactionTable for cache2:\n" + cache2.getTransactionTable().toString(true));
|
317 |
| } |
318 |
| |
319 |
2
| public void testSyncReplWithModficationsOnBothCachesSameData() throws Exception
|
320 |
| { |
321 |
2
| Transaction tx;
|
322 |
2
| final Fqn NODE = Fqn.fromString("/one/two/three");
|
323 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
324 |
2
| tx = beginTransaction();
|
325 |
2
| cache1.put(NODE, "age", 38);
|
326 |
2
| System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
|
327 |
| |
328 |
2
| cache2.put(NODE, "age", 39);
|
329 |
2
| System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));
|
330 |
| |
331 |
2
| System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
|
332 |
2
| System.out.println("cache2 before commit:\n" + cache2.printLockInfo());
|
333 |
| |
334 |
2
| try
|
335 |
| { |
336 |
2
| tx.commit();
|
337 |
0
| fail("commit should throw a RollbackException, we should not get here");
|
338 |
| } |
339 |
| catch (RollbackException rollback) |
340 |
| { |
341 |
2
| System.out.println("Transaction was rolled back, this is correct");
|
342 |
| } |
343 |
| |
344 |
2
| System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
|
345 |
2
| System.out.println("cache2 after commit:\n" + cache2.printLockInfo());
|
346 |
| |
347 |
2
| assertEquals(0, cache1.getNumberOfLocksHeld());
|
348 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
349 |
| |
350 |
2
| assertEquals(0, cache1.getNumberOfNodes());
|
351 |
2
| assertEquals(0, cache2.getNumberOfNodes());
|
352 |
| } |
353 |
| |
354 |
| |
355 |
2
| public void testSyncReplWithModficationsOnBothCachesWithRollback() throws Exception
|
356 |
| { |
357 |
2
| Transaction tx;
|
358 |
2
| final Fqn NODE1 = Fqn.fromString("/one/two/three");
|
359 |
2
| final Fqn NODE2 = Fqn.fromString("/eins/zwei/drei");
|
360 |
| |
361 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
362 |
| |
363 |
2
| cache1.getConfiguration().setSyncRollbackPhase(true);
|
364 |
2
| cache2.getConfiguration().setSyncRollbackPhase(true);
|
365 |
| |
366 |
2
| tx = beginTransaction();
|
367 |
2
| cache1.put(NODE1, "age", 38);
|
368 |
2
| cache2.put(NODE2, "age", 39);
|
369 |
| |
370 |
2
| System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
|
371 |
2
| System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
|
372 |
| |
373 |
| |
374 |
2
| tx.registerSynchronization(new TransactionAborter(tx));
|
375 |
| |
376 |
2
| try
|
377 |
| { |
378 |
2
| tx.commit();
|
379 |
0
| fail("commit should throw a RollbackException, we should not get here");
|
380 |
| } |
381 |
| catch (RollbackException rollback) |
382 |
| { |
383 |
2
| System.out.println("Transaction was rolled back, this is correct");
|
384 |
| } |
385 |
| |
386 |
2
| System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
|
387 |
2
| System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
|
388 |
| |
389 |
2
| assertEquals(0, cache1.getNumberOfLocksHeld());
|
390 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
391 |
| |
392 |
2
| assertEquals(0, cache1.getNumberOfNodes());
|
393 |
2
| assertEquals(0, cache2.getNumberOfNodes());
|
394 |
| } |
395 |
| |
396 |
| |
397 |
| |
398 |
| |
399 |
| |
400 |
| |
401 |
| |
402 |
| |
403 |
| |
404 |
| |
405 |
| |
406 |
| |
407 |
| |
408 |
| |
409 |
| |
410 |
| |
411 |
| |
412 |
| |
413 |
| |
414 |
| |
415 |
| |
416 |
| |
417 |
| |
418 |
| |
419 |
| |
420 |
| |
421 |
| |
422 |
| |
423 |
| |
424 |
| |
425 |
| |
426 |
| |
427 |
| |
428 |
| |
429 |
| |
430 |
| |
431 |
| |
432 |
| |
433 |
| |
434 |
| |
435 |
| |
436 |
| |
437 |
| |
438 |
| |
439 |
| |
440 |
| |
441 |
| |
442 |
| |
443 |
| |
444 |
| |
445 |
| |
446 |
| |
447 |
| |
448 |
| |
449 |
| |
450 |
| |
451 |
| |
452 |
| |
453 |
| |
454 |
| |
455 |
| |
456 |
| |
457 |
| |
458 |
| |
459 |
| |
460 |
| |
461 |
| |
462 |
| |
463 |
| |
464 |
| |
465 |
| |
466 |
| |
467 |
| |
468 |
| |
469 |
| |
470 |
| |
471 |
| |
472 |
| |
473 |
| |
474 |
| |
475 |
| |
476 |
| |
477 |
| |
478 |
| |
479 |
| |
480 |
| |
481 |
| |
482 |
| |
483 |
| |
484 |
| |
485 |
| |
486 |
| |
487 |
| |
488 |
| |
489 |
| |
490 |
| |
491 |
| |
492 |
| |
493 |
| |
494 |
2
| public void testSyncReplWithRemoteRollback() throws Exception
|
495 |
| { |
496 |
2
| Transaction tx;
|
497 |
2
| final Fqn NODE1 = Fqn.fromString("/one/two/three");
|
498 |
| |
499 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
500 |
| |
501 |
2
| cache1.getConfiguration().setSyncRollbackPhase(true);
|
502 |
2
| cache2.getConfiguration().setSyncRollbackPhase(true);
|
503 |
| |
504 |
| |
505 |
| |
506 |
| |
507 |
| |
508 |
| |
509 |
2
| tx = beginTransaction();
|
510 |
2
| cache1.put(NODE1, "age", 38);
|
511 |
| |
512 |
2
| System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
|
513 |
2
| System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
|
514 |
| |
515 |
| |
516 |
2
| tx = cache1.getTransactionManager().suspend();
|
517 |
| |
518 |
2
| cache2.getTransactionManager().begin();
|
519 |
2
| cache2.getRoot().put("x", "y");
|
520 |
2
| Transaction tx2 = cache2.getTransactionManager().suspend();
|
521 |
| |
522 |
2
| System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());
|
523 |
2
| cache1.getTransactionManager().resume(tx);
|
524 |
| |
525 |
2
| try
|
526 |
| { |
527 |
2
| tx.commit();
|
528 |
0
| fail("commit should throw a RollbackException, we should not get here");
|
529 |
| } |
530 |
| catch (RollbackException rollback) |
531 |
| { |
532 |
2
| System.out.println("Transaction was rolled back, this is correct");
|
533 |
| } |
534 |
| finally |
535 |
| { |
536 |
2
| cache2.getTransactionManager().resume(tx2);
|
537 |
2
| tx2.rollback();
|
538 |
| } |
539 |
| |
540 |
| |
541 |
2
| TestingUtil.sleepThread(1000);
|
542 |
| |
543 |
2
| System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
|
544 |
2
| System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());
|
545 |
| |
546 |
| |
547 |
| |
548 |
2
| assertEquals(0, cache1.getNumberOfLocksHeld());
|
549 |
2
| assertEquals(0, cache2.getNumberOfLocksHeld());
|
550 |
| |
551 |
2
| assertEquals(0, cache1.getNumberOfNodes());
|
552 |
2
| assertEquals(0, cache2.getNumberOfNodes());
|
553 |
| |
554 |
| } |
555 |
| |
556 |
| |
557 |
2
| public void testASyncRepl() throws Exception
|
558 |
| { |
559 |
2
| Integer age;
|
560 |
2
| Transaction tx;
|
561 |
| |
562 |
2
| initCaches(Configuration.CacheMode.REPL_ASYNC);
|
563 |
| |
564 |
2
| tx = beginTransaction();
|
565 |
2
| cache1.put("/a/b/c", "age", 38);
|
566 |
2
| Thread.sleep(1000);
|
567 |
2
| assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
|
568 |
2
| tx.commit();
|
569 |
2
| Thread.sleep(1000);
|
570 |
| |
571 |
| |
572 |
2
| age = (Integer) cache2.get("/a/b/c", "age");
|
573 |
2
| assertNotNull("\"age\" obtained from cache2 is null ", age);
|
574 |
2
| assertTrue("\"age\" must be 38", age == 38);
|
575 |
| |
576 |
| } |
577 |
| |
578 |
| |
579 |
| |
580 |
| |
581 |
| |
582 |
| |
583 |
| |
584 |
| |
585 |
| |
586 |
| |
587 |
| |
588 |
| |
589 |
| |
590 |
| |
591 |
| |
592 |
| |
593 |
| |
594 |
| |
595 |
| |
596 |
| |
597 |
| |
598 |
| |
599 |
| |
600 |
| |
601 |
| |
602 |
| |
603 |
| |
604 |
| |
605 |
| |
606 |
| |
607 |
| |
608 |
| |
609 |
| |
610 |
| |
611 |
| |
612 |
| |
613 |
| |
614 |
2
| public void testConcurrentPuts() throws Exception
|
615 |
| { |
616 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
617 |
2
| cache1.getConfiguration().setSyncCommitPhase(true);
|
618 |
| |
619 |
2
| Thread t1 = new Thread("Thread1")
|
620 |
| { |
621 |
| Transaction tx; |
622 |
| |
623 |
2
| public void run()
|
624 |
| { |
625 |
2
| try
|
626 |
| { |
627 |
2
| tx = beginTransaction();
|
628 |
2
| cache1.put("/bela/ban", "name", "Bela Ban");
|
629 |
2
| TestingUtil.sleepThread(2000);
|
630 |
2
| tx.commit();
|
631 |
2
| System.out.println("[Thread1] ** LOCK INFO cache1: " + cache1.printLockInfo());
|
632 |
2
| System.out.println("[Thread1] ** LOCK INFO cache2: " + cache2.printLockInfo());
|
633 |
| } |
634 |
| catch (Throwable ex) |
635 |
| { |
636 |
0
| ex.printStackTrace();
|
637 |
0
| t1_ex = ex;
|
638 |
| } |
639 |
| } |
640 |
| }; |
641 |
| |
642 |
2
| Thread t2 = new Thread("Thread2")
|
643 |
| { |
644 |
| Transaction tx; |
645 |
| |
646 |
2
| public void run()
|
647 |
| { |
648 |
2
| try
|
649 |
| { |
650 |
2
| TestingUtil.sleepThread(1000);
|
651 |
2
| tx = beginTransaction();
|
652 |
2
| System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
|
653 |
2
| System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
|
654 |
2
| cache1.put("/bela/ban", "name", "Michelle Ban");
|
655 |
2
| System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
|
656 |
2
| System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
|
657 |
2
| tx.commit();
|
658 |
2
| System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
|
659 |
2
| System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
|
660 |
| } |
661 |
| catch (Throwable ex) |
662 |
| { |
663 |
0
| ex.printStackTrace();
|
664 |
0
| t2_ex = ex;
|
665 |
| } |
666 |
| } |
667 |
| }; |
668 |
| |
669 |
| |
670 |
2
| t1.start();
|
671 |
2
| t2.start();
|
672 |
| |
673 |
| |
674 |
2
| t1.join();
|
675 |
2
| t2.join();
|
676 |
| |
677 |
2
| if (t1_ex != null)
|
678 |
| { |
679 |
0
| fail("Thread1 failed: " + t1_ex);
|
680 |
| } |
681 |
2
| if (t2_ex != null)
|
682 |
| { |
683 |
0
| fail("Thread2 failed: " + t2_ex);
|
684 |
| } |
685 |
| |
686 |
2
| assertEquals("Michelle Ban", cache1.get("/bela/ban", "name"));
|
687 |
| } |
688 |
| |
689 |
| |
690 |
| |
691 |
| |
692 |
| |
693 |
2
| public void testConcurrentCommitsWith1Thread() throws Exception
|
694 |
| { |
695 |
2
| _testConcurrentCommits(1);
|
696 |
| } |
697 |
| |
698 |
| |
699 |
| |
700 |
| |
701 |
2
| public void testConcurrentCommitsWith5Threads() throws Exception
|
702 |
| { |
703 |
2
| _testConcurrentCommits(5);
|
704 |
| } |
705 |
| |
706 |
| |
707 |
| |
708 |
| |
709 |
4
| private void _testConcurrentCommits(int num_threads) throws Exception
|
710 |
| { |
711 |
4
| Object myMutex = new Object();
|
712 |
| |
713 |
4
| final CacheImpl c1 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
|
714 |
4
| final CacheImpl c2 = (CacheImpl) DefaultCacheFactory.getInstance().createCache(false);
|
715 |
4
| c1.getConfiguration().setClusterName("TempCluster");
|
716 |
4
| c2.getConfiguration().setClusterName("TempCluster");
|
717 |
4
| c1.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
|
718 |
4
| c2.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
|
719 |
4
| c1.getConfiguration().setSyncCommitPhase(true);
|
720 |
4
| c2.getConfiguration().setSyncCommitPhase(true);
|
721 |
4
| c1.getConfiguration().setSyncRollbackPhase(true);
|
722 |
4
| c2.getConfiguration().setSyncRollbackPhase(true);
|
723 |
4
| c1.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
|
724 |
4
| c2.getConfiguration().setIsolationLevel(IsolationLevel.REPEATABLE_READ);
|
725 |
4
| c1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
|
726 |
4
| c2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
|
727 |
4
| c1.getConfiguration().setLockAcquisitionTimeout(5000);
|
728 |
4
| c2.getConfiguration().setLockAcquisitionTimeout(5000);
|
729 |
4
| c1.start();
|
730 |
4
| c2.start();
|
731 |
4
| final List<Exception> exceptions = new ArrayList<Exception>();
|
732 |
| |
733 |
| class MyThread extends Thread |
734 |
| { |
735 |
| Object mutex; |
736 |
| |
737 |
12
| public MyThread(String name, Object mutex)
|
738 |
| { |
739 |
12
| super(name);
|
740 |
12
| this.mutex = mutex;
|
741 |
| } |
742 |
| |
743 |
12
| public void run()
|
744 |
| { |
745 |
12
| Transaction tx = null;
|
746 |
| |
747 |
12
| try
|
748 |
| { |
749 |
12
| tx = beginTransaction();
|
750 |
12
| c1.put("/thread/" + getName(), null);
|
751 |
12
| System.out.println("Thread " + getName() + " after put(): " + c1.toString());
|
752 |
12
| System.out.println("Thread " + getName() + " waiting on mutex");
|
753 |
12
| synchronized (mutex)
|
754 |
| { |
755 |
12
| mutex.wait();
|
756 |
| } |
757 |
12
| System.out.println("Thread " + getName() + " committing");
|
758 |
12
| tx.commit();
|
759 |
12
| System.out.println("Thread " + getName() + " committed successfully");
|
760 |
| } |
761 |
| catch (Exception e) |
762 |
| { |
763 |
0
| exceptions.add(e);
|
764 |
| } |
765 |
| finally |
766 |
| { |
767 |
12
| try
|
768 |
| { |
769 |
12
| if (tx != null) tx.rollback();
|
770 |
| } |
771 |
| catch (Exception e) |
772 |
| { |
773 |
| } |
774 |
| } |
775 |
| } |
776 |
| } |
777 |
| |
778 |
4
| MyThread[] threads = new MyThread[num_threads];
|
779 |
4
| for (int i = 0; i < threads.length; i++)
|
780 |
| { |
781 |
12
| threads[i] = new MyThread("#" + i, myMutex);
|
782 |
| } |
783 |
4
| for (int i = 0; i < threads.length; i++)
|
784 |
| { |
785 |
12
| MyThread thread = threads[i];
|
786 |
12
| System.out.println("starting thread #" + i);
|
787 |
12
| thread.start();
|
788 |
| } |
789 |
| |
790 |
4
| TestingUtil.sleepThread(6000);
|
791 |
4
| synchronized (myMutex)
|
792 |
| { |
793 |
4
| System.out.println("cache is " + c1.printLockInfo());
|
794 |
4
| System.out.println("******************* SIGNALLING THREADS ********************");
|
795 |
4
| myMutex.notifyAll();
|
796 |
| } |
797 |
| |
798 |
4
| for (MyThread thread : threads)
|
799 |
| { |
800 |
12
| try
|
801 |
| { |
802 |
12
| thread.join();
|
803 |
12
| System.out.println("Joined thread " + thread.getName());
|
804 |
| } |
805 |
| catch (InterruptedException e) |
806 |
| { |
807 |
0
| e.printStackTrace();
|
808 |
| } |
809 |
| } |
810 |
| |
811 |
4
| System.out.println("FINAL c1:\n" + c1.printDetails() + "\nlocks:\n" + c1.printLockInfo());
|
812 |
| |
813 |
4
| assertEquals(0, c1.getNumberOfLocksHeld());
|
814 |
4
| assertEquals(0, c2.getNumberOfLocksHeld());
|
815 |
| |
816 |
4
| c1.stop();
|
817 |
4
| c2.stop();
|
818 |
| |
819 |
| |
820 |
| |
821 |
| |
822 |
| |
823 |
| |
824 |
| |
825 |
| |
826 |
| |
827 |
| |
828 |
| |
829 |
0
| for (Exception exception : exceptions) assertEquals(TimeoutException.class, exception.getClass());
|
830 |
| } |
831 |
| |
832 |
| |
833 |
| |
834 |
| |
835 |
| |
836 |
2
| public void testConcurrentPutsOnTwoInstances() throws Exception
|
837 |
| { |
838 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
839 |
2
| final CacheImpl c1 = this.cache1;
|
840 |
2
| final CacheImpl c2 = this.cache2;
|
841 |
| |
842 |
2
| Thread t1 = new Thread()
|
843 |
| { |
844 |
| Transaction tx; |
845 |
| |
846 |
2
| public void run()
|
847 |
| { |
848 |
2
| try
|
849 |
| { |
850 |
2
| tx = beginTransaction();
|
851 |
2
| c1.put("/ben/wang", "name", "Ben Wang");
|
852 |
2
| TestingUtil.sleepThread(8000);
|
853 |
2
| tx.commit();
|
854 |
| } |
855 |
| catch (Throwable ex) |
856 |
| { |
857 |
0
| ex.printStackTrace();
|
858 |
0
| t1_ex = ex;
|
859 |
| } |
860 |
| } |
861 |
| }; |
862 |
| |
863 |
2
| Thread t2 = new Thread()
|
864 |
| { |
865 |
| Transaction tx; |
866 |
| |
867 |
2
| public void run()
|
868 |
| { |
869 |
2
| try
|
870 |
| { |
871 |
2
| TestingUtil.sleepThread(1000);
|
872 |
2
| tx = beginTransaction();
|
873 |
2
| c2.put("/ben/wang", "name", "Ben Jr.");
|
874 |
2
| tx.commit();
|
875 |
| } |
876 |
| catch (RollbackException rollback_ex) |
877 |
| { |
878 |
2
| System.out.println("received rollback exception as expected");
|
879 |
| } |
880 |
| catch (Throwable ex) |
881 |
| { |
882 |
0
| ex.printStackTrace();
|
883 |
0
| t2_ex = ex;
|
884 |
| } |
885 |
| } |
886 |
| }; |
887 |
| |
888 |
| |
889 |
2
| t1.start();
|
890 |
2
| t2.start();
|
891 |
| |
892 |
| |
893 |
2
| t1.join();
|
894 |
2
| t2.join();
|
895 |
| |
896 |
2
| if (t1_ex != null)
|
897 |
| { |
898 |
0
| fail("Thread1 failed: " + t1_ex);
|
899 |
| } |
900 |
2
| if (t2_ex != null)
|
901 |
| { |
902 |
0
| fail("Thread2 failed: " + t2_ex);
|
903 |
| } |
904 |
2
| assertEquals("Ben Wang", c1.get("/ben/wang", "name"));
|
905 |
| } |
906 |
| |
907 |
| |
908 |
2
| public void testPut() throws Exception
|
909 |
| { |
910 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
911 |
2
| final CacheImpl c1 = this.cache1;
|
912 |
| |
913 |
| |
914 |
2
| Thread t1 = new Thread()
|
915 |
| { |
916 |
2
| public void run()
|
917 |
| { |
918 |
2
| try
|
919 |
| { |
920 |
2
| lock.acquire();
|
921 |
2
| System.out.println("-- t1 has lock");
|
922 |
2
| c1.put("/a/b/c", "age", 38);
|
923 |
2
| System.out.println("[Thread1] set value to 38");
|
924 |
| |
925 |
2
| System.out.println("-- t1 releases lock");
|
926 |
2
| lock.release();
|
927 |
2
| TestingUtil.sleepThread(300);
|
928 |
2
| Thread.yield();
|
929 |
| |
930 |
2
| lock.acquire();
|
931 |
2
| System.out.println("-- t1 has lock");
|
932 |
2
| c1.put("/a/b/c", "age", 39);
|
933 |
2
| System.out.println("[Thread1] set value to 39");
|
934 |
| |
935 |
2
| System.out.println("-- t1 releases lock");
|
936 |
2
| lock.release();
|
937 |
2
| assertEquals(39, c1.get("/a/b/c", "age"));
|
938 |
| } |
939 |
| catch (Throwable ex) |
940 |
| { |
941 |
0
| ex.printStackTrace();
|
942 |
0
| t1_ex = ex;
|
943 |
| } |
944 |
| finally |
945 |
| { |
946 |
2
| lock.release();
|
947 |
| } |
948 |
| } |
949 |
| }; |
950 |
| |
951 |
2
| Thread t2 = new Thread()
|
952 |
| { |
953 |
2
| public void run()
|
954 |
| { |
955 |
2
| try
|
956 |
| { |
957 |
2
| TestingUtil.sleepThread(100);
|
958 |
2
| Thread.yield();
|
959 |
2
| lock.acquire();
|
960 |
2
| System.out.println("-- t2 has lock");
|
961 |
| |
962 |
2
| Integer val = (Integer) cache2.get("/a/b/c", "age");
|
963 |
2
| System.out.println("[Thread2] value is " + val);
|
964 |
2
| assertEquals(new Integer(38), val);
|
965 |
2
| System.out.println("-- t2 releases lock");
|
966 |
2
| lock.release();
|
967 |
2
| TestingUtil.sleepThread(300);
|
968 |
2
| Thread.yield();
|
969 |
2
| TestingUtil.sleepThread(500);
|
970 |
2
| lock.acquire();
|
971 |
2
| System.out.println("-- t2 has lock");
|
972 |
2
| val = (Integer) cache2.get("/a/b/c", "age");
|
973 |
2
| System.out.println("-- t2 releases lock");
|
974 |
2
| lock.release();
|
975 |
2
| assertEquals(new Integer(39), val);
|
976 |
| } |
977 |
| catch (Throwable ex) |
978 |
| { |
979 |
0
| ex.printStackTrace();
|
980 |
0
| t2_ex = ex;
|
981 |
| } |
982 |
| finally |
983 |
| { |
984 |
2
| lock.release();
|
985 |
| } |
986 |
| } |
987 |
| }; |
988 |
| |
989 |
| |
990 |
2
| t1.start();
|
991 |
2
| t2.start();
|
992 |
| |
993 |
| |
994 |
2
| t1.join();
|
995 |
2
| t2.join();
|
996 |
2
| if (t1_ex != null)
|
997 |
| { |
998 |
0
| fail("Thread1 failed: " + t1_ex);
|
999 |
| } |
1000 |
2
| if (t2_ex != null)
|
1001 |
| { |
1002 |
0
| fail("Thread2 failed: " + t2_ex);
|
1003 |
| } |
1004 |
| } |
1005 |
| |
1006 |
| |
1007 |
| |
1008 |
| |
1009 |
| |
1010 |
| |
1011 |
| |
1012 |
| |
1013 |
| |
1014 |
| |
1015 |
2
| public void testPutTx() throws Exception
|
1016 |
| { |
1017 |
2
| Transaction tx = null;
|
1018 |
| |
1019 |
2
| try
|
1020 |
| { |
1021 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
1022 |
2
| cache1.getConfiguration().setSyncCommitPhase(true);
|
1023 |
2
| cache2.getConfiguration().setSyncCommitPhase(true);
|
1024 |
2
| tx = beginTransaction();
|
1025 |
2
| cache1.put("/a/b/c", "age", 38);
|
1026 |
2
| cache1.put("/a/b/c", "age", 39);
|
1027 |
2
| Object val = cache2.get("/a/b/c", "age");
|
1028 |
2
| assertNull(val);
|
1029 |
2
| tx.commit();
|
1030 |
| |
1031 |
0
| tx = beginTransaction();
|
1032 |
0
| assertEquals(39, cache2.get("/a/b/c", "age"));
|
1033 |
0
| tx.commit();
|
1034 |
| } |
1035 |
| catch (Throwable t) |
1036 |
| { |
1037 |
2
| t.printStackTrace();
|
1038 |
2
| t1_ex = t;
|
1039 |
| } |
1040 |
| finally |
1041 |
| { |
1042 |
2
| lock.release();
|
1043 |
| } |
1044 |
| } |
1045 |
| |
1046 |
| |
1047 |
| |
1048 |
| |
1049 |
| |
1050 |
| |
1051 |
| |
1052 |
| |
1053 |
2
| public void testPutTx1() throws Exception
|
1054 |
| { |
1055 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
1056 |
2
| final CacheImpl c1 = this.cache1;
|
1057 |
2
| Thread t1 = new Thread()
|
1058 |
| { |
1059 |
2
| public void run()
|
1060 |
| { |
1061 |
2
| Transaction tx = null;
|
1062 |
| |
1063 |
2
| try
|
1064 |
| { |
1065 |
2
| lock.acquire();
|
1066 |
2
| tx = beginTransaction();
|
1067 |
2
| c1.put("/a/b/c", "age", 38);
|
1068 |
2
| c1.put("/a/b/c", "age", 39);
|
1069 |
2
| lock.release();
|
1070 |
| |
1071 |
2
| TestingUtil.sleepThread(300);
|
1072 |
2
| lock.acquire();
|
1073 |
2
| try
|
1074 |
| { |
1075 |
2
| tx.commit();
|
1076 |
| } |
1077 |
| catch (RollbackException ex) |
1078 |
| { |
1079 |
2
| System.out.println("[Thread1] received RollbackException, as expected. Rolling back changes");
|
1080 |
| } |
1081 |
| finally |
1082 |
| { |
1083 |
2
| lock.release();
|
1084 |
| } |
1085 |
| } |
1086 |
| catch (Throwable ex) |
1087 |
| { |
1088 |
0
| ex.printStackTrace();
|
1089 |
0
| t1_ex = ex;
|
1090 |
| } |
1091 |
| finally |
1092 |
| { |
1093 |
2
| lock.release();
|
1094 |
| } |
1095 |
| } |
1096 |
| }; |
1097 |
| |
1098 |
2
| Thread t2 = new Thread()
|
1099 |
| { |
1100 |
2
| public void run()
|
1101 |
| { |
1102 |
2
| Transaction tx = null;
|
1103 |
| |
1104 |
2
| try
|
1105 |
| { |
1106 |
2
| sleep(200);
|
1107 |
2
| Thread.yield();
|
1108 |
2
| lock.acquire();
|
1109 |
2
| tx = beginTransaction();
|
1110 |
2
| assertNull(cache2.get("/a/b/c", "age"));
|
1111 |
2
| cache2.put("/a/b/c", "age", 40);
|
1112 |
2
| lock.release();
|
1113 |
| |
1114 |
2
| TestingUtil.sleepThread(300);
|
1115 |
2
| lock.acquire();
|
1116 |
2
| assertEquals(40, cache2.get("/a/b/c", "age"));
|
1117 |
2
| tx.commit();
|
1118 |
2
| lock.release();
|
1119 |
| |
1120 |
2
| TestingUtil.sleepThread(1000);
|
1121 |
2
| tx = beginTransaction();
|
1122 |
2
| assertEquals("After cache2 commit", 40, cache2.get("/a/b/c", "age"));
|
1123 |
2
| tx.commit();
|
1124 |
| } |
1125 |
| catch (Throwable ex) |
1126 |
| { |
1127 |
0
| ex.printStackTrace();
|
1128 |
0
| t2_ex = ex;
|
1129 |
| } |
1130 |
| finally |
1131 |
| { |
1132 |
2
| lock.release();
|
1133 |
| } |
1134 |
| } |
1135 |
| }; |
1136 |
| |
1137 |
| |
1138 |
2
| t1.start();
|
1139 |
2
| t2.start();
|
1140 |
| |
1141 |
2
| t1.join();
|
1142 |
2
| t2.join();
|
1143 |
| |
1144 |
2
| if (t1_ex != null)
|
1145 |
| { |
1146 |
0
| fail("Thread1 failed: " + t1_ex);
|
1147 |
| } |
1148 |
2
| if (t2_ex != null)
|
1149 |
| { |
1150 |
0
| fail("Thread2 failed: " + t2_ex);
|
1151 |
| } |
1152 |
| } |
1153 |
| |
1154 |
| |
1155 |
2
| public void testPutTxWithRollback() throws Exception
|
1156 |
| { |
1157 |
2
| initCaches(Configuration.CacheMode.REPL_SYNC);
|
1158 |
2
| final CacheImpl c2 = this.cache1;
|
1159 |
2
| Thread t1 = new Thread()
|
1160 |
| { |
1161 |
2
| public void run()
|
1162 |
| { |
1163 |
2
| Transaction tx = null;
|
1164 |
| |
1165 |
2
| try
|
1166 |
| { |
1167 |
2
| lock.acquire();
|
1168 |
2
| tx = beginTransaction();
|
1169 |
2
| c2.put("/a/b/c", "age", 38);
|
1170 |
2
| c2.put("/a/b/c", "age", 39);
|
1171 |
2
| lock.release();
|
1172 |
| |
1173 |
2
| TestingUtil.sleepThread(100);
|
1174 |
2
| lock.acquire();
|
1175 |
2
| tx.rollback();
|
1176 |
2
| lock.release();
|
1177 |
| } |
1178 |
| catch (Throwable ex) |
1179 |
| { |
1180 |
0
| ex.printStackTrace();
|
1181 |
0
| t1_ex = ex;
|
1182 |
| } |
1183 |
| finally |
1184 |
| { |
1185 |
2
| lock.release();
|
1186 |
| } |
1187 |
| } |
1188 |
| }; |
1189 |
| |
1190 |
2
| Thread t2 = new Thread()
|
1191 |
| { |
1192 |
2
| public void run()
|
1193 |
| { |
1194 |
2
| Transaction tx = null;
|
1195 |
| |
1196 |
2
| try
|
1197 |
| { |
1198 |
2
| sleep(200);
|
1199 |
2
| Thread.yield();
|
1200 |
2
| lock.acquire();
|
1201 |
2
| tx = beginTransaction();
|
1202 |
2
| assertNull(cache2.get("/a/b/c", "age"));
|
1203 |
2
| lock.release();
|
1204 |
| |
1205 |
2
| TestingUtil.sleepThread(100);
|
1206 |
2
| lock.acquire();
|
1207 |
2
| assertNull(cache2.get("/a/b/c", "age"));
|
1208 |
2
| tx.commit();
|
1209 |
2
| lock.release();
|
1210 |
| } |
1211 |
| catch (Throwable ex) |
1212 |
| { |
1213 |
0
| ex.printStackTrace();
|
1214 |
0
| t2_ex = ex;
|
1215 |
| } |
1216 |
| finally |
1217 |
| { |
1218 |
2
| lock.release();
|
1219 |
| } |
1220 |
| } |
1221 |
| }; |
1222 |
| |
1223 |
| |
1224 |
2
| t1.start();
|
1225 |
2
| t2.start();
|
1226 |
| |
1227 |
| |
1228 |
2
| t1.join();
|
1229 |
2
| t2.join();
|
1230 |
2
| if (t1_ex != null)
|
1231 |
| { |
1232 |
0
| fail("Thread1 failed: " + t1_ex);
|
1233 |
| } |
1234 |
2
| if (t2_ex != null)
|
1235 |
| { |
1236 |
0
| fail("Thread2 failed: " + t2_ex);
|
1237 |
| } |
1238 |
| } |
1239 |
| |
1240 |
| |
1241 |
| static class TransactionAborter implements Synchronization |
1242 |
| { |
1243 |
| Transaction ltx = null; |
1244 |
| |
1245 |
2
| public TransactionAborter(Transaction ltx)
|
1246 |
| { |
1247 |
2
| this.ltx = ltx;
|
1248 |
| } |
1249 |
| |
1250 |
2
| public void beforeCompletion()
|
1251 |
| { |
1252 |
2
| try
|
1253 |
| { |
1254 |
2
| ltx.setRollbackOnly();
|
1255 |
| } |
1256 |
| catch (SystemException e) |
1257 |
| { |
1258 |
| |
1259 |
| } |
1260 |
| } |
1261 |
| |
1262 |
2
| public void afterCompletion(int status)
|
1263 |
| { |
1264 |
| } |
1265 |
| } |
1266 |
| |
1267 |
| @CacheListener |
1268 |
| static class CallbackListener |
1269 |
| { |
1270 |
| |
1271 |
| CacheImpl callbackCache; |
1272 |
| Object callbackKey; |
1273 |
| Exception ex; |
1274 |
| Object mutex = new Object(); |
1275 |
| |
1276 |
0
| CallbackListener(CacheImpl cache, Object callbackKey)
|
1277 |
| { |
1278 |
0
| this.callbackCache = cache;
|
1279 |
0
| this.callbackKey = callbackKey;
|
1280 |
0
| cache.getNotifier().addCacheListener(this);
|
1281 |
| } |
1282 |
| |
1283 |
0
| @NodeModified
|
1284 |
| public void nodeModified(NodeEvent e) |
1285 |
| { |
1286 |
0
| if (!e.isPre())
|
1287 |
| { |
1288 |
| |
1289 |
| |
1290 |
0
| synchronized (mutex)
|
1291 |
| { |
1292 |
0
| try
|
1293 |
| { |
1294 |
0
| callbackCache.get(e.getFqn(), callbackKey);
|
1295 |
| } |
1296 |
| catch (CacheException exc) |
1297 |
| { |
1298 |
0
| exc.printStackTrace();
|
1299 |
0
| ex = exc;
|
1300 |
| } |
1301 |
| } |
1302 |
| } |
1303 |
| } |
1304 |
| |
1305 |
0
| Exception getCallbackException()
|
1306 |
| { |
1307 |
0
| synchronized (mutex)
|
1308 |
| { |
1309 |
0
| return ex;
|
1310 |
| } |
1311 |
| } |
1312 |
| |
1313 |
| } |
1314 |
| |
1315 |
| static class TransactionAborterCallbackListener extends CallbackListener |
1316 |
| { |
1317 |
| |
1318 |
| TransactionManager callbackTM; |
1319 |
| |
1320 |
0
| TransactionAborterCallbackListener(CacheImpl cache, Object callbackKey)
|
1321 |
| { |
1322 |
0
| super(cache, callbackKey);
|
1323 |
0
| callbackTM = callbackCache.getTransactionManager();
|
1324 |
| } |
1325 |
| |
1326 |
0
| @NodeModified
|
1327 |
| public void nodeModified(NodeEvent ne) |
1328 |
| { |
1329 |
0
| if (!ne.isPre())
|
1330 |
| { |
1331 |
0
| try
|
1332 |
| { |
1333 |
0
| Transaction tx = callbackTM.getTransaction();
|
1334 |
0
| if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE)
|
1335 |
| { |
1336 |
| |
1337 |
0
| tx.registerSynchronization(new TransactionAborter(tx));
|
1338 |
| } |
1339 |
| else |
1340 |
| { |
1341 |
0
| super.nodeModified(ne);
|
1342 |
| } |
1343 |
| |
1344 |
| } |
1345 |
| catch (Exception e) |
1346 |
| { |
1347 |
0
| e.printStackTrace();
|
1348 |
0
| if (ex == null)
|
1349 |
| { |
1350 |
0
| ex = e;
|
1351 |
| } |
1352 |
| } |
1353 |
| } |
1354 |
| } |
1355 |
| |
1356 |
| } |
1357 |
| |
1358 |
| @CacheListener |
1359 |
| static class TransactionAborterListener |
1360 |
| { |
1361 |
| |
1362 |
| TransactionManager callbackTM; |
1363 |
| Object mutex = new Object(); |
1364 |
| Exception ex; |
1365 |
| |
1366 |
0
| TransactionAborterListener(CacheImpl cache)
|
1367 |
| { |
1368 |
0
| callbackTM = cache.getTransactionManager();
|
1369 |
0
| cache.getNotifier().addCacheListener(this);
|
1370 |
| } |
1371 |
| |
1372 |
0
| @NodeModified
|
1373 |
| public void nodeModified(NodeEvent ne) |
1374 |
| { |
1375 |
0
| if (!ne.isPre())
|
1376 |
| { |
1377 |
0
| synchronized (mutex)
|
1378 |
| { |
1379 |
0
| try
|
1380 |
| { |
1381 |
0
| Transaction tx = callbackTM.getTransaction();
|
1382 |
0
| if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE)
|
1383 |
| { |
1384 |
| |
1385 |
0
| tx.setRollbackOnly();
|
1386 |
| } |
1387 |
| } |
1388 |
| catch (Exception e) |
1389 |
| { |
1390 |
0
| e.printStackTrace();
|
1391 |
0
| if (ex == null)
|
1392 |
| { |
1393 |
0
| ex = e;
|
1394 |
| } |
1395 |
| } |
1396 |
| } |
1397 |
| } |
1398 |
| } |
1399 |
| |
1400 |
0
| Exception getCallbackException()
|
1401 |
| { |
1402 |
0
| synchronized (mutex)
|
1403 |
| { |
1404 |
0
| return ex;
|
1405 |
| } |
1406 |
| } |
1407 |
| |
1408 |
| } |
1409 |
| |
1410 |
2
| public static Test suite()
|
1411 |
| { |
1412 |
| |
1413 |
2
| return new TestSuite(SyncReplTxTest.class);
|
1414 |
| } |
1415 |
| |
1416 |
| |
1417 |
| } |