1 |
| |
2 |
| |
3 |
| |
4 |
| |
5 |
| |
6 |
| |
7 |
| package org.jboss.cache.interceptors; |
8 |
| |
9 |
| import org.jboss.cache.CacheException; |
10 |
| import org.jboss.cache.Fqn; |
11 |
| import org.jboss.cache.InvocationContext; |
12 |
| import org.jboss.cache.config.Configuration; |
13 |
| import org.jboss.cache.config.Option; |
14 |
| import org.jboss.cache.marshall.MethodCall; |
15 |
| import org.jboss.cache.marshall.MethodCallFactory; |
16 |
| import org.jboss.cache.marshall.MethodDeclarations; |
17 |
| import org.jboss.cache.optimistic.DataVersion; |
18 |
| import org.jboss.cache.optimistic.DefaultDataVersion; |
19 |
| import org.jboss.cache.optimistic.TransactionWorkspace; |
20 |
| import org.jboss.cache.optimistic.WorkspaceNode; |
21 |
| import org.jboss.cache.transaction.GlobalTransaction; |
22 |
| import org.jboss.cache.transaction.OptimisticTransactionEntry; |
23 |
| import org.jboss.cache.util.concurrent.ConcurrentHashSet; |
24 |
| |
25 |
| import java.util.ArrayList; |
26 |
| import java.util.List; |
27 |
| import java.util.Set; |
28 |
| |
29 |
| |
30 |
| |
31 |
| |
32 |
| |
33 |
| |
34 |
| |
35 |
| |
36 |
| |
37 |
| |
38 |
| |
39 |
| public class OptimisticReplicationInterceptor extends BaseRpcInterceptor |
40 |
| { |
41 |
| |
42 |
| |
43 |
| |
44 |
| |
45 |
| private final Set<GlobalTransaction> broadcastTxs = new ConcurrentHashSet<GlobalTransaction>(); |
46 |
| |
47 |
2972
| public Object invoke(InvocationContext ctx) throws Throwable
|
48 |
| { |
49 |
2972
| MethodCall m = ctx.getMethodCall();
|
50 |
| |
51 |
55
| if (MethodDeclarations.isBuddyGroupOrganisationMethod(m.getMethodId())) return super.invoke(ctx);
|
52 |
| |
53 |
2917
| Option optionOverride = ctx.getOptionOverrides();
|
54 |
2917
| if (optionOverride != null && optionOverride.isCacheModeLocal() && ctx.getTransaction() == null)
|
55 |
| { |
56 |
| |
57 |
0
| log.debug("Skipping replication for this call as cache mode is local, forced via an option override.");
|
58 |
0
| return super.invoke(ctx);
|
59 |
| } |
60 |
| |
61 |
2917
| Object retval;
|
62 |
| |
63 |
0
| if (log.isTraceEnabled()) log.trace("Processing method " + m);
|
64 |
| |
65 |
| |
66 |
| |
67 |
2917
| GlobalTransaction gtx = null;
|
68 |
| |
69 |
2917
| switch (m.getMethodId())
|
70 |
| { |
71 |
405
| case MethodDeclarations.optimisticPrepareMethod_id:
|
72 |
| |
73 |
405
| retval = super.invoke(ctx);
|
74 |
394
| gtx = getGlobalTransaction(ctx);
|
75 |
| |
76 |
394
| if (!gtx.isRemote() && ctx.isOriginLocal())
|
77 |
| { |
78 |
| |
79 |
218
| broadcastPrepare(m, gtx);
|
80 |
| } |
81 |
387
| break;
|
82 |
1180
| case MethodDeclarations.commitMethod_id:
|
83 |
| |
84 |
1180
| Throwable remoteCommitException = null;
|
85 |
1180
| gtx = getGlobalTransaction(ctx);
|
86 |
1180
| if (!gtx.isRemote() && ctx.isOriginLocal() && broadcastTxs.contains(gtx))
|
87 |
| { |
88 |
| |
89 |
163
| try
|
90 |
| { |
91 |
163
| broadcastCommit(gtx);
|
92 |
| } |
93 |
| catch (Throwable t) |
94 |
| { |
95 |
2
| log.error("A problem occurred with remote commit", t);
|
96 |
2
| remoteCommitException = t;
|
97 |
| } |
98 |
| } |
99 |
| |
100 |
1180
| retval = super.invoke(ctx);
|
101 |
1180
| if (remoteCommitException != null)
|
102 |
| { |
103 |
2
| throw remoteCommitException;
|
104 |
| } |
105 |
1178
| break;
|
106 |
31
| case MethodDeclarations.rollbackMethod_id:
|
107 |
| |
108 |
31
| gtx = getGlobalTransaction(ctx);
|
109 |
31
| Throwable remoteRollbackException = null;
|
110 |
31
| if (!gtx.isRemote() && ctx.isOriginLocal() && broadcastTxs.contains(gtx))
|
111 |
| { |
112 |
| |
113 |
6
| try
|
114 |
| { |
115 |
6
| broadcastRollback(gtx);
|
116 |
| } |
117 |
| catch (Throwable t) |
118 |
| { |
119 |
4
| log.error(" a problem occurred with remote rollback", t);
|
120 |
4
| remoteRollbackException = t;
|
121 |
| } |
122 |
| |
123 |
| } |
124 |
31
| retval = super.invoke(ctx);
|
125 |
31
| if (remoteRollbackException != null)
|
126 |
| { |
127 |
4
| throw remoteRollbackException;
|
128 |
| } |
129 |
27
| break;
|
130 |
16
| case MethodDeclarations.putForExternalReadMethodLocal_id:
|
131 |
16
| gtx = getGlobalTransaction(ctx);
|
132 |
16
| cache.getTransactionTable().get(gtx).setForceAsyncReplication(true);
|
133 |
| |
134 |
1285
| default:
|
135 |
| |
136 |
0
| if (log.isTraceEnabled()) log.trace("Received method " + m + " not handling");
|
137 |
1301
| retval = super.invoke(ctx);
|
138 |
1301
| break;
|
139 |
| } |
140 |
2893
| return retval;
|
141 |
| } |
142 |
| |
143 |
1621
| private GlobalTransaction getGlobalTransaction(InvocationContext ctx)
|
144 |
| { |
145 |
| |
146 |
1621
| GlobalTransaction gtx = ctx.getGlobalTransaction();
|
147 |
1621
| if (gtx == null)
|
148 |
| { |
149 |
0
| throw new CacheException("failed to get global transaction");
|
150 |
| } |
151 |
1621
| return gtx;
|
152 |
| } |
153 |
| |
154 |
218
| protected void broadcastPrepare(MethodCall methodCall, GlobalTransaction gtx) throws Throwable
|
155 |
| { |
156 |
218
| boolean remoteCallSync = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
|
157 |
| |
158 |
218
| Object[] args = methodCall.getArgs();
|
159 |
218
| List modifications = (List) args[1];
|
160 |
218
| int num_mods = modifications != null ? modifications.size() : 0;
|
161 |
| |
162 |
| |
163 |
218
| if (cache.getMembers() != null && cache.getMembers().size() > 1)
|
164 |
| { |
165 |
| |
166 |
| |
167 |
170
| MethodCall toBroadcast = mapDataVersionedMethodCalls(methodCall, getTransactionWorkspace(gtx));
|
168 |
| |
169 |
| |
170 |
169
| broadcastTxs.add(gtx);
|
171 |
169
| if (log.isDebugEnabled())
|
172 |
| { |
173 |
0
| log.debug("(" + cache.getLocalAddress()
|
174 |
| + "): broadcasting prepare for " + gtx |
175 |
| + " (" + num_mods + " modifications"); |
176 |
| } |
177 |
| |
178 |
169
| replicateCall(toBroadcast, remoteCallSync);
|
179 |
| } |
180 |
| else |
181 |
| { |
182 |
| |
183 |
48
| if (log.isDebugEnabled())
|
184 |
| { |
185 |
0
| log.debug("(" + cache.getLocalAddress()
|
186 |
| + "):not broadcasting prepare as members are " + cache.getMembers()); |
187 |
| } |
188 |
| } |
189 |
| } |
190 |
| |
191 |
| |
192 |
163
| protected void broadcastCommit(GlobalTransaction gtx) throws Throwable
|
193 |
| { |
194 |
163
| boolean remoteCallSync = configuration.isSyncCommitPhase();
|
195 |
| |
196 |
| |
197 |
163
| if (cache.getMembers() != null && cache.getMembers().size() > 1)
|
198 |
| { |
199 |
163
| try
|
200 |
| { |
201 |
163
| broadcastTxs.remove(gtx);
|
202 |
163
| MethodCall commit_method = MethodCallFactory.create(MethodDeclarations.commitMethod, gtx);
|
203 |
| |
204 |
163
| if (log.isDebugEnabled())
|
205 |
0
| log.debug("running remote commit for " + gtx + " and coord=" + cache.getLocalAddress());
|
206 |
| |
207 |
163
| replicateCall(commit_method, remoteCallSync);
|
208 |
| } |
209 |
| catch (Exception e) |
210 |
| { |
211 |
0
| log.error("Commit failed", e);
|
212 |
0
| throw e;
|
213 |
| } |
214 |
| } |
215 |
| } |
216 |
| |
217 |
6
| protected void broadcastRollback(GlobalTransaction gtx) throws Throwable
|
218 |
| { |
219 |
6
| boolean remoteCallSync = configuration.isSyncRollbackPhase();
|
220 |
| |
221 |
6
| if (cache.getMembers() != null && cache.getMembers().size() > 1)
|
222 |
| { |
223 |
| |
224 |
5
| try
|
225 |
| { |
226 |
5
| broadcastTxs.remove(gtx);
|
227 |
5
| MethodCall rollback_method = MethodCallFactory.create(MethodDeclarations.rollbackMethod, gtx);
|
228 |
| |
229 |
5
| if (log.isDebugEnabled())
|
230 |
0
| log.debug("running remote rollback for " + gtx + " and coord=" + cache.getLocalAddress());
|
231 |
5
| replicateCall(rollback_method, remoteCallSync);
|
232 |
| } |
233 |
| catch (Exception e) |
234 |
| { |
235 |
4
| log.error("Rollback failed", e);
|
236 |
4
| throw e;
|
237 |
| } |
238 |
| } |
239 |
| } |
240 |
| |
241 |
170
| private MethodCall mapDataVersionedMethodCalls(MethodCall m, TransactionWorkspace w)
|
242 |
| { |
243 |
170
| Object[] origArgs = m.getArgs();
|
244 |
170
| return MethodCallFactory.create(m.getMethod(), origArgs[0], translate((List<MethodCall>) origArgs[1], w), origArgs[2], origArgs[3], origArgs[4]);
|
245 |
| } |
246 |
| |
247 |
| |
248 |
| |
249 |
| |
250 |
170
| private List<MethodCall> translate(List<MethodCall> l, TransactionWorkspace w)
|
251 |
| { |
252 |
170
| List<MethodCall> newList = new ArrayList<MethodCall>();
|
253 |
170
| for (MethodCall origCall : l)
|
254 |
| { |
255 |
195
| if (MethodDeclarations.isDataGravitationMethod(origCall.getMethodId()))
|
256 |
| { |
257 |
| |
258 |
4
| newList.add(origCall);
|
259 |
| } |
260 |
| else |
261 |
| { |
262 |
191
| Object[] origArgs = origCall.getArgs();
|
263 |
| |
264 |
| |
265 |
| |
266 |
191
| Fqn fqn = (Fqn) origArgs[origCall.getMethodId() == MethodDeclarations.moveMethodLocal_id ? 0 : 1];
|
267 |
| |
268 |
191
| DataVersion versionToBroadcast = getVersionToBroadcast(w, fqn);
|
269 |
| |
270 |
| |
271 |
| |
272 |
190
| Object[] newArgs = new Object[origArgs.length + 1];
|
273 |
190
| System.arraycopy(origArgs, 0, newArgs, 0, origArgs.length);
|
274 |
190
| newArgs[origArgs.length] = versionToBroadcast;
|
275 |
| |
276 |
| |
277 |
190
| MethodCall newCall = MethodCallFactory.create(MethodDeclarations.getVersionedMethod(origCall.getMethodId()), newArgs);
|
278 |
| |
279 |
| |
280 |
190
| newList.add(newCall);
|
281 |
| } |
282 |
| } |
283 |
169
| return newList;
|
284 |
| } |
285 |
| |
286 |
| |
287 |
| |
288 |
| |
289 |
| |
290 |
191
| private DataVersion getVersionToBroadcast(TransactionWorkspace w, Fqn f)
|
291 |
| { |
292 |
191
| WorkspaceNode n = w.getNode(f);
|
293 |
191
| if (n == null)
|
294 |
| { |
295 |
0
| if (log.isTraceEnabled()) log.trace("Fqn " + f + " not found in workspace; not using a data version.");
|
296 |
1
| return null;
|
297 |
| } |
298 |
190
| if (n.isVersioningImplicit())
|
299 |
| { |
300 |
178
| DefaultDataVersion v = (DefaultDataVersion) n.getVersion();
|
301 |
177
| if (log.isTraceEnabled())
|
302 |
0
| log.trace("Fqn " + f + " has implicit versioning. Broadcasting an incremented version.");
|
303 |
| |
304 |
| |
305 |
177
| return v.increment();
|
306 |
| } |
307 |
| else |
308 |
| { |
309 |
0
| if (log.isTraceEnabled()) log.trace("Fqn " + f + " has explicit versioning. Broadcasting the version as-is.");
|
310 |
12
| return n.getVersion();
|
311 |
| } |
312 |
| } |
313 |
| |
314 |
170
| protected TransactionWorkspace getTransactionWorkspace(GlobalTransaction gtx) throws CacheException
|
315 |
| { |
316 |
170
| OptimisticTransactionEntry transactionEntry = (OptimisticTransactionEntry) cache.getTransactionTable().get(gtx);
|
317 |
| |
318 |
170
| if (transactionEntry == null)
|
319 |
| { |
320 |
0
| throw new CacheException("unable to map global transaction " + gtx + " to transaction entry");
|
321 |
| } |
322 |
| |
323 |
| |
324 |
170
| return transactionEntry.getTransactionWorkSpace();
|
325 |
| } |
326 |
| |
327 |
| } |