1 |
| package org.jboss.cache.interceptors; |
2 |
| |
3 |
| import org.jboss.cache.InvocationContext; |
4 |
| import org.jboss.cache.config.Configuration; |
5 |
| import org.jboss.cache.config.Option; |
6 |
| import org.jboss.cache.marshall.MethodCall; |
7 |
| import org.jboss.cache.marshall.MethodDeclarations; |
8 |
| import org.jboss.cache.transaction.GlobalTransaction; |
9 |
| |
10 |
| |
11 |
| |
12 |
| |
13 |
| |
14 |
| |
15 |
| |
16 |
| |
17 |
| |
18 |
| public class ReplicationInterceptor extends BaseRpcInterceptor |
19 |
| { |
20 |
| |
21 |
311664
| public Object invoke(InvocationContext ctx) throws Throwable
|
22 |
| { |
23 |
311664
| MethodCall m = ctx.getMethodCall();
|
24 |
311664
| GlobalTransaction gtx = ctx.getGlobalTransaction();
|
25 |
| |
26 |
| |
27 |
731
| if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return super.invoke(ctx);
|
28 |
| |
29 |
310933
| boolean isLocalCommitOrRollback = gtx != null && !gtx.isRemote() && (m.getMethodId() == MethodDeclarations.commitMethod_id || m.getMethodId() == MethodDeclarations.rollbackMethod_id);
|
30 |
| |
31 |
0
| if (log.isTraceEnabled()) log.trace("isLocalCommitOrRollback? " + isLocalCommitOrRollback + "; gtx = " + gtx);
|
32 |
| |
33 |
| |
34 |
310933
| Object o = isLocalCommitOrRollback ? null : super.invoke(ctx);
|
35 |
| |
36 |
| |
37 |
310901
| Option optionOverride = ctx.getOptionOverrides();
|
38 |
| |
39 |
310901
| if (optionOverride != null && optionOverride.isCacheModeLocal() && ctx.getTransaction() == null)
|
40 |
| { |
41 |
275
| log.trace("skip replication");
|
42 |
275
| return isLocalCommitOrRollback ? super.invoke(ctx) : o;
|
43 |
| } |
44 |
| |
45 |
| |
46 |
| |
47 |
310626
| if (ctx.getTransaction() != null)
|
48 |
| { |
49 |
61852
| if (gtx != null && !gtx.isRemote())
|
50 |
| { |
51 |
| |
52 |
56276
| switch (m.getMethodId())
|
53 |
| { |
54 |
10590
| case MethodDeclarations.commitMethod_id:
|
55 |
| |
56 |
10577
| if (containsModifications(ctx)) replicateCall(m, configuration.isSyncCommitPhase());
|
57 |
| |
58 |
10588
| o = super.invoke(ctx);
|
59 |
10587
| break;
|
60 |
10645
| case MethodDeclarations.prepareMethod_id:
|
61 |
10645
| if (containsModifications(ctx))
|
62 |
| { |
63 |
| |
64 |
10639
| runPreparePhase(m, gtx);
|
65 |
| } |
66 |
10621
| break;
|
67 |
55
| case MethodDeclarations.rollbackMethod_id:
|
68 |
| |
69 |
55
| if (containsModifications(ctx) && !ctx.isLocalRollbackOnly())
|
70 |
| { |
71 |
24
| replicateCall(m, configuration.isSyncRollbackPhase());
|
72 |
| } |
73 |
| |
74 |
55
| o = super.invoke(ctx);
|
75 |
55
| break;
|
76 |
0
| case MethodDeclarations.putForExternalReadMethodLocal_id:
|
77 |
0
| cache.getTransactionTable().get(gtx).setForceAsyncReplication(true);
|
78 |
| } |
79 |
| } |
80 |
| } |
81 |
248774
| else if (MethodDeclarations.isCrudMethod(m.getMethodId()))
|
82 |
| { |
83 |
| |
84 |
0
| if (log.isTraceEnabled()) log.trace("Non-tx crud meth");
|
85 |
132939
| if (ctx.isOriginLocal())
|
86 |
| { |
87 |
| |
88 |
91544
| handleReplicatedMethod(m, configuration.getCacheMode());
|
89 |
| } |
90 |
| } |
91 |
| else |
92 |
| { |
93 |
0
| if (log.isTraceEnabled()) log.trace("Non-tx and non crud meth");
|
94 |
| } |
95 |
| |
96 |
310589
| return o;
|
97 |
| } |
98 |
| |
99 |
91544
| void handleReplicatedMethod(MethodCall m, Configuration.CacheMode mode) throws Throwable
|
100 |
| { |
101 |
91544
| if (log.isTraceEnabled())
|
102 |
| { |
103 |
0
| log.trace("invoking method " + m + ", members=" + cache.getMembers() + ", mode=" +
|
104 |
| configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" + |
105 |
| configuration.getSyncReplTimeout()); |
106 |
| } |
107 |
91544
| if (mode == Configuration.CacheMode.REPL_ASYNC || m.getMethodId() == MethodDeclarations.putForExternalReadMethodLocal_id)
|
108 |
| { |
109 |
| |
110 |
248
| replicateCall(m, false);
|
111 |
| } |
112 |
| else |
113 |
| { |
114 |
| |
115 |
| |
116 |
91296
| replicateCall(m, true);
|
117 |
| } |
118 |
| } |
119 |
| |
120 |
| |
121 |
| |
122 |
| |
123 |
| |
124 |
| |
125 |
| |
126 |
| |
127 |
| |
128 |
| |
129 |
| |
130 |
| |
131 |
| |
132 |
| |
133 |
| |
134 |
10639
| protected void runPreparePhase(MethodCall prepareMethod, GlobalTransaction gtx) throws Throwable
|
135 |
| { |
136 |
10639
| boolean async = configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
|
137 |
10639
| if (log.isTraceEnabled())
|
138 |
| { |
139 |
0
| log.trace("(" + cache.getLocalAddress() + "): running remote prepare for global tx " + gtx + " with async mode=" + async);
|
140 |
| } |
141 |
| |
142 |
| |
143 |
10639
| replicateCall(prepareMethod, !async);
|
144 |
| } |
145 |
| } |