1 |
| package org.jboss.cache.interceptors; |
2 |
| |
3 |
| import org.jboss.cache.CacheException; |
4 |
| import org.jboss.cache.CacheSPI; |
5 |
| import org.jboss.cache.Fqn; |
6 |
| import org.jboss.cache.InvocationContext; |
7 |
| import org.jboss.cache.Modification; |
8 |
| import org.jboss.cache.config.CacheLoaderConfig; |
9 |
| import org.jboss.cache.loader.CacheLoader; |
10 |
| import org.jboss.cache.marshall.MethodCall; |
11 |
| import org.jboss.cache.marshall.MethodDeclarations; |
12 |
| import org.jboss.cache.transaction.GlobalTransaction; |
13 |
| import org.jboss.cache.transaction.TransactionEntry; |
14 |
| import org.jboss.cache.transaction.TransactionTable; |
15 |
| |
16 |
| import javax.transaction.TransactionManager; |
17 |
| import java.lang.reflect.Method; |
18 |
| import java.util.ArrayList; |
19 |
| import java.util.Collections; |
20 |
| import java.util.HashMap; |
21 |
| import java.util.List; |
22 |
| import java.util.Map; |
23 |
| import java.util.Set; |
24 |
| import java.util.concurrent.ConcurrentHashMap; |
25 |
| |
26 |
| |
27 |
| |
28 |
| |
29 |
| |
30 |
| |
31 |
| |
32 |
| |
33 |
| public class CacheStoreInterceptor extends Interceptor implements CacheStoreInterceptorMBean |
34 |
| { |
35 |
| |
36 |
| protected CacheLoaderConfig loaderConfig = null; |
37 |
| protected TransactionManager tx_mgr = null; |
38 |
| protected TransactionTable tx_table = null; |
39 |
| private HashMap m_txStores = new HashMap(); |
40 |
| private Map preparingTxs = new ConcurrentHashMap(); |
41 |
| private long m_cacheStores = 0; |
42 |
| protected CacheLoader loader; |
43 |
| |
44 |
1586
| public void setCache(CacheSPI cache)
|
45 |
| { |
46 |
1586
| super.setCache(cache);
|
47 |
1586
| this.loaderConfig = cache.getCacheLoaderManager().getCacheLoaderConfig();
|
48 |
1586
| tx_mgr = cache.getTransactionManager();
|
49 |
1586
| tx_table = cache.getTransactionTable();
|
50 |
1586
| this.loader = cache.getCacheLoaderManager().getCacheLoader();
|
51 |
| } |
52 |
| |
53 |
| |
54 |
| |
55 |
| |
56 |
| |
57 |
| |
58 |
| |
59 |
| |
60 |
| |
61 |
474901
| public Object invoke(InvocationContext ctx) throws Throwable
|
62 |
| { |
63 |
| |
64 |
474901
| MethodCall m = ctx.getMethodCall();
|
65 |
| |
66 |
| |
67 |
| |
68 |
474901
| if (!ctx.isOriginLocal() && loaderConfig.isShared())
|
69 |
| { |
70 |
7
| log.trace("Passing up method call and bypassing this interceptor since the cache loader is shared and this call originated remotely.");
|
71 |
7
| return super.invoke(ctx);
|
72 |
| } |
73 |
| |
74 |
474894
| Fqn fqn;
|
75 |
474894
| Object key, value;
|
76 |
474894
| Object[] args = m.getArgs();
|
77 |
474894
| Object retval, tmp_retval = null;
|
78 |
474894
| boolean use_tmp_retval = false;
|
79 |
| |
80 |
| |
81 |
474894
| if (log.isTraceEnabled())
|
82 |
| { |
83 |
0
| log.trace("CacheStoreInterceptor called with meth " + m.getMethod());
|
84 |
| } |
85 |
| |
86 |
474894
| if (tx_mgr != null && tx_mgr.getTransaction() != null)
|
87 |
| { |
88 |
| |
89 |
81307
| log.trace("transactional so don't put stuff in the cloader yet.");
|
90 |
81307
| GlobalTransaction gtx = ctx.getGlobalTransaction();
|
91 |
81307
| switch (m.getMethodId())
|
92 |
| { |
93 |
20219
| case MethodDeclarations.commitMethod_id:
|
94 |
20219
| if (ctx.isTxHasMods())
|
95 |
| { |
96 |
| |
97 |
0
| if (log.isTraceEnabled()) log.trace("Calling loader.commit() for gtx " + gtx);
|
98 |
| |
99 |
20095
| List fqnsModified = getFqnsFromModificationList(tx_table.get(gtx).getCacheLoaderModifications());
|
100 |
20095
| try
|
101 |
| { |
102 |
20095
| loader.commit(gtx);
|
103 |
| } |
104 |
| finally |
105 |
| { |
106 |
20095
| preparingTxs.remove(gtx);
|
107 |
| } |
108 |
20095
| if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
|
109 |
| { |
110 |
20095
| Integer puts = (Integer) m_txStores.get(gtx);
|
111 |
20095
| if (puts != null)
|
112 |
| { |
113 |
20062
| m_cacheStores = m_cacheStores + puts;
|
114 |
| } |
115 |
20095
| m_txStores.remove(gtx);
|
116 |
| } |
117 |
| } |
118 |
| else |
119 |
| { |
120 |
124
| log.trace("Commit called with no modifications; ignoring.");
|
121 |
| } |
122 |
20219
| break;
|
123 |
45
| case MethodDeclarations.rollbackMethod_id:
|
124 |
45
| if (ctx.isTxHasMods())
|
125 |
| { |
126 |
| |
127 |
39
| if (preparingTxs.containsKey(gtx))
|
128 |
| { |
129 |
1
| preparingTxs.remove(gtx);
|
130 |
1
| loader.rollback(gtx);
|
131 |
| } |
132 |
39
| if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
|
133 |
| { |
134 |
39
| m_txStores.remove(gtx);
|
135 |
| } |
136 |
| } |
137 |
| else |
138 |
| { |
139 |
6
| log.trace("Rollback called with no modifications; ignoring.");
|
140 |
| } |
141 |
45
| break;
|
142 |
60
| case MethodDeclarations.optimisticPrepareMethod_id:
|
143 |
20049
| case MethodDeclarations.prepareMethod_id:
|
144 |
20109
| prepareCacheLoader(gtx, isOnePhaseCommitPrepareMehod(m));
|
145 |
20109
| break;
|
146 |
| } |
147 |
| |
148 |
| |
149 |
81307
| return super.invoke(ctx);
|
150 |
| } |
151 |
| |
152 |
| |
153 |
| |
154 |
| |
155 |
| |
156 |
| |
157 |
393587
| switch (m.getMethodId())
|
158 |
| { |
159 |
630
| case MethodDeclarations.removeNodeMethodLocal_id:
|
160 |
630
| fqn = (Fqn) args[1];
|
161 |
630
| loader.remove(fqn);
|
162 |
630
| break;
|
163 |
61
| case MethodDeclarations.removeKeyMethodLocal_id:
|
164 |
61
| fqn = (Fqn) args[1];
|
165 |
61
| key = args[2];
|
166 |
61
| tmp_retval = loader.remove(fqn, key);
|
167 |
61
| use_tmp_retval = true;
|
168 |
61
| break;
|
169 |
30
| case MethodDeclarations.removeDataMethodLocal_id:
|
170 |
30
| fqn = (Fqn) args[1];
|
171 |
30
| loader.removeData(fqn);
|
172 |
30
| break;
|
173 |
| } |
174 |
| |
175 |
| |
176 |
393587
| retval = super.invoke(ctx);
|
177 |
| |
178 |
| |
179 |
| |
180 |
393587
| switch (m.getMethodId())
|
181 |
| { |
182 |
22
| case MethodDeclarations.moveMethodLocal_id:
|
183 |
22
| doMove((Fqn) args[0], (Fqn) args[1]);
|
184 |
22
| break;
|
185 |
549
| case MethodDeclarations.putDataMethodLocal_id:
|
186 |
0
| case MethodDeclarations.putDataEraseMethodLocal_id:
|
187 |
549
| Modification mod = convertMethodCallToModification(m);
|
188 |
549
| log.debug(mod);
|
189 |
549
| fqn = mod.getFqn();
|
190 |
549
| loader.put(Collections.singletonList(mod));
|
191 |
549
| if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
|
192 |
| { |
193 |
549
| m_cacheStores++;
|
194 |
| } |
195 |
549
| break;
|
196 |
0
| case MethodDeclarations.putForExternalReadMethodLocal_id:
|
197 |
81959
| case MethodDeclarations.putKeyValMethodLocal_id:
|
198 |
81959
| fqn = (Fqn) args[1];
|
199 |
81959
| key = args[2];
|
200 |
81959
| value = args[3];
|
201 |
81959
| tmp_retval = loader.put(fqn, key, value);
|
202 |
81959
| use_tmp_retval = true;
|
203 |
81959
| if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
|
204 |
| { |
205 |
81959
| m_cacheStores++;
|
206 |
| } |
207 |
81959
| break;
|
208 |
| } |
209 |
| |
210 |
| |
211 |
393587
| if (use_tmp_retval)
|
212 |
| { |
213 |
82020
| return tmp_retval;
|
214 |
| } |
215 |
| else |
216 |
| { |
217 |
311567
| return retval;
|
218 |
| } |
219 |
| } |
220 |
| |
221 |
22
| private void doMove(Fqn node, Fqn parent) throws Exception
|
222 |
| { |
223 |
22
| Fqn newNodeFqn = new Fqn(parent, node.getLastElement());
|
224 |
| |
225 |
22
| recursiveMove(node, newNodeFqn);
|
226 |
22
| loader.remove(node);
|
227 |
| } |
228 |
| |
229 |
34
| private void recursiveMove(Fqn fqn, Fqn newFqn) throws Exception
|
230 |
| { |
231 |
34
| List fqns = new ArrayList();
|
232 |
34
| fqns.add(fqn);
|
233 |
34
| fqns.add(newFqn);
|
234 |
34
| loader.put(newFqn, loader.get(fqn));
|
235 |
| |
236 |
34
| Set childrenNames = loader.getChildrenNames(fqn);
|
237 |
34
| if (childrenNames != null)
|
238 |
| { |
239 |
12
| for (Object child : childrenNames)
|
240 |
| { |
241 |
12
| recursiveMove(new Fqn(fqn, child), new Fqn(newFqn, child));
|
242 |
| } |
243 |
| } |
244 |
| } |
245 |
| |
246 |
20095
| private List getFqnsFromModificationList(List<MethodCall> modifications)
|
247 |
| { |
248 |
20095
| List<Fqn> fqnList = new ArrayList<Fqn>();
|
249 |
| |
250 |
20095
| for (MethodCall mc : modifications)
|
251 |
| { |
252 |
40107
| Fqn fqn = findFqn(mc.getArgs());
|
253 |
20107
| if (fqn != null && !fqnList.contains(fqn)) fqnList.add(fqn);
|
254 |
| } |
255 |
20095
| return fqnList;
|
256 |
| } |
257 |
| |
258 |
40107
| private Fqn findFqn(Object[] args)
|
259 |
| { |
260 |
40107
| for (Object arg : args)
|
261 |
| { |
262 |
40107
| if (arg instanceof Fqn) return (Fqn) arg;
|
263 |
| } |
264 |
0
| return null;
|
265 |
| } |
266 |
| |
267 |
10
| public long getCacheLoaderStores()
|
268 |
| { |
269 |
10
| return m_cacheStores;
|
270 |
| } |
271 |
| |
272 |
1
| public void resetStatistics()
|
273 |
| { |
274 |
1
| m_cacheStores = 0;
|
275 |
| } |
276 |
| |
277 |
0
| public Map<String, Object> dumpStatistics()
|
278 |
| { |
279 |
0
| Map<String, Object> retval = new HashMap<String, Object>();
|
280 |
0
| retval.put("CacheLoaderStores", m_cacheStores);
|
281 |
0
| return retval;
|
282 |
| } |
283 |
| |
284 |
20109
| private void prepareCacheLoader(GlobalTransaction gtx, boolean onePhase) throws Exception
|
285 |
| { |
286 |
20104
| List<MethodCall> modifications;
|
287 |
20109
| TransactionEntry entry;
|
288 |
20109
| int txPuts = 0;
|
289 |
| |
290 |
20109
| entry = tx_table.get(gtx);
|
291 |
20109
| if (entry == null)
|
292 |
| { |
293 |
0
| throw new Exception("entry for transaction " + gtx + " not found in transaction table");
|
294 |
| } |
295 |
20109
| modifications = entry.getCacheLoaderModifications();
|
296 |
20109
| if (modifications.size() == 0)
|
297 |
| { |
298 |
1
| log.trace("Transaction has not logged any modifications!");
|
299 |
1
| return;
|
300 |
| } |
301 |
0
| if (log.isTraceEnabled()) log.trace("Cache loader modification list: " + modifications);
|
302 |
20108
| List cache_loader_modifications = new ArrayList();
|
303 |
20108
| for (MethodCall methodCall : modifications)
|
304 |
| { |
305 |
40340
| Modification mod = convertMethodCallToModification(methodCall);
|
306 |
40340
| cache_loader_modifications.add(mod);
|
307 |
40340
| if (configuration.getExposeManagementStatistics() && getStatisticsEnabled())
|
308 |
| { |
309 |
40340
| if ((mod.getType() == Modification.ModificationType.PUT_DATA) ||
|
310 |
| (mod.getType() == Modification.ModificationType.PUT_DATA_ERASE) || |
311 |
| (mod.getType() == Modification.ModificationType.PUT_KEY_VALUE)) |
312 |
| { |
313 |
40307
| txPuts++;
|
314 |
| } |
315 |
| } |
316 |
| } |
317 |
20108
| if (log.isTraceEnabled())
|
318 |
| { |
319 |
0
| log.trace("Converted method calls to cache loader modifications. List size: " + cache_loader_modifications.size());
|
320 |
| } |
321 |
20108
| if (cache_loader_modifications.size() > 0)
|
322 |
| { |
323 |
20108
| loader.prepare(gtx, cache_loader_modifications, onePhase);
|
324 |
20108
| preparingTxs.put(gtx, gtx);
|
325 |
20108
| if (configuration.getExposeManagementStatistics() && getStatisticsEnabled() && txPuts > 0)
|
326 |
| { |
327 |
20075
| m_txStores.put(gtx, txPuts);
|
328 |
| } |
329 |
| } |
330 |
| } |
331 |
| |
332 |
40889
| private Modification convertMethodCallToModification(MethodCall methodCall) throws Exception
|
333 |
| { |
334 |
0
| if (log.isTraceEnabled()) log.trace("Converting method call " + methodCall + " to modification.");
|
335 |
40889
| Method method = methodCall.getMethod();
|
336 |
40889
| Object[] args;
|
337 |
40889
| if (method == null)
|
338 |
| { |
339 |
0
| throw new Exception("method call has no method: " + methodCall);
|
340 |
| } |
341 |
| |
342 |
40889
| args = methodCall.getArgs();
|
343 |
40889
| Modification mod = null;
|
344 |
40889
| switch (methodCall.getMethodId())
|
345 |
| { |
346 |
603
| case MethodDeclarations.putDataMethodLocal_id:
|
347 |
603
| mod = new Modification(Modification.ModificationType.PUT_DATA,
|
348 |
| (Fqn) args[1], |
349 |
| (Map) args[2]); |
350 |
603
| break;
|
351 |
0
| case MethodDeclarations.putDataEraseMethodLocal_id:
|
352 |
0
| mod = new Modification(Modification.ModificationType.PUT_DATA_ERASE,
|
353 |
| (Fqn) args[1], |
354 |
| (Map) args[2]); |
355 |
0
| break;
|
356 |
40253
| case MethodDeclarations.putKeyValMethodLocal_id:
|
357 |
40253
| mod = new Modification(Modification.ModificationType.PUT_KEY_VALUE,
|
358 |
| (Fqn) args[1], |
359 |
| args[2], |
360 |
| args[3]); |
361 |
40253
| break;
|
362 |
27
| case MethodDeclarations.removeNodeMethodLocal_id:
|
363 |
27
| mod = new Modification(Modification.ModificationType.REMOVE_NODE,
|
364 |
| (Fqn) args[1]); |
365 |
27
| break;
|
366 |
0
| case MethodDeclarations.removeKeyMethodLocal_id:
|
367 |
0
| mod = new Modification(Modification.ModificationType.REMOVE_KEY_VALUE,
|
368 |
| (Fqn) args[1], |
369 |
| args[2]); |
370 |
0
| break;
|
371 |
0
| case MethodDeclarations.removeDataMethodLocal_id:
|
372 |
0
| mod = new Modification(Modification.ModificationType.REMOVE_DATA,
|
373 |
| (Fqn) args[1]); |
374 |
0
| break;
|
375 |
6
| case MethodDeclarations.moveMethodLocal_id:
|
376 |
6
| mod = new Modification(Modification.ModificationType.MOVE, (Fqn) args[0], (Fqn) args[1]);
|
377 |
6
| break;
|
378 |
0
| default:
|
379 |
0
| throw new CacheException("method call " + method.getName() + " cannot be converted to a modification");
|
380 |
| } |
381 |
| |
382 |
0
| if (log.isTraceEnabled()) log.trace("Converted " + methodCall + " to Modification of type " + mod.getType());
|
383 |
40889
| return mod;
|
384 |
| } |
385 |
| } |