1 |
| |
2 |
| |
3 |
| |
4 |
| |
5 |
| |
6 |
| |
7 |
| package org.jboss.cache.interceptors; |
8 |
| |
9 |
| import org.jboss.cache.CacheException; |
10 |
| import org.jboss.cache.CacheSPI; |
11 |
| import org.jboss.cache.InvocationContext; |
12 |
| import org.jboss.cache.ReplicationException; |
13 |
| import org.jboss.cache.config.Configuration; |
14 |
| import org.jboss.cache.config.Option; |
15 |
| import org.jboss.cache.marshall.MethodCall; |
16 |
| import org.jboss.cache.marshall.MethodCallFactory; |
17 |
| import org.jboss.cache.marshall.MethodDeclarations; |
18 |
| import org.jboss.cache.optimistic.DataVersion; |
19 |
| import org.jboss.cache.transaction.GlobalTransaction; |
20 |
| import org.jboss.cache.transaction.OptimisticTransactionEntry; |
21 |
| import org.jboss.cache.transaction.TransactionEntry; |
22 |
| |
23 |
| import javax.transaction.Status; |
24 |
| import javax.transaction.Synchronization; |
25 |
| import javax.transaction.SystemException; |
26 |
| import javax.transaction.Transaction; |
27 |
| import java.util.HashMap; |
28 |
| import java.util.List; |
29 |
| import java.util.Map; |
30 |
| import java.util.concurrent.ConcurrentHashMap; |
31 |
| |
32 |
| |
33 |
| |
34 |
| |
35 |
| |
36 |
| |
37 |
| |
38 |
| |
39 |
| |
40 |
| |
41 |
| public class TxInterceptor extends BaseTransactionalContextInterceptor implements TxInterceptorMBean |
42 |
| { |
43 |
| private final static Object NULL = new Object(); |
44 |
| |
45 |
| |
46 |
| |
47 |
| |
48 |
| private Map transactions = new ConcurrentHashMap(16); |
49 |
| private Map rollbackTransactions = new ConcurrentHashMap(16); |
50 |
| private long m_prepares = 0; |
51 |
| private long m_commits = 0; |
52 |
| private long m_rollbacks = 0; |
53 |
| |
54 |
| |
55 |
| |
56 |
| |
57 |
| |
58 |
| |
59 |
| |
60 |
| private Map remoteTransactions = new ConcurrentHashMap(); |
61 |
| |
62 |
2485388
| public Object invoke(InvocationContext ctx) throws Throwable
|
63 |
| { |
64 |
2485388
| MethodCall m = ctx.getMethodCall();
|
65 |
2485388
| if (log.isTraceEnabled())
|
66 |
| { |
67 |
0
| log.trace("(" + cache.getLocalAddress() + ") call on method [" + m + "]");
|
68 |
| } |
69 |
| |
70 |
789
| if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return super.invoke(ctx);
|
71 |
| |
72 |
2484599
| boolean scrubTxsOnExit = false;
|
73 |
2484599
| Option optionOverride = ctx.getOptionOverrides();
|
74 |
| |
75 |
2484599
| Object result = null;
|
76 |
| |
77 |
2484599
| try
|
78 |
| { |
79 |
| |
80 |
| |
81 |
| |
82 |
| |
83 |
2484599
| if (MethodDeclarations.isTransactionLifecycleMethod(m.getMethodId()))
|
84 |
| { |
85 |
| |
86 |
0
| if (log.isDebugEnabled()) log.debug("Got gtx from invocation context " + ctx.getGlobalTransaction());
|
87 |
| |
88 |
1473
| if (ctx.getGlobalTransaction().isRemote()) remoteTransactions.put(ctx.getGlobalTransaction(), NULL);
|
89 |
| |
90 |
1473
| switch (m.getMethodId())
|
91 |
| { |
92 |
179
| case MethodDeclarations.optimisticPrepareMethod_id:
|
93 |
570
| case MethodDeclarations.prepareMethod_id:
|
94 |
749
| if (ctx.getGlobalTransaction().isRemote())
|
95 |
| { |
96 |
749
| result = handleRemotePrepare(ctx, m);
|
97 |
726
| scrubTxsOnExit = true;
|
98 |
726
| if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
|
99 |
| { |
100 |
715
| m_prepares++;
|
101 |
| } |
102 |
| } |
103 |
| else |
104 |
| { |
105 |
0
| if (log.isTraceEnabled()) log.trace("received my own message (discarding it)");
|
106 |
0
| result = null;
|
107 |
| } |
108 |
726
| break;
|
109 |
696
| case MethodDeclarations.commitMethod_id:
|
110 |
28
| case MethodDeclarations.rollbackMethod_id:
|
111 |
724
| if (ctx.getGlobalTransaction().isRemote())
|
112 |
| { |
113 |
724
| result = handleRemoteCommitRollback(ctx);
|
114 |
721
| scrubTxsOnExit = true;
|
115 |
| } |
116 |
| else |
117 |
| { |
118 |
0
| if (log.isTraceEnabled()) log.trace("received my own message (discarding it)");
|
119 |
0
| result = null;
|
120 |
| } |
121 |
721
| break;
|
122 |
| } |
123 |
| } |
124 |
| else |
125 |
| { |
126 |
| |
127 |
2483126
| result = handleNonTxMethod(ctx);
|
128 |
| } |
129 |
| } |
130 |
| catch (Exception e) |
131 |
| { |
132 |
70
| if (optionOverride == null || !optionOverride.isFailSilently()) throw e;
|
133 |
11
| log.trace("There was a problem handling this request, but " +
|
134 |
| "failSilently was set, so suppressing exception", e); |
135 |
| } |
136 |
| finally |
137 |
| { |
138 |
| |
139 |
| |
140 |
| |
141 |
| |
142 |
2484598
| if (scrubTxsOnExit)
|
143 |
| { |
144 |
1447
| setTransactionalContext(null, null, ctx);
|
145 |
| } |
146 |
| } |
147 |
2484528
| return result;
|
148 |
| } |
149 |
| |
150 |
6
| public long getPrepares()
|
151 |
| { |
152 |
6
| return m_prepares;
|
153 |
| } |
154 |
| |
155 |
6
| public long getCommits()
|
156 |
| { |
157 |
6
| return m_commits;
|
158 |
| } |
159 |
| |
160 |
6
| public long getRollbacks()
|
161 |
| { |
162 |
6
| return m_rollbacks;
|
163 |
| } |
164 |
| |
165 |
2
| public void resetStatistics()
|
166 |
| { |
167 |
2
| m_prepares = 0;
|
168 |
2
| m_commits = 0;
|
169 |
2
| m_rollbacks = 0;
|
170 |
| } |
171 |
| |
172 |
0
| public Map<String, Object> dumpStatistics()
|
173 |
| { |
174 |
0
| Map<String, Object> retval = new HashMap<String, Object>(3);
|
175 |
0
| retval.put("Prepares", m_prepares);
|
176 |
0
| retval.put("Commits", m_commits);
|
177 |
0
| retval.put("Rollbacks", m_rollbacks);
|
178 |
0
| return retval;
|
179 |
| } |
180 |
| |
181 |
749
| private Object handleRemotePrepare(InvocationContext ctx, MethodCall m) throws Throwable
|
182 |
| { |
183 |
749
| GlobalTransaction gtx = ctx.getGlobalTransaction();
|
184 |
749
| List<MethodCall> modifications = (List<MethodCall>) m.getArgs()[1];
|
185 |
749
| boolean onePhase = (Boolean) m.getArgs()[configuration.isNodeLockingOptimistic() ? 4 : 3];
|
186 |
| |
187 |
| |
188 |
749
| Transaction ltx = txTable.getLocalTransaction(gtx);
|
189 |
| |
190 |
749
| Transaction currentTx = txManager.getTransaction();
|
191 |
749
| Object retval = null;
|
192 |
| |
193 |
749
| try
|
194 |
| { |
195 |
749
| if (ltx == null)
|
196 |
| { |
197 |
3
| if (currentTx != null) txManager.suspend();
|
198 |
749
| ltx = createLocalTxForGlobalTx(gtx, ctx);
|
199 |
749
| if (log.isDebugEnabled())
|
200 |
| { |
201 |
0
| log.debug("Started new local TX as result of remote PREPARE: local TX=" + ltx + " (Status=" + ltx.getStatus() + "), global TX=" + gtx);
|
202 |
| } |
203 |
| } |
204 |
| else |
205 |
| { |
206 |
| |
207 |
0
| if (!isValid(ltx)) throw new CacheException("Transaction " + ltx + " not in correct state to be prepared");
|
208 |
| |
209 |
| |
210 |
0
| if (currentTx == null || !ltx.equals(currentTx))
|
211 |
| { |
212 |
0
| txManager.suspend();
|
213 |
0
| txManager.resume(ltx);
|
214 |
| } |
215 |
| } |
216 |
| |
217 |
| |
218 |
749
| if (log.isTraceEnabled())
|
219 |
| { |
220 |
0
| log.trace("Resuming existing transaction " + ltx + ", global TX=" + gtx);
|
221 |
| } |
222 |
| |
223 |
| |
224 |
| |
225 |
| |
226 |
| |
227 |
| |
228 |
| |
229 |
749
| if (txTable.get(gtx) == null)
|
230 |
| { |
231 |
| |
232 |
| |
233 |
749
| TransactionEntry entry = configuration.isNodeLockingOptimistic() ? new OptimisticTransactionEntry() : new TransactionEntry();
|
234 |
749
| entry.setTransaction(ltx);
|
235 |
749
| log.debug("creating new tx entry");
|
236 |
749
| txTable.put(gtx, entry);
|
237 |
0
| if (log.isTraceEnabled()) log.trace("TxTable contents: " + txTable);
|
238 |
| } |
239 |
| |
240 |
749
| setTransactionalContext(ltx, gtx, ctx);
|
241 |
| |
242 |
749
| registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx, cache), ctx);
|
243 |
| |
244 |
749
| if (configuration.isNodeLockingOptimistic())
|
245 |
| { |
246 |
179
| retval = handleOptimisticPrepare(ctx, gtx, modifications, onePhase, ltx);
|
247 |
| } |
248 |
| else |
249 |
| { |
250 |
570
| retval = handlePessimisticPrepare(ctx, m, gtx, modifications, onePhase, ltx);
|
251 |
| } |
252 |
| } |
253 |
| finally |
254 |
| { |
255 |
749
| txManager.suspend();
|
256 |
| |
257 |
3
| if (currentTx != null) txManager.resume(currentTx);
|
258 |
0
| if (log.isDebugEnabled()) log.debug("Finished remote prepare " + gtx);
|
259 |
| } |
260 |
| |
261 |
726
| return retval;
|
262 |
| } |
263 |
| |
264 |
| |
265 |
| |
266 |
| |
267 |
| |
268 |
| |
269 |
| |
270 |
| |
271 |
| |
272 |
| |
273 |
| |
274 |
| |
275 |
2483126
| private Object handleNonTxMethod(InvocationContext ctx) throws Throwable
|
276 |
| { |
277 |
2483126
| MethodCall m = ctx.getMethodCall();
|
278 |
2483126
| Transaction tx = ctx.getTransaction();
|
279 |
2483126
| Object result;
|
280 |
| |
281 |
2483126
| boolean implicitTransaction = configuration.isNodeLockingOptimistic() && tx == null;
|
282 |
2483126
| if (implicitTransaction)
|
283 |
| { |
284 |
3297
| tx = createLocalTx();
|
285 |
| |
286 |
3297
| ctx.setTransaction(tx);
|
287 |
| } |
288 |
1401906
| if (tx != null) m = attachGlobalTransaction(ctx, tx, m);
|
289 |
| |
290 |
2483126
| GlobalTransaction gtx = ctx.getGlobalTransaction();
|
291 |
| |
292 |
2483126
| try
|
293 |
| { |
294 |
2483126
| result = super.invoke(ctx);
|
295 |
2483070
| if (implicitTransaction)
|
296 |
| { |
297 |
3297
| copyInvocationScopeOptionsToTxScope(ctx);
|
298 |
3297
| txManager.commit();
|
299 |
| } |
300 |
| } |
301 |
| catch (Throwable t) |
302 |
| { |
303 |
65
| if (implicitTransaction)
|
304 |
| { |
305 |
10
| log.warn("Rolling back, exception encountered", t);
|
306 |
10
| result = t;
|
307 |
10
| try
|
308 |
| { |
309 |
10
| setTransactionalContext(tx, gtx, ctx);
|
310 |
10
| txManager.rollback();
|
311 |
| } |
312 |
| catch (Throwable th) |
313 |
| { |
314 |
10
| log.warn("Roll back failed encountered", th);
|
315 |
| } |
316 |
| } |
317 |
| else |
318 |
| { |
319 |
55
| throw t;
|
320 |
| } |
321 |
| } |
322 |
2483070
| return result;
|
323 |
| } |
324 |
| |
325 |
1401906
| private MethodCall attachGlobalTransaction(InvocationContext ctx, Transaction tx, MethodCall m) throws Exception
|
326 |
| { |
327 |
1401906
| if (log.isDebugEnabled())
|
328 |
| { |
329 |
0
| log.debug(" local transaction exists - registering global tx if not present for " + Thread.currentThread());
|
330 |
| } |
331 |
1401905
| if (log.isTraceEnabled())
|
332 |
| { |
333 |
0
| GlobalTransaction tempGtx = txTable.get(tx);
|
334 |
0
| log.trace("Associated gtx in txTable is " + tempGtx);
|
335 |
| } |
336 |
| |
337 |
| |
338 |
1401906
| GlobalTransaction gtx = registerTransaction(tx, ctx);
|
339 |
1401906
| if (gtx != null)
|
340 |
| { |
341 |
1121124
| m = replaceGtx(m, gtx);
|
342 |
| } |
343 |
| else |
344 |
| { |
345 |
| |
346 |
280782
| gtx = txTable.get(tx);
|
347 |
| } |
348 |
| |
349 |
| |
350 |
1401906
| ctx.setGlobalTransaction(gtx);
|
351 |
| |
352 |
1401906
| return m;
|
353 |
| } |
354 |
| |
355 |
| |
356 |
| |
357 |
| |
358 |
| |
359 |
| |
360 |
| |
361 |
| |
362 |
| |
363 |
| |
364 |
| |
365 |
179
| private Object handleOptimisticPrepare(InvocationContext ctx, GlobalTransaction gtx, List<MethodCall> modifications, boolean onePhase, Transaction ltx) throws Throwable
|
366 |
| { |
367 |
179
| Object retval;
|
368 |
0
| if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare " + gtx);
|
369 |
179
| replayModifications(modifications, ctx, true);
|
370 |
179
| retval = super.invoke(ctx);
|
371 |
| |
372 |
178
| if (!isActive(ltx))
|
373 |
| { |
374 |
0
| throw new ReplicationException("prepare() failed -- " +
|
375 |
| "local transaction status is not STATUS_ACTIVE;" + |
376 |
| " is " + ltx.getStatus()); |
377 |
| } |
378 |
178
| return retval;
|
379 |
| } |
380 |
| |
381 |
570
| private Object handlePessimisticPrepare(InvocationContext ctx, MethodCall m, GlobalTransaction gtx, List<MethodCall> modifications, boolean commit, Transaction ltx) throws Exception
|
382 |
| { |
383 |
570
| boolean success = true;
|
384 |
570
| Object retval;
|
385 |
570
| try
|
386 |
| { |
387 |
| |
388 |
570
| try
|
389 |
| { |
390 |
570
| replayModifications(modifications, ctx, false);
|
391 |
548
| if (isOnePhaseCommitPrepareMehod(m))
|
392 |
| { |
393 |
19
| log.trace("Using one-phase prepare. Not propagating the prepare call up the stack until called to do so by the sync handler.");
|
394 |
| } |
395 |
| else |
396 |
| { |
397 |
529
| super.invoke(ctx);
|
398 |
| } |
399 |
| |
400 |
| |
401 |
548
| if (!isActive(ltx))
|
402 |
| { |
403 |
0
| throw new ReplicationException("prepare() failed -- " +
|
404 |
| "local transaction status is not STATUS_ACTIVE;" + |
405 |
| " is " + ltx.getStatus()); |
406 |
| } |
407 |
| } |
408 |
| catch (Throwable th) |
409 |
| { |
410 |
22
| log.error("prepare method invocation failed", th);
|
411 |
22
| retval = th;
|
412 |
22
| success = false;
|
413 |
22
| if (retval instanceof Exception)
|
414 |
| { |
415 |
22
| throw (Exception) retval;
|
416 |
| } |
417 |
| } |
418 |
| } |
419 |
| finally |
420 |
| { |
421 |
| |
422 |
570
| if (log.isTraceEnabled())
|
423 |
| { |
424 |
0
| log.trace("Are we running a 1-phase commit? " + commit);
|
425 |
| } |
426 |
| |
427 |
| |
428 |
| |
429 |
| |
430 |
570
| if (commit)
|
431 |
| { |
432 |
19
| try
|
433 |
| { |
434 |
| |
435 |
19
| if (success)
|
436 |
| { |
437 |
19
| ltx.commit();
|
438 |
| } |
439 |
| else |
440 |
| { |
441 |
0
| ltx.rollback();
|
442 |
| } |
443 |
| } |
444 |
| catch (Throwable t) |
445 |
| { |
446 |
0
| log.error("Commit/rollback failed.", t);
|
447 |
0
| if (success)
|
448 |
| { |
449 |
| |
450 |
0
| try
|
451 |
| { |
452 |
0
| log.info("Attempting anotehr rollback");
|
453 |
| |
454 |
0
| ltx.rollback();
|
455 |
| } |
456 |
| catch (Throwable t2) |
457 |
| { |
458 |
0
| log.error("Unable to rollback", t2);
|
459 |
| } |
460 |
| } |
461 |
| } |
462 |
| finally |
463 |
| { |
464 |
19
| transactions.remove(ltx);
|
465 |
19
| remoteTransactions.remove(gtx);
|
466 |
| } |
467 |
| } |
468 |
| } |
469 |
548
| return null;
|
470 |
| } |
471 |
| |
472 |
749
| private Object replayModifications(List<MethodCall> modifications, InvocationContext ctx, boolean injectDataVersions)
|
473 |
| { |
474 |
749
| Object retval = null;
|
475 |
749
| MethodCall originalMethodCall = ctx.getMethodCall();
|
476 |
749
| Option originalOption = ctx.getOptionOverrides();
|
477 |
| |
478 |
749
| if (modifications != null)
|
479 |
| { |
480 |
749
| for (MethodCall modification : modifications)
|
481 |
| { |
482 |
4107
| try
|
483 |
| { |
484 |
4107
| if (injectDataVersions && !MethodDeclarations.isDataGravitationMethod(modification.getMethodId()))
|
485 |
| { |
486 |
197
| Object[] origArgs = modification.getArgs();
|
487 |
| |
488 |
| |
489 |
197
| Option o = null;
|
490 |
197
| if (origArgs[origArgs.length - 1] instanceof DataVersion)
|
491 |
| { |
492 |
196
| o = new Option();
|
493 |
196
| o.setDataVersion((DataVersion) origArgs[origArgs.length - 1]);
|
494 |
| } |
495 |
| |
496 |
197
| Object[] args = new Object[origArgs.length - 1];
|
497 |
197
| System.arraycopy(origArgs, 0, args, 0, args.length);
|
498 |
| |
499 |
197
| ctx.setMethodCall(MethodCallFactory.create(MethodDeclarations.getUnversionedMethod(modification.getMethodId()), args));
|
500 |
196
| if (o != null) ctx.setOptionOverrides(o);
|
501 |
| } |
502 |
| else |
503 |
| { |
504 |
3910
| ctx.setMethodCall(modification);
|
505 |
| } |
506 |
| |
507 |
4107
| retval = super.invoke(ctx);
|
508 |
| |
509 |
4085
| if (!isActive(ctx.getTransaction()))
|
510 |
| { |
511 |
0
| throw new ReplicationException("prepare() failed -- " + "local transaction status is not STATUS_ACTIVE; is " + ctx.getTransaction().getStatus());
|
512 |
| } |
513 |
| } |
514 |
| catch (Throwable t) |
515 |
| { |
516 |
22
| log.error("method invocation failed", t);
|
517 |
22
| retval = t;
|
518 |
| } |
519 |
| finally |
520 |
| { |
521 |
| |
522 |
204
| if (injectDataVersions) ctx.setOptionOverrides(originalOption);
|
523 |
4107
| ctx.setMethodCall(originalMethodCall);
|
524 |
| } |
525 |
4107
| if (retval != null && retval instanceof Exception)
|
526 |
| { |
527 |
22
| if (retval instanceof RuntimeException)
|
528 |
22
| throw (RuntimeException) retval;
|
529 |
| else |
530 |
0
| throw new RuntimeException((Exception) retval);
|
531 |
| } |
532 |
| } |
533 |
| } |
534 |
| |
535 |
727
| return retval;
|
536 |
| } |
537 |
| |
538 |
| |
539 |
| |
540 |
| |
541 |
| |
542 |
| |
543 |
| |
544 |
724
| private Object handleRemoteCommitRollback(InvocationContext ctx) throws Throwable
|
545 |
| { |
546 |
724
| Transaction ltx;
|
547 |
724
| GlobalTransaction gtx = ctx.getGlobalTransaction();
|
548 |
724
| MethodCall m = ctx.getMethodCall();
|
549 |
| |
550 |
724
| try
|
551 |
| { |
552 |
724
| ltx = getLocalTxForGlobalTx(gtx);
|
553 |
| } |
554 |
| catch (IllegalStateException e) |
555 |
| { |
556 |
4
| if (m.getMethodId() == MethodDeclarations.rollbackMethod_id)
|
557 |
| { |
558 |
2
| log.warn("No local transaction for this remotely originating rollback. Possibly rolling back before a prepare call was broadcast?");
|
559 |
2
| return null;
|
560 |
| } |
561 |
| else |
562 |
| { |
563 |
2
| throw e;
|
564 |
| } |
565 |
| } |
566 |
| |
567 |
| |
568 |
720
| Transaction currentTx = txManager.getTransaction();
|
569 |
720
| boolean resumeCurrentTxOnCompletion = false;
|
570 |
720
| try
|
571 |
| { |
572 |
720
| if (!ltx.equals(currentTx))
|
573 |
| { |
574 |
720
| currentTx = txManager.suspend();
|
575 |
720
| resumeCurrentTxOnCompletion = true;
|
576 |
720
| txManager.resume(ltx);
|
577 |
| |
578 |
720
| ctx.setTransaction(ltx);
|
579 |
| } |
580 |
0
| if (log.isDebugEnabled()) log.debug(" executing " + m + "() with local TX " + ltx + " under global tx " + gtx);
|
581 |
| |
582 |
| |
583 |
| |
584 |
| |
585 |
720
| if (m.getMethodId() == MethodDeclarations.commitMethod_id)
|
586 |
| { |
587 |
694
| txManager.commit();
|
588 |
693
| if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
|
589 |
| { |
590 |
689
| m_commits++;
|
591 |
| } |
592 |
| } |
593 |
| else |
594 |
| { |
595 |
26
| txManager.rollback();
|
596 |
26
| if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
|
597 |
| { |
598 |
22
| m_rollbacks++;
|
599 |
| } |
600 |
| } |
601 |
| } |
602 |
| finally |
603 |
| { |
604 |
| |
605 |
720
| if (resumeCurrentTxOnCompletion)
|
606 |
| { |
607 |
0
| if (log.isTraceEnabled()) log.trace("Resuming suspended transaction " + currentTx);
|
608 |
720
| txManager.suspend();
|
609 |
720
| if (currentTx != null)
|
610 |
| { |
611 |
2
| txManager.resume(currentTx);
|
612 |
2
| ctx.setTransaction(currentTx);
|
613 |
| } |
614 |
| } |
615 |
| |
616 |
| |
617 |
720
| remoteTransactions.remove(gtx);
|
618 |
720
| transactions.remove(ltx);
|
619 |
| |
620 |
| |
621 |
720
| txTable.remove(gtx);
|
622 |
720
| txTable.remove(ltx);
|
623 |
| } |
624 |
| |
625 |
0
| if (log.isDebugEnabled()) log.debug("Finished remote commit/rollback method for " + gtx);
|
626 |
| |
627 |
719
| return null;
|
628 |
| } |
629 |
| |
630 |
724
| private Transaction getLocalTxForGlobalTx(GlobalTransaction gtx) throws IllegalStateException
|
631 |
| { |
632 |
724
| Transaction ltx = txTable.getLocalTransaction(gtx);
|
633 |
724
| if (ltx != null)
|
634 |
| { |
635 |
0
| if (log.isDebugEnabled()) log.debug("Found local TX=" + ltx + ", global TX=" + gtx);
|
636 |
| } |
637 |
| else |
638 |
| { |
639 |
4
| throw new IllegalStateException(" found no local TX for global TX " + gtx);
|
640 |
| } |
641 |
720
| return ltx;
|
642 |
| } |
643 |
| |
644 |
| |
645 |
| |
646 |
| |
647 |
| |
648 |
| |
649 |
| |
650 |
| |
651 |
1121770
| private Object handleCommitRollback(InvocationContext ctx) throws Throwable
|
652 |
| { |
653 |
| |
654 |
1121770
| GlobalTransaction gtx = ctx.getGlobalTransaction();
|
655 |
1121770
| Object result;
|
656 |
| |
657 |
| |
658 |
| |
659 |
| |
660 |
| |
661 |
| |
662 |
| |
663 |
| |
664 |
| |
665 |
1121770
| result = super.invoke(ctx);
|
666 |
| |
667 |
0
| if (log.isDebugEnabled()) log.debug("Finished local commit/rollback method for " + gtx);
|
668 |
1121759
| return result;
|
669 |
| } |
670 |
| |
671 |
| |
672 |
| |
673 |
| |
674 |
| |
675 |
| |
676 |
| |
677 |
| |
678 |
| |
679 |
| |
680 |
1121442
| protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, Transaction tx, List modifications, boolean onePhaseCommit)
|
681 |
| { |
682 |
| |
683 |
1121440
| ctx.setTxHasMods(modifications != null && modifications.size() > 0);
|
684 |
1121442
| try
|
685 |
| { |
686 |
1121442
| MethodCall commitMethod;
|
687 |
1121442
| if (onePhaseCommit)
|
688 |
| { |
689 |
| |
690 |
59
| if (configuration.isNodeLockingOptimistic())
|
691 |
| { |
692 |
0
| commitMethod = MethodCallFactory.create(MethodDeclarations.optimisticPrepareMethod,
|
693 |
| gtx, modifications, null, cache.getLocalAddress(), true); |
694 |
| } |
695 |
| else |
696 |
| { |
697 |
59
| commitMethod = MethodCallFactory.create(MethodDeclarations.prepareMethod,
|
698 |
| gtx, modifications, cache.getLocalAddress(), |
699 |
| true); |
700 |
| } |
701 |
| } |
702 |
| else |
703 |
| { |
704 |
1121383
| commitMethod = MethodCallFactory.create(MethodDeclarations.commitMethod, gtx);
|
705 |
| } |
706 |
| |
707 |
1121442
| if (log.isTraceEnabled())
|
708 |
| { |
709 |
0
| log.trace(" running commit for " + gtx);
|
710 |
| } |
711 |
1121442
| ctx.setMethodCall(commitMethod);
|
712 |
1121442
| handleCommitRollback(ctx);
|
713 |
| } |
714 |
| catch (Throwable e) |
715 |
| { |
716 |
6
| log.warn("Commit failed. Clearing stale locks.");
|
717 |
6
| try
|
718 |
| { |
719 |
6
| cleanupStaleLocks(gtx);
|
720 |
| } |
721 |
| catch (RuntimeException re) |
722 |
| { |
723 |
0
| log.error("Unable to clear stale locks", re);
|
724 |
0
| throw re;
|
725 |
| } |
726 |
| catch (Throwable e2) |
727 |
| { |
728 |
0
| log.error("Unable to clear stale locks", e2);
|
729 |
0
| throw new RuntimeException(e2);
|
730 |
| } |
731 |
6
| if (e instanceof RuntimeException)
|
732 |
4
| throw (RuntimeException) e;
|
733 |
| else |
734 |
2
| throw new RuntimeException("Commit failed.", e);
|
735 |
| } |
736 |
| } |
737 |
| |
738 |
| |
739 |
6
| private void cleanupStaleLocks(GlobalTransaction gtx) throws Throwable
|
740 |
| { |
741 |
6
| TransactionEntry entry = txTable.get(gtx);
|
742 |
6
| if (entry != null)
|
743 |
| { |
744 |
6
| entry.releaseAllLocksLIFO(gtx);
|
745 |
| } |
746 |
| } |
747 |
| |
748 |
| |
749 |
| |
750 |
| |
751 |
| |
752 |
| |
753 |
328
| protected void runRollbackPhase(InvocationContext ctx, GlobalTransaction gtx, Transaction tx, List modifications)
|
754 |
| { |
755 |
| |
756 |
328
| try
|
757 |
| { |
758 |
328
| ctx.setTxHasMods(modifications != null && modifications.size() > 0);
|
759 |
| |
760 |
| |
761 |
328
| MethodCall rollbackMethod = MethodCallFactory.create(MethodDeclarations.rollbackMethod, gtx);
|
762 |
328
| if (log.isTraceEnabled())
|
763 |
| { |
764 |
0
| log.trace(" running rollback for " + gtx);
|
765 |
| } |
766 |
| |
767 |
| |
768 |
| |
769 |
| |
770 |
328
| rollbackTransactions.put(tx, gtx);
|
771 |
| |
772 |
328
| ctx.setMethodCall(rollbackMethod);
|
773 |
328
| handleCommitRollback(ctx);
|
774 |
| } |
775 |
| catch (Throwable e) |
776 |
| { |
777 |
5
| log.warn("Rollback had a problem", e);
|
778 |
| } |
779 |
| finally |
780 |
| { |
781 |
328
| if (tx != null) rollbackTransactions.remove(tx);
|
782 |
| } |
783 |
| } |
784 |
| |
785 |
| |
786 |
| |
787 |
| |
788 |
| |
789 |
| |
790 |
| |
791 |
| |
792 |
67441
| protected Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx, List modifications) throws Throwable
|
793 |
| { |
794 |
| |
795 |
67441
| MethodCall prepareMethod;
|
796 |
| |
797 |
| |
798 |
| |
799 |
67441
| if (configuration.isNodeLockingOptimistic())
|
800 |
| { |
801 |
980
| prepareMethod = MethodCallFactory.create(MethodDeclarations.optimisticPrepareMethod, gtx, modifications, null, cache.getLocalAddress(), false);
|
802 |
| } |
803 |
66461
| else if (configuration.getCacheMode() != Configuration.CacheMode.REPL_ASYNC)
|
804 |
| { |
805 |
66418
| prepareMethod = MethodCallFactory.create(MethodDeclarations.prepareMethod,
|
806 |
| gtx, modifications, cache.getLocalAddress(), |
807 |
| false); |
808 |
| } |
809 |
| |
810 |
| else |
811 |
| { |
812 |
| |
813 |
43
| log.trace("This is a REPL_ASYNC call (1 phase commit) - do nothing for beforeCompletion()");
|
814 |
43
| return null;
|
815 |
| } |
816 |
| |
817 |
| |
818 |
| |
819 |
| |
820 |
67398
| Object result;
|
821 |
| |
822 |
| |
823 |
67398
| Transaction ltx = ctx.getTransaction();
|
824 |
| |
825 |
| |
826 |
67398
| if (txManager.getTransaction() != null && ltx != null && txManager.getTransaction().equals(ltx))
|
827 |
| { |
828 |
67398
| ctx.setMethodCall(prepareMethod);
|
829 |
67398
| result = super.invoke(ctx);
|
830 |
| } |
831 |
| else |
832 |
| { |
833 |
0
| log.warn("Local transaction does not exist or does not match expected transaction " + gtx);
|
834 |
0
| throw new CacheException(" local transaction " + ltx + " does not exist or does not match expected transaction " + gtx);
|
835 |
| } |
836 |
67336
| return result;
|
837 |
| } |
838 |
| |
839 |
| |
840 |
| |
841 |
| |
842 |
| |
843 |
| |
844 |
| |
845 |
| |
846 |
| |
847 |
| |
848 |
| |
849 |
| |
850 |
| |
851 |
1401906
| private GlobalTransaction registerTransaction(Transaction tx, InvocationContext ctx) throws Exception
|
852 |
| { |
853 |
1401906
| GlobalTransaction gtx;
|
854 |
1401906
| if (isValid(tx) && transactions.put(tx, NULL) == null)
|
855 |
| { |
856 |
1121124
| gtx = cache.getCurrentTransaction(tx, true);
|
857 |
1121124
| if (gtx.isRemote())
|
858 |
| { |
859 |
| |
860 |
46
| if (log.isTraceEnabled())
|
861 |
| { |
862 |
0
| log.trace("is a remotely initiated gtx so no need to register a tx for it");
|
863 |
| } |
864 |
| } |
865 |
| else |
866 |
| { |
867 |
1121078
| if (log.isTraceEnabled())
|
868 |
| { |
869 |
0
| log.trace("Registering sync handler for tx " + tx + ", gtx " + gtx);
|
870 |
| } |
871 |
| |
872 |
1121078
| LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, cache, !ctx.isOriginLocal());
|
873 |
1121078
| registerHandler(tx, myHandler, ctx);
|
874 |
| } |
875 |
| } |
876 |
?
| else if ((gtx = (GlobalTransaction) rollbackTransactions.get(tx)) != null)
|
877 |
| { |
878 |
0
| if (log.isDebugEnabled()) log.debug("Transaction " + tx + " is already registered and is rolling back.");
|
879 |
| } |
880 |
| else |
881 |
| { |
882 |
0
| if (log.isDebugEnabled()) log.debug("Transaction " + tx + " is already registered.");
|
883 |
| |
884 |
| } |
885 |
1401906
| return gtx;
|
886 |
| } |
887 |
| |
888 |
| |
889 |
| |
890 |
| |
891 |
| |
892 |
| |
893 |
| |
894 |
| |
895 |
1121827
| private void registerHandler(Transaction tx, Synchronization handler, InvocationContext ctx) throws Exception
|
896 |
| { |
897 |
1121825
| OrderedSynchronizationHandler orderedHandler = OrderedSynchronizationHandler.getInstance(tx);
|
898 |
| |
899 |
0
| if (log.isTraceEnabled()) log.trace("registering for TX completion: SynchronizationHandler(" + handler + ")");
|
900 |
| |
901 |
1121827
| orderedHandler.registerAtHead(handler);
|
902 |
| |
903 |
1121827
| cache.getNotifier().notifyTransactionRegistered(tx, ctx);
|
904 |
| } |
905 |
| |
906 |
| |
907 |
| |
908 |
| |
909 |
1121124
| private MethodCall replaceGtx(MethodCall m, GlobalTransaction gtx)
|
910 |
| { |
911 |
1121124
| Class[] argClasses = m.getMethod().getParameterTypes();
|
912 |
1121124
| Object[] args = m.getArgs();
|
913 |
| |
914 |
1121123
| for (int i = 0; i < argClasses.length; i++)
|
915 |
| { |
916 |
3224550
| if (argClasses[i].equals(GlobalTransaction.class))
|
917 |
| { |
918 |
65411
| if (!gtx.equals(args[i]))
|
919 |
| { |
920 |
732
| args[i] = gtx;
|
921 |
732
| m.setArgs(args);
|
922 |
| } |
923 |
65411
| break;
|
924 |
| } |
925 |
| } |
926 |
1121124
| return m;
|
927 |
| } |
928 |
| |
929 |
| |
930 |
| |
931 |
| |
932 |
| |
933 |
| |
934 |
| |
935 |
4046
| private Transaction createLocalTx() throws Exception
|
936 |
| { |
937 |
4046
| if (log.isTraceEnabled())
|
938 |
| { |
939 |
0
| log.trace("Creating transaction for thread " + Thread.currentThread());
|
940 |
| } |
941 |
4046
| Transaction localTx;
|
942 |
0
| if (txManager == null) throw new Exception("Failed to create local transaction; TransactionManager is null");
|
943 |
4046
| txManager.begin();
|
944 |
4046
| localTx = txManager.getTransaction();
|
945 |
4046
| return localTx;
|
946 |
| } |
947 |
| |
948 |
| |
949 |
| |
950 |
| |
951 |
| |
952 |
| |
953 |
| |
954 |
| |
955 |
749
| private Transaction createLocalTxForGlobalTx(GlobalTransaction gtx, InvocationContext ctx) throws Exception
|
956 |
| { |
957 |
749
| Transaction localTx = createLocalTx();
|
958 |
749
| txTable.put(localTx, gtx);
|
959 |
| |
960 |
749
| ctx.setTransaction(localTx);
|
961 |
0
| if (log.isTraceEnabled()) log.trace("Created new tx for gtx " + gtx);
|
962 |
749
| return localTx;
|
963 |
| } |
964 |
| |
965 |
| |
966 |
| |
967 |
| |
968 |
| |
969 |
| |
970 |
| |
971 |
| private class RemoteSynchronizationHandler implements Synchronization |
972 |
| { |
973 |
| Transaction tx = null; |
974 |
| GlobalTransaction gtx = null; |
975 |
| CacheSPI cache = null; |
976 |
| List modifications = null; |
977 |
| TransactionEntry entry = null; |
978 |
| protected InvocationContext ctx; |
979 |
| |
980 |
| |
981 |
1121827
| RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx, CacheSPI cache)
|
982 |
| { |
983 |
1121827
| this.gtx = gtx;
|
984 |
1121827
| this.tx = tx;
|
985 |
1121827
| this.cache = cache;
|
986 |
| } |
987 |
| |
988 |
1121518
| public void beforeCompletion()
|
989 |
| { |
990 |
0
| if (log.isTraceEnabled()) log.trace("Running beforeCompletion on gtx " + gtx);
|
991 |
1121518
| entry = txTable.get(gtx);
|
992 |
1121518
| if (entry == null)
|
993 |
| { |
994 |
0
| log.error("Transaction has a null transaction entry - beforeCompletion() will fail.");
|
995 |
0
| log.error("TxTable contents: " + txTable);
|
996 |
0
| throw new IllegalStateException("cannot find transaction entry for " + gtx);
|
997 |
| } |
998 |
| |
999 |
1121518
| modifications = entry.getModifications();
|
1000 |
1121518
| ctx = cache.getInvocationContext();
|
1001 |
1121518
| ctx.setOriginLocal(false);
|
1002 |
| } |
1003 |
| |
1004 |
| |
1005 |
| |
1006 |
1121770
| public void afterCompletion(int status)
|
1007 |
| { |
1008 |
1121770
| try
|
1009 |
| { |
1010 |
| |
1011 |
27
| if (ctx == null) ctx = cache.getInvocationContext();
|
1012 |
1121770
| setTransactionalContext(tx, gtx, ctx);
|
1013 |
| |
1014 |
1121770
| try
|
1015 |
| { |
1016 |
0
| if (txManager.getTransaction() != null && !txManager.getTransaction().equals(tx)) txManager.resume(tx);
|
1017 |
| } |
1018 |
| catch (Exception e) |
1019 |
| { |
1020 |
0
| log.error("afterCompletion error: " + status, e);
|
1021 |
| } |
1022 |
| |
1023 |
| |
1024 |
0
| if (log.isTraceEnabled()) log.trace("calling aftercompletion for " + gtx);
|
1025 |
| |
1026 |
?
| if ((entry = txTable.get(gtx)) != null)
|
1027 |
| { |
1028 |
1121770
| modifications = entry.getModifications();
|
1029 |
1121770
| ctx.setOptionOverrides(entry.getOption());
|
1030 |
| } |
1031 |
1121770
| transactions.remove(tx);
|
1032 |
| |
1033 |
1121770
| switch (status)
|
1034 |
| { |
1035 |
1121442
| case Status.STATUS_COMMITTED:
|
1036 |
| |
1037 |
| |
1038 |
1121442
| boolean onePhaseCommit = !configuration.isNodeLockingOptimistic() && configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
|
1039 |
0
| if (log.isDebugEnabled()) log.debug("Running commit phase. One phase? " + onePhaseCommit);
|
1040 |
1121442
| runCommitPhase(ctx, gtx, tx, modifications, onePhaseCommit);
|
1041 |
1121436
| log.debug("Finished commit phase");
|
1042 |
1121436
| break;
|
1043 |
| |
1044 |
71
| case Status.STATUS_MARKED_ROLLBACK:
|
1045 |
257
| case Status.STATUS_ROLLEDBACK:
|
1046 |
328
| log.debug("Running rollback phase");
|
1047 |
328
| runRollbackPhase(ctx, gtx, tx, modifications);
|
1048 |
328
| log.debug("Finished rollback phase");
|
1049 |
328
| break;
|
1050 |
| |
1051 |
0
| default:
|
1052 |
0
| throw new IllegalStateException("illegal status: " + status);
|
1053 |
| } |
1054 |
| } |
1055 |
| finally |
1056 |
| { |
1057 |
| |
1058 |
1121770
| txTable.remove(gtx);
|
1059 |
1121770
| txTable.remove(tx);
|
1060 |
1121770
| setTransactionalContext(null, null, ctx);
|
1061 |
1121770
| cleanupInternalState();
|
1062 |
| } |
1063 |
| } |
1064 |
| |
1065 |
| |
1066 |
| |
1067 |
| |
1068 |
| |
1069 |
| |
1070 |
1121770
| private void cleanupInternalState()
|
1071 |
| { |
1072 |
1121770
| this.tx = null;
|
1073 |
1121769
| this.gtx = null;
|
1074 |
1121770
| this.cache = null;
|
1075 |
1121770
| this.modifications = null;
|
1076 |
1121770
| this.entry = null;
|
1077 |
| } |
1078 |
| |
1079 |
1
| public String toString()
|
1080 |
| { |
1081 |
1
| return "TxInterceptor.RemoteSynchronizationHandler(gtx=" + gtx + ", tx=" + getTxAsString() + ")";
|
1082 |
| } |
1083 |
| |
1084 |
77
| protected String getTxAsString()
|
1085 |
| { |
1086 |
| |
1087 |
77
| if (tx == null)
|
1088 |
6
| return null;
|
1089 |
| |
1090 |
71
| return tx.getClass().getName() + "@" + System.identityHashCode(tx);
|
1091 |
| } |
1092 |
| } |
1093 |
| |
1094 |
| private class LocalSynchronizationHandler extends RemoteSynchronizationHandler |
1095 |
| { |
1096 |
| private boolean localRollbackOnly = true; |
1097 |
| |
1098 |
| |
1099 |
| |
1100 |
| |
1101 |
| private boolean remoteLocal = false; |
1102 |
| |
1103 |
| |
1104 |
| |
1105 |
| |
1106 |
| |
1107 |
| |
1108 |
| |
1109 |
| |
1110 |
| |
1111 |
| |
1112 |
| |
1113 |
| |
1114 |
| |
1115 |
| |
1116 |
1121078
| LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, CacheSPI cache, boolean remoteLocal)
|
1117 |
| { |
1118 |
1121078
| super(gtx, tx, cache);
|
1119 |
1121078
| this.remoteLocal = remoteLocal;
|
1120 |
| } |
1121 |
| |
1122 |
1120805
| @Override
|
1123 |
| public void beforeCompletion() |
1124 |
| { |
1125 |
1120805
| super.beforeCompletion();
|
1126 |
1120805
| ctx.setOriginLocal(!remoteLocal);
|
1127 |
| |
1128 |
| |
1129 |
1120805
| setTransactionalContext(tx, gtx, ctx);
|
1130 |
1120805
| if (modifications.size() == 0)
|
1131 |
| { |
1132 |
0
| if (log.isTraceEnabled()) log.trace("No modifications in this tx. Skipping beforeCompletion()");
|
1133 |
1053364
| return;
|
1134 |
| } |
1135 |
| |
1136 |
| |
1137 |
67441
| ctx.setOptionOverrides(entry.getOption());
|
1138 |
| |
1139 |
67441
| try
|
1140 |
| { |
1141 |
67441
| switch (tx.getStatus())
|
1142 |
| { |
1143 |
| |
1144 |
0
| case Status.STATUS_ACTIVE:
|
1145 |
67441
| case Status.STATUS_PREPARING:
|
1146 |
| |
1147 |
67441
| Object result = runPreparePhase(ctx, gtx, modifications);
|
1148 |
| |
1149 |
67379
| if (result instanceof Throwable)
|
1150 |
| { |
1151 |
0
| tx.setRollbackOnly();
|
1152 |
0
| throw (Throwable) result;
|
1153 |
| } |
1154 |
67379
| break;
|
1155 |
0
| default:
|
1156 |
0
| throw new CacheException("transaction " + tx + " in status " + tx.getStatus() + " unbale to start transaction");
|
1157 |
| } |
1158 |
| } |
1159 |
| catch (Throwable t) |
1160 |
| { |
1161 |
62
| try
|
1162 |
| { |
1163 |
62
| tx.setRollbackOnly();
|
1164 |
| } |
1165 |
| catch (SystemException se) |
1166 |
| { |
1167 |
0
| throw new RuntimeException("setting tx rollback failed ", se);
|
1168 |
| } |
1169 |
62
| if (t instanceof RuntimeException)
|
1170 |
61
| throw (RuntimeException) t;
|
1171 |
| else |
1172 |
1
| throw new RuntimeException("", t);
|
1173 |
| } |
1174 |
| finally |
1175 |
| { |
1176 |
67441
| localRollbackOnly = false;
|
1177 |
67441
| setTransactionalContext(null, null, ctx);
|
1178 |
| } |
1179 |
| } |
1180 |
| |
1181 |
1121030
| @Override
|
1182 |
| public void afterCompletion(int status) |
1183 |
| { |
1184 |
| |
1185 |
236
| if (ctx == null) ctx = cache.getInvocationContext();
|
1186 |
1121030
| ctx.setLocalRollbackOnly(localRollbackOnly);
|
1187 |
1121030
| super.afterCompletion(status);
|
1188 |
| } |
1189 |
| |
1190 |
76
| public String toString()
|
1191 |
| { |
1192 |
76
| return "TxInterceptor.LocalSynchronizationHandler(gtx=" + gtx + ", tx=" + getTxAsString() + ")";
|
1193 |
| } |
1194 |
| } |
1195 |
| } |