1 |
| |
2 |
| |
3 |
| |
4 |
| |
5 |
| |
6 |
| |
7 |
| package org.jboss.cache.interceptors; |
8 |
| |
9 |
| import org.apache.commons.logging.Log; |
10 |
| import org.apache.commons.logging.LogFactory; |
11 |
| import org.jboss.cache.CacheException; |
12 |
| import org.jboss.cache.CacheSPI; |
13 |
| import org.jboss.cache.Fqn; |
14 |
| import org.jboss.cache.InvocationContext; |
15 |
| import org.jboss.cache.Node; |
16 |
| import org.jboss.cache.NodeSPI; |
17 |
| import org.jboss.cache.buddyreplication.BuddyManager; |
18 |
| import org.jboss.cache.buddyreplication.GravitateResult; |
19 |
| import org.jboss.cache.config.Configuration; |
20 |
| import org.jboss.cache.marshall.MethodCall; |
21 |
| import org.jboss.cache.marshall.MethodCallFactory; |
22 |
| import org.jboss.cache.marshall.MethodDeclarations; |
23 |
| import org.jboss.cache.marshall.NodeData; |
24 |
| import org.jboss.cache.transaction.GlobalTransaction; |
25 |
| import org.jboss.cache.transaction.TransactionEntry; |
26 |
| import org.jgroups.Address; |
27 |
| import org.jgroups.blocks.GroupRequest; |
28 |
| |
29 |
| import java.util.ArrayList; |
30 |
| import java.util.Collection; |
31 |
| import java.util.Collections; |
32 |
| import java.util.List; |
33 |
| import java.util.Map; |
34 |
| import java.util.concurrent.ConcurrentHashMap; |
35 |
| |
36 |
| |
37 |
| |
38 |
| |
39 |
| |
40 |
| |
41 |
| |
42 |
| |
43 |
| |
44 |
| |
45 |
| |
46 |
| |
47 |
| |
48 |
| |
49 |
| |
50 |
| |
51 |
| public class DataGravitatorInterceptor extends BaseRpcInterceptor |
52 |
| { |
53 |
| private BuddyManager buddyManager; |
54 |
| private boolean syncCommunications = false; |
55 |
| private Log log = LogFactory.getLog(DataGravitatorInterceptor.class); |
56 |
| private Map transactionMods = new ConcurrentHashMap(); |
57 |
| |
58 |
332
| public void setCache(CacheSPI cache)
|
59 |
| { |
60 |
332
| super.setCache(cache);
|
61 |
332
| this.buddyManager = cache.getBuddyManager();
|
62 |
332
| syncCommunications = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC || configuration.getCacheMode() == Configuration.CacheMode.INVALIDATION_SYNC;
|
63 |
| } |
64 |
| |
65 |
3335
| public Object invoke(InvocationContext ctx) throws Throwable
|
66 |
| { |
67 |
3335
| MethodCall m = ctx.getMethodCall();
|
68 |
0
| if (log.isTraceEnabled()) log.trace("Invoked with method call " + m);
|
69 |
| |
70 |
3335
| if (MethodDeclarations.isBlockUnblockMethod(m.getMethodId()))
|
71 |
| { |
72 |
1339
| return super.invoke(ctx);
|
73 |
| } |
74 |
| |
75 |
| |
76 |
1996
| if (!MethodDeclarations.isTransactionLifecycleMethod(m.getMethodId()))
|
77 |
| { |
78 |
1790
| if (isGravitationEnabled(ctx) && MethodDeclarations.isGetMethod(m.getMethodId()))
|
79 |
| { |
80 |
| |
81 |
191
| Fqn fqn = extractFqn(m.getMethodId(), m.getArgs());
|
82 |
0
| if (log.isTraceEnabled()) log.trace("Checking local existence of fqn " + fqn);
|
83 |
191
| if (BuddyManager.isBackupFqn(fqn))
|
84 |
| { |
85 |
15
| log.info("Is call for a backup Fqn, not performing any gravitation. Direct calls on internal backup nodes are *not* supported.");
|
86 |
| } |
87 |
| else |
88 |
| { |
89 |
176
| if (cache.peek(fqn, false) == null)
|
90 |
| { |
91 |
50
| BackupData data;
|
92 |
| |
93 |
| |
94 |
50
| if (localBackupExists(fqn))
|
95 |
| { |
96 |
15
| log.trace("Gravitating from local backup tree");
|
97 |
15
| data = localBackupGet(fqn, ctx);
|
98 |
| } |
99 |
| else |
100 |
| { |
101 |
35
| log.trace("Gravitating from remote backup tree");
|
102 |
| |
103 |
35
| data = remoteBackupGet(fqn);
|
104 |
| } |
105 |
| |
106 |
50
| if (data != null)
|
107 |
| { |
108 |
| |
109 |
| |
110 |
| |
111 |
| |
112 |
34
| log.trace("Passing the put call locally to make sure state is persisted and ownership is correctly established.");
|
113 |
34
| createNode(data.backupData, false);
|
114 |
| |
115 |
| |
116 |
| |
117 |
| |
118 |
| |
119 |
| |
120 |
33
| cleanBackupData(data, ctx.getGlobalTransaction());
|
121 |
| } |
122 |
| } |
123 |
| } |
124 |
| } |
125 |
| else |
126 |
| { |
127 |
1599
| if (log.isTraceEnabled())
|
128 |
| { |
129 |
0
| log.trace("Suppressing data gravitation for this call.");
|
130 |
| } |
131 |
| } |
132 |
| } |
133 |
| else |
134 |
| { |
135 |
| |
136 |
206
| try
|
137 |
| { |
138 |
206
| switch (m.getMethodId())
|
139 |
| { |
140 |
24
| case MethodDeclarations.prepareMethod_id:
|
141 |
21
| case MethodDeclarations.optimisticPrepareMethod_id:
|
142 |
45
| Object o = super.invoke(ctx);
|
143 |
45
| doPrepare(ctx.getGlobalTransaction());
|
144 |
45
| return o;
|
145 |
1
| case MethodDeclarations.rollbackMethod_id:
|
146 |
1
| transactionMods.remove(ctx.getGlobalTransaction());
|
147 |
1
| return super.invoke(ctx);
|
148 |
160
| case MethodDeclarations.commitMethod_id:
|
149 |
160
| doCommit(ctx.getGlobalTransaction());
|
150 |
159
| transactionMods.remove(ctx.getGlobalTransaction());
|
151 |
159
| return super.invoke(ctx);
|
152 |
| } |
153 |
| } |
154 |
| catch (Throwable throwable) |
155 |
| { |
156 |
1
| transactionMods.remove(ctx.getGlobalTransaction());
|
157 |
1
| throw throwable;
|
158 |
| } |
159 |
| } |
160 |
| |
161 |
| |
162 |
| |
163 |
| |
164 |
| |
165 |
| |
166 |
| |
167 |
1789
| return super.invoke(ctx);
|
168 |
| } |
169 |
| |
170 |
1790
| private boolean isGravitationEnabled(InvocationContext ctx)
|
171 |
| { |
172 |
1790
| boolean enabled = ctx.isOriginLocal();
|
173 |
1790
| if (enabled)
|
174 |
| { |
175 |
467
| if (!buddyManager.isAutoDataGravitation())
|
176 |
| { |
177 |
374
| enabled = ctx.getOptionOverrides().getForceDataGravitation();
|
178 |
| } |
179 |
| } |
180 |
1790
| return enabled;
|
181 |
| } |
182 |
| |
183 |
45
| private void doPrepare(GlobalTransaction gtx) throws Throwable
|
184 |
| { |
185 |
45
| MethodCall cleanup = (MethodCall) transactionMods.get(gtx);
|
186 |
0
| if (log.isTraceEnabled()) log.trace("Broadcasting prepare for cleanup ops " + cleanup);
|
187 |
45
| if (cleanup != null)
|
188 |
| { |
189 |
5
| MethodCall prepare;
|
190 |
5
| List mods = new ArrayList(1);
|
191 |
5
| mods.add(cleanup);
|
192 |
5
| if (configuration.isNodeLockingOptimistic())
|
193 |
| { |
194 |
4
| prepare = MethodCallFactory.create(MethodDeclarations.optimisticPrepareMethod, gtx, mods, null, cache.getLocalAddress(), false);
|
195 |
| } |
196 |
| else |
197 |
| { |
198 |
1
| prepare = MethodCallFactory.create(MethodDeclarations.prepareMethod, gtx, mods, cache.getLocalAddress(), syncCommunications);
|
199 |
| } |
200 |
| |
201 |
5
| replicateCall(getMembersOutsideBuddyGroup(), prepare, syncCommunications);
|
202 |
| } |
203 |
| else |
204 |
| { |
205 |
0
| if (log.isTraceEnabled()) log.trace("Nothing to broadcast in prepare phase for gtx " + gtx);
|
206 |
| } |
207 |
| } |
208 |
| |
209 |
160
| private void doCommit(GlobalTransaction gtx) throws Throwable
|
210 |
| { |
211 |
160
| if (transactionMods.containsKey(gtx))
|
212 |
| { |
213 |
0
| if (log.isTraceEnabled()) log.trace("Broadcasting commit for gtx " + gtx);
|
214 |
5
| replicateCall(getMembersOutsideBuddyGroup(), MethodCallFactory.create(MethodDeclarations.commitMethod, gtx), syncCommunications);
|
215 |
| } |
216 |
| else |
217 |
| { |
218 |
0
| if (log.isTraceEnabled()) log.trace("Nothing to broadcast in commit phase for gtx " + gtx);
|
219 |
| } |
220 |
| } |
221 |
| |
222 |
10
| private List<Address> getMembersOutsideBuddyGroup()
|
223 |
| { |
224 |
10
| List<Address> members = new ArrayList<Address>(cache.getMembers());
|
225 |
10
| members.remove(cache.getLocalAddress());
|
226 |
10
| members.removeAll(buddyManager.getBuddyAddresses());
|
227 |
10
| return members;
|
228 |
| } |
229 |
| |
230 |
35
| private BackupData remoteBackupGet(Fqn name) throws Exception
|
231 |
| { |
232 |
| |
233 |
35
| BackupData result = null;
|
234 |
| |
235 |
35
| GravitateResult gr = gravitateData(name);
|
236 |
| |
237 |
35
| if (gr.isDataFound())
|
238 |
| { |
239 |
19
| if (log.isTraceEnabled())
|
240 |
| { |
241 |
0
| log.trace("Got response " + gr);
|
242 |
| } |
243 |
| |
244 |
| |
245 |
| |
246 |
| |
247 |
| |
248 |
| |
249 |
| |
250 |
| |
251 |
| |
252 |
| |
253 |
| |
254 |
| |
255 |
| |
256 |
| |
257 |
| |
258 |
| |
259 |
| |
260 |
| |
261 |
| |
262 |
| |
263 |
| |
264 |
| |
265 |
| |
266 |
19
| result = new BackupData(name, gr);
|
267 |
| } |
268 |
| |
269 |
35
| return result;
|
270 |
| } |
271 |
| |
272 |
33
| private void cleanBackupData(BackupData backup, GlobalTransaction gtx) throws Throwable
|
273 |
| { |
274 |
| |
275 |
| |
276 |
| |
277 |
| |
278 |
| |
279 |
| |
280 |
| |
281 |
| |
282 |
| |
283 |
| |
284 |
| |
285 |
| |
286 |
33
| MethodCall cleanup = MethodCallFactory.create(MethodDeclarations.dataGravitationCleanupMethod, gtx, backup.primaryFqn, backup.backupFqn);
|
287 |
| |
288 |
| |
289 |
0
| if (log.isTraceEnabled()) log.trace("Performing cleanup on [" + backup.primaryFqn + "]");
|
290 |
33
| if (gtx == null)
|
291 |
| { |
292 |
| |
293 |
| |
294 |
| |
295 |
| |
296 |
0
| if (log.isTraceEnabled()) log.trace("Performing cleanup on [" + backup.backupFqn + "]");
|
297 |
| |
298 |
| |
299 |
27
| replicateCall(cache.getMembers(), cleanup, syncCommunications);
|
300 |
| } |
301 |
| else |
302 |
| { |
303 |
6
| if (log.isTraceEnabled())
|
304 |
| { |
305 |
0
| log.trace("Data gravitation performed under global transaction " + gtx + ". Not broadcasting cleanups until the tx commits. Adding to tx mod list instead.");
|
306 |
| } |
307 |
6
| transactionMods.put(gtx, cleanup);
|
308 |
6
| TransactionEntry te = getTransactionEntry(gtx);
|
309 |
6
| te.addModification(cleanup);
|
310 |
| } |
311 |
| } |
312 |
| |
313 |
35
| private GravitateResult gravitateData(Fqn fqn) throws Exception
|
314 |
| { |
315 |
35
| if (log.isTraceEnabled())
|
316 |
| { |
317 |
0
| log.trace("cache=" + cache.getLocalAddress() + "; requesting data gravitation for Fqn " + fqn);
|
318 |
| } |
319 |
35
| List<Address> mbrs = cache.getMembers();
|
320 |
35
| Boolean searchSubtrees = (buddyManager.isDataGravitationSearchBackupTrees() ? Boolean.TRUE : Boolean.FALSE);
|
321 |
35
| MethodCall dGrav = MethodCallFactory.create(MethodDeclarations.dataGravitationMethod, fqn, searchSubtrees);
|
322 |
| |
323 |
| |
324 |
35
| List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav, GroupRequest.GET_ALL, true, buddyManager.getBuddyCommunicationTimeout());
|
325 |
35
| if (log.isTraceEnabled())
|
326 |
| { |
327 |
0
| log.trace("got responses " + resps);
|
328 |
| } |
329 |
35
| if (resps == null)
|
330 |
| { |
331 |
0
| log.error("No replies to call " + dGrav + ". Perhaps we're alone in the cluster?");
|
332 |
0
| return GravitateResult.noDataFound();
|
333 |
| } |
334 |
| |
335 |
| |
336 |
35
| GravitateResult result = GravitateResult.noDataFound();
|
337 |
35
| for (Object o : resps)
|
338 |
| { |
339 |
35
| if (o instanceof Throwable)
|
340 |
| { |
341 |
0
| if (log.isDebugEnabled())
|
342 |
| { |
343 |
0
| log.debug("Found remote Throwable among responses - removing from responses list", (Exception) o);
|
344 |
| } |
345 |
| } |
346 |
35
| else if (o != null)
|
347 |
| { |
348 |
35
| result = (GravitateResult) o;
|
349 |
35
| if (result.isDataFound())
|
350 |
| { |
351 |
19
| break;
|
352 |
| } |
353 |
| } |
354 |
0
| else if (!configuration.isUseRegionBasedMarshalling())
|
355 |
| { |
356 |
| |
357 |
| |
358 |
| |
359 |
0
| log.error("Unexpected null response to call " + dGrav + ".");
|
360 |
| } |
361 |
| |
362 |
| } |
363 |
| |
364 |
35
| return result;
|
365 |
| } |
366 |
| |
367 |
34
| private void createNode(List<NodeData> nodeData, boolean localOnly) throws CacheException
|
368 |
| { |
369 |
34
| for (NodeData data : nodeData)
|
370 |
| { |
371 |
39
| if (localOnly)
|
372 |
| { |
373 |
0
| if (cache.peek(data.getFqn(), false) == null)
|
374 |
| { |
375 |
0
| createNodesLocally(data.getFqn(), data.getAttributes());
|
376 |
| } |
377 |
| } |
378 |
| else |
379 |
| { |
380 |
39
| cache.put(data.getFqn(), data.getAttributes());
|
381 |
| } |
382 |
| } |
383 |
| } |
384 |
| |
385 |
0
| private void createNodesLocally(Fqn fqn, Map data) throws CacheException
|
386 |
| { |
387 |
0
| int treeNodeSize;
|
388 |
0
| if ((treeNodeSize = fqn.size()) == 0) return;
|
389 |
0
| NodeSPI n = cache.getRoot();
|
390 |
0
| for (int i = 0; i < treeNodeSize; i++)
|
391 |
| { |
392 |
0
| Object child_name = fqn.get(i);
|
393 |
0
| NodeSPI child_node = n.addChildDirect(new Fqn(child_name));
|
394 |
0
| if (child_node == null)
|
395 |
| { |
396 |
0
| if (log.isTraceEnabled())
|
397 |
| { |
398 |
0
| log.trace("failed to find or create child " + child_name + " of node " + n.getFqn());
|
399 |
| } |
400 |
0
| return;
|
401 |
| } |
402 |
0
| if (i == treeNodeSize - 1)
|
403 |
| { |
404 |
| |
405 |
0
| child_node.putAllDirect(data);
|
406 |
| } |
407 |
0
| n = child_node;
|
408 |
| } |
409 |
| } |
410 |
| |
411 |
6
| private TransactionEntry getTransactionEntry(GlobalTransaction gtx)
|
412 |
| { |
413 |
6
| return cache.getTransactionTable().get(gtx);
|
414 |
| } |
415 |
| |
416 |
191
| private Fqn extractFqn(int methodId, Object[] args)
|
417 |
| { |
418 |
191
| return (Fqn) args[MethodDeclarations.isCrudMethod(methodId) ? 1 : 0];
|
419 |
| } |
420 |
| |
421 |
50
| private boolean localBackupExists(Fqn fqn)
|
422 |
| { |
423 |
50
| boolean exists = false;
|
424 |
| |
425 |
50
| for (Node node : getBackupRootCollection())
|
426 |
| { |
427 |
50
| Fqn newSearchFqn = new Fqn(node.getFqn(), fqn);
|
428 |
50
| exists = cache.peek(newSearchFqn, false) != null;
|
429 |
15
| if (exists) break;
|
430 |
| } |
431 |
| |
432 |
50
| return exists;
|
433 |
| } |
434 |
| |
435 |
15
| private BackupData localBackupGet(Fqn fqn, InvocationContext ctx) throws CacheException
|
436 |
| { |
437 |
15
| GravitateResult result = cache.gravitateData(fqn, true);
|
438 |
15
| boolean found = result.isDataFound();
|
439 |
15
| BackupData data = null;
|
440 |
| |
441 |
15
| if (found)
|
442 |
| { |
443 |
15
| Fqn backupFqn = result.getBuddyBackupFqn();
|
444 |
15
| data = new BackupData(fqn, result);
|
445 |
| |
446 |
15
| if (buddyManager.isDataGravitationRemoveOnFind())
|
447 |
| { |
448 |
| |
449 |
| |
450 |
15
| ctx.getOptionOverrides().setCacheModeLocal(true);
|
451 |
15
| cache.removeNode(backupFqn);
|
452 |
| } |
453 |
| else |
454 |
| { |
455 |
0
| cache.evict(backupFqn, true);
|
456 |
| } |
457 |
| } |
458 |
| |
459 |
15
| return data;
|
460 |
| } |
461 |
| |
462 |
50
| private Collection<Node> getBackupRootCollection()
|
463 |
| { |
464 |
50
| NodeSPI backupRoot = cache.peek(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN, true);
|
465 |
50
| return backupRoot == null ? Collections.EMPTY_SET : backupRoot.getChildrenDirect();
|
466 |
| } |
467 |
| |
468 |
| private static class BackupData |
469 |
| { |
470 |
| Fqn primaryFqn; |
471 |
| Fqn backupFqn; |
472 |
| List<NodeData> backupData; |
473 |
| |
474 |
34
| public BackupData(Fqn primaryFqn, GravitateResult gr)
|
475 |
| { |
476 |
34
| this.primaryFqn = primaryFqn;
|
477 |
34
| this.backupFqn = gr.getBuddyBackupFqn();
|
478 |
34
| this.backupData = gr.getNodeData();
|
479 |
| } |
480 |
| } |
481 |
| |
482 |
| |
483 |
| } |