1 |
| |
2 |
| |
3 |
| |
4 |
| package org.jboss.cache.interceptors; |
5 |
| |
6 |
| import org.jboss.cache.CacheSPI; |
7 |
| import org.jboss.cache.InvocationContext; |
8 |
| import org.jboss.cache.buddyreplication.BuddyManager; |
9 |
| import org.jboss.cache.marshall.MethodCall; |
10 |
| import org.jboss.cache.marshall.MethodCallFactory; |
11 |
| import org.jboss.cache.marshall.MethodDeclarations; |
12 |
| import org.jboss.cache.transaction.GlobalTransaction; |
13 |
| import org.jboss.cache.transaction.TransactionEntry; |
14 |
| import org.jgroups.Address; |
15 |
| |
16 |
| import javax.transaction.Transaction; |
17 |
| import java.util.List; |
18 |
| |
19 |
| |
20 |
| |
21 |
| |
22 |
| |
23 |
| |
24 |
| |
25 |
| public abstract class BaseRpcInterceptor extends Interceptor |
26 |
| { |
27 |
| |
28 |
| private BuddyManager buddyManager; |
29 |
| private boolean usingBuddyReplication; |
30 |
| |
31 |
2575
| public void setCache(CacheSPI cache)
|
32 |
| { |
33 |
2575
| super.setCache(cache);
|
34 |
2575
| buddyManager = cache.getBuddyManager();
|
35 |
2575
| usingBuddyReplication = buddyManager != null;
|
36 |
| } |
37 |
| |
38 |
| |
39 |
| |
40 |
| |
41 |
| |
42 |
| |
43 |
| |
44 |
| |
45 |
102502
| protected void checkResponses(List rsps) throws Throwable
|
46 |
| { |
47 |
102502
| if (rsps != null)
|
48 |
| { |
49 |
42327
| for (Object rsp : rsps)
|
50 |
| { |
51 |
42400
| if (rsp != null && rsp instanceof Throwable)
|
52 |
| { |
53 |
| |
54 |
29
| if (log.isDebugEnabled())
|
55 |
0
| log.debug("Received Throwable from remote node", (Throwable) rsp);
|
56 |
29
| throw (Throwable) rsp;
|
57 |
| } |
58 |
| } |
59 |
| } |
60 |
| } |
61 |
| |
62 |
113249
| protected void replicateCall(MethodCall call, boolean sync) throws Throwable
|
63 |
| { |
64 |
113249
| replicateCall(null, call, sync);
|
65 |
| } |
66 |
| |
67 |
113286
| protected void replicateCall(List<Address> recipients, MethodCall call, boolean sync) throws Throwable
|
68 |
| { |
69 |
| |
70 |
0
| if (log.isTraceEnabled()) log.trace("Broadcasting call " + call + " to recipient list " + recipients);
|
71 |
113286
| Transaction tx = null;
|
72 |
?
| if (cache.getTransactionManager() != null && (tx = cache.getTransactionManager().getTransaction()) != null)
|
73 |
| { |
74 |
21640
| GlobalTransaction gtx = cache.getTransactionTable().get(tx);
|
75 |
21640
| TransactionEntry te = cache.getTransactionTable().get(gtx);
|
76 |
20
| if (te != null && te.isForceAsyncReplication()) sync = false;
|
77 |
| } |
78 |
113284
| if (!sync && cache.getRPCManager().getReplicationQueue() != null && !usingBuddyReplication)
|
79 |
| { |
80 |
0
| putCallOnAsyncReplicationQueue(call);
|
81 |
| } |
82 |
| else |
83 |
| { |
84 |
161
| if (usingBuddyReplication) call = buddyManager.transformFqns(call);
|
85 |
| |
86 |
113284
| List<Address> callRecipients = recipients;
|
87 |
113284
| if (callRecipients == null)
|
88 |
| { |
89 |
113247
| callRecipients = usingBuddyReplication ? buddyManager.getBuddyAddresses() : cache.getMembers();
|
90 |
113247
| if (log.isTraceEnabled())
|
91 |
0
| log.trace("Setting call recipients to " + callRecipients + " since the original list of recipients passed in is null.");
|
92 |
| } |
93 |
| |
94 |
113284
| List rsps = cache.getRPCManager().callRemoteMethods(callRecipients,
|
95 |
| MethodDeclarations.replicateMethod, |
96 |
| new Object[]{call}, |
97 |
| sync, |
98 |
| true, |
99 |
| configuration.getSyncReplTimeout()); |
100 |
113265
| if (log.isTraceEnabled())
|
101 |
| { |
102 |
0
| log.trace("responses=" + rsps);
|
103 |
| } |
104 |
102502
| if (sync) checkResponses(rsps);
|
105 |
| } |
106 |
| |
107 |
| } |
108 |
| |
109 |
0
| protected void putCallOnAsyncReplicationQueue(MethodCall call)
|
110 |
| { |
111 |
0
| if (log.isDebugEnabled()) log.debug("Putting call " + call + " on the replication queue.");
|
112 |
0
| cache.getRPCManager().getReplicationQueue().add(MethodCallFactory.create(MethodDeclarations.replicateMethod, call));
|
113 |
| } |
114 |
| |
115 |
21290
| protected boolean containsModifications(InvocationContext ctx)
|
116 |
| { |
117 |
21290
| switch (ctx.getMethodCall().getMethodId())
|
118 |
| { |
119 |
10645
| case MethodDeclarations.prepareMethod_id:
|
120 |
0
| case MethodDeclarations.optimisticPrepareMethod_id:
|
121 |
10645
| List mods = (List) ctx.getMethodCall().getArgs()[1];
|
122 |
10645
| return mods.size() > 0;
|
123 |
10590
| case MethodDeclarations.commitMethod_id:
|
124 |
55
| case MethodDeclarations.rollbackMethod_id:
|
125 |
10645
| return ctx.isTxHasMods();
|
126 |
0
| default:
|
127 |
0
| return false;
|
128 |
| } |
129 |
| } |
130 |
| } |