1 |
| |
2 |
| |
3 |
| |
4 |
| |
5 |
| |
6 |
| |
7 |
| package org.jboss.cache.interceptors; |
8 |
| |
9 |
| import org.jboss.cache.CacheSPI; |
10 |
| import org.jboss.cache.Fqn; |
11 |
| import org.jboss.cache.InvocationContext; |
12 |
| import org.jboss.cache.config.Configuration; |
13 |
| import org.jboss.cache.config.Option; |
14 |
| import org.jboss.cache.marshall.MethodCall; |
15 |
| import org.jboss.cache.marshall.MethodCallFactory; |
16 |
| import org.jboss.cache.marshall.MethodDeclarations; |
17 |
| import org.jboss.cache.optimistic.TransactionWorkspace; |
18 |
| import org.jboss.cache.transaction.GlobalTransaction; |
19 |
| import org.jboss.cache.transaction.OptimisticTransactionEntry; |
20 |
| import org.jboss.cache.transaction.TransactionEntry; |
21 |
| import org.jboss.cache.transaction.TransactionTable; |
22 |
| |
23 |
| import javax.transaction.SystemException; |
24 |
| import javax.transaction.Transaction; |
25 |
| import java.util.HashMap; |
26 |
| import java.util.HashSet; |
27 |
| import java.util.LinkedList; |
28 |
| import java.util.List; |
29 |
| import java.util.Map; |
30 |
| import java.util.Set; |
31 |
| |
32 |
| |
33 |
| |
34 |
| |
35 |
| |
36 |
| |
37 |
| |
38 |
| |
39 |
| |
40 |
| |
41 |
| |
42 |
| |
43 |
| |
44 |
| public class InvalidationInterceptor extends BaseRpcInterceptor implements InvalidationInterceptorMBean |
45 |
| { |
46 |
| private long m_invalidations = 0; |
47 |
| protected TransactionTable txTable; |
48 |
| |
49 |
145
| public void setCache(CacheSPI cache)
|
50 |
| { |
51 |
145
| super.setCache(cache);
|
52 |
145
| txTable = cache.getTransactionTable();
|
53 |
| } |
54 |
| |
55 |
976
| public Object invoke(InvocationContext ctx) throws Throwable
|
56 |
| { |
57 |
976
| MethodCall m = ctx.getMethodCall();
|
58 |
976
| Option optionOverride = ctx.getOptionOverrides();
|
59 |
976
| if (optionOverride != null && optionOverride.isCacheModeLocal() && ctx.getTransaction() == null)
|
60 |
| { |
61 |
| |
62 |
12
| return super.invoke(ctx);
|
63 |
| } |
64 |
| |
65 |
964
| Transaction tx = ctx.getTransaction();
|
66 |
964
| Object retval = super.invoke(ctx);
|
67 |
| |
68 |
0
| if (log.isTraceEnabled()) log.trace("(" + cache.getLocalAddress() + ") method call " + m);
|
69 |
| |
70 |
| |
71 |
962
| if (MethodDeclarations.isCrudMethod(m.getMethodId()))
|
72 |
| { |
73 |
161
| if (m.getMethodId() != MethodDeclarations.putForExternalReadMethodLocal_id)
|
74 |
| { |
75 |
0
| if (log.isDebugEnabled()) log.debug("Is a CRUD method");
|
76 |
161
| Fqn fqn = findFqn(m.getArgs());
|
77 |
161
| if (fqn != null)
|
78 |
| { |
79 |
| |
80 |
161
| if (tx == null || !isValid(tx))
|
81 |
| { |
82 |
| |
83 |
| |
84 |
65
| invalidateAcrossCluster(fqn, null);
|
85 |
| } |
86 |
| } |
87 |
| } |
88 |
| else |
89 |
| { |
90 |
0
| log.debug("Encountered a putForExternalRead() - is a no op.");
|
91 |
| } |
92 |
| } |
93 |
| else |
94 |
| { |
95 |
| |
96 |
801
| if (tx != null && isValid(tx))
|
97 |
| { |
98 |
| |
99 |
266
| switch (m.getMethodId())
|
100 |
| { |
101 |
15
| case MethodDeclarations.prepareMethod_id:
|
102 |
48
| case MethodDeclarations.optimisticPrepareMethod_id:
|
103 |
63
| log.debug("Entering InvalidationInterceptor's prepare phase");
|
104 |
| |
105 |
63
| GlobalTransaction gtx = ctx.getGlobalTransaction();
|
106 |
63
| TransactionEntry entry = txTable.get(gtx);
|
107 |
0
| if (entry == null) throw new IllegalStateException("cannot find transaction entry for " + gtx);
|
108 |
63
| List<MethodCall> modifications = new LinkedList<MethodCall>(entry.getModifications());
|
109 |
| |
110 |
63
| if (modifications.size() > 0)
|
111 |
| { |
112 |
63
| if (containsPutForExternalRead(modifications))
|
113 |
| { |
114 |
0
| log.debug("Modification list contains a putForExternalRead operation. Not invalidating.");
|
115 |
| } |
116 |
| else |
117 |
| { |
118 |
63
| try
|
119 |
| { |
120 |
63
| invalidateModifications(modifications, configuration.isNodeLockingOptimistic() ? getWorkspace(gtx) : null);
|
121 |
| } |
122 |
| catch (Throwable t) |
123 |
| { |
124 |
1
| log.warn("Unable to broadcast evicts as a part of the prepare phase. Rolling back.", t);
|
125 |
1
| try
|
126 |
| { |
127 |
1
| tx.setRollbackOnly();
|
128 |
| } |
129 |
| catch (SystemException se) |
130 |
| { |
131 |
0
| throw new RuntimeException("setting tx rollback failed ", se);
|
132 |
| } |
133 |
1
| if (t instanceof RuntimeException)
|
134 |
1
| throw t;
|
135 |
| else |
136 |
0
| throw new RuntimeException("Unable to broadcast invalidation messages", t);
|
137 |
| } |
138 |
| } |
139 |
| } |
140 |
| else |
141 |
| { |
142 |
0
| log.debug("Nothing to invalidate - no modifications in the transaction.");
|
143 |
| } |
144 |
| |
145 |
62
| break;
|
146 |
| } |
147 |
| } |
148 |
| |
149 |
| } |
150 |
961
| return retval;
|
151 |
| } |
152 |
| |
153 |
63
| private boolean containsPutForExternalRead(List<MethodCall> l)
|
154 |
| { |
155 |
63
| for (MethodCall m : l)
|
156 |
63
| if (m.getMethodId() == MethodDeclarations.putForExternalReadMethodLocal_id || m.getMethodId() == MethodDeclarations.putForExternalReadVersionedMethodLocal_id)
|
157 |
0
| return true;
|
158 |
63
| return false;
|
159 |
| } |
160 |
| |
161 |
4
| public long getInvalidations()
|
162 |
| { |
163 |
4
| return m_invalidations;
|
164 |
| } |
165 |
| |
166 |
2
| public void resetStatistics()
|
167 |
| { |
168 |
2
| m_invalidations = 0;
|
169 |
| } |
170 |
| |
171 |
0
| public Map<String, Object> dumpStatistics()
|
172 |
| { |
173 |
0
| Map<String, Object> retval = new HashMap<String, Object>();
|
174 |
0
| retval.put("Invalidations", m_invalidations);
|
175 |
0
| return retval;
|
176 |
| } |
177 |
| |
178 |
128
| protected void invalidateAcrossCluster(Fqn fqn, TransactionWorkspace workspace) throws Throwable
|
179 |
| { |
180 |
| |
181 |
128
| if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
|
182 |
128
| m_invalidations++;
|
183 |
| |
184 |
| |
185 |
128
| MethodCall call = workspace != null && !workspace.isVersioningImplicit() ?
|
186 |
| MethodCallFactory.create(MethodDeclarations.evictVersionedNodeMethodLocal, fqn, workspace.getNode(fqn).getVersion()) : |
187 |
| MethodCallFactory.create(MethodDeclarations.evictNodeMethodLocal, fqn); |
188 |
| |
189 |
0
| if (log.isDebugEnabled()) log.debug("Cache [" + cache.getLocalAddress() + "] replicating " + call);
|
190 |
| |
191 |
128
| replicateCall(call, configuration.getCacheMode() == Configuration.CacheMode.INVALIDATION_SYNC);
|
192 |
| } |
193 |
| |
194 |
63
| protected void invalidateModifications(List<MethodCall> modifications, TransactionWorkspace workspace) throws Throwable
|
195 |
| { |
196 |
| |
197 |
63
| Set<Fqn> modifiedFqns = optimisedIterator(modifications);
|
198 |
63
| for (Fqn fqn : modifiedFqns) invalidateAcrossCluster(fqn, workspace);
|
199 |
| } |
200 |
| |
201 |
48
| protected TransactionWorkspace getWorkspace(GlobalTransaction gtx)
|
202 |
| { |
203 |
48
| OptimisticTransactionEntry entry = (OptimisticTransactionEntry) txTable.get(gtx);
|
204 |
48
| return entry.getTransactionWorkSpace();
|
205 |
| } |
206 |
| |
207 |
224
| protected Fqn findFqn(Object[] objects)
|
208 |
| { |
209 |
| |
210 |
224
| return (Fqn) objects[1];
|
211 |
| } |
212 |
| |
213 |
| |
214 |
| |
215 |
| |
216 |
| |
217 |
| |
218 |
| |
219 |
| |
220 |
| |
221 |
63
| protected Set<Fqn> optimisedIterator(List<MethodCall> list)
|
222 |
| { |
223 |
63
| Set<Fqn> fqns = new HashSet<Fqn>();
|
224 |
63
| for (MethodCall mc : list)
|
225 |
| { |
226 |
63
| if (MethodDeclarations.isCrudMethod(mc.getMethodId()))
|
227 |
| { |
228 |
63
| fqns.add(findFqn(mc.getArgs()));
|
229 |
| } |
230 |
| } |
231 |
63
| return fqns;
|
232 |
| } |
233 |
| } |