1 |
| |
2 |
| |
3 |
| |
4 |
| package org.jboss.cache.loader; |
5 |
| |
6 |
| import org.apache.commons.logging.Log; |
7 |
| import org.apache.commons.logging.LogFactory; |
8 |
| import org.jboss.cache.CacheException; |
9 |
| import org.jboss.cache.Fqn; |
10 |
| import org.jboss.cache.Modification; |
11 |
| import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig; |
12 |
| import org.jboss.cache.util.MapCopy; |
13 |
| |
14 |
| import java.io.IOException; |
15 |
| import java.util.ArrayList; |
16 |
| import java.util.HashMap; |
17 |
| import java.util.List; |
18 |
| import java.util.Map; |
19 |
| import java.util.concurrent.ArrayBlockingQueue; |
20 |
| import java.util.concurrent.BlockingQueue; |
21 |
| import java.util.concurrent.atomic.AtomicBoolean; |
22 |
| import java.util.concurrent.atomic.AtomicInteger; |
23 |
| |
24 |
| |
25 |
| |
26 |
| |
27 |
| |
28 |
| |
29 |
| |
30 |
| |
31 |
| |
32 |
| |
33 |
| |
34 |
| |
35 |
| |
36 |
| |
37 |
| |
38 |
| |
39 |
| |
40 |
| |
41 |
| |
42 |
| |
43 |
| |
44 |
| |
45 |
| |
46 |
| |
47 |
| |
48 |
| |
49 |
| |
50 |
| |
51 |
| |
52 |
| |
53 |
| |
54 |
| |
55 |
| |
56 |
| |
57 |
| |
58 |
| |
59 |
| |
60 |
| |
61 |
| |
62 |
| |
63 |
| |
64 |
| |
65 |
| |
66 |
| |
67 |
| |
68 |
| |
69 |
| |
70 |
| |
71 |
| |
72 |
| |
73 |
| |
74 |
| |
75 |
| |
76 |
| |
77 |
| |
78 |
| |
79 |
| public class AsyncCacheLoader extends AbstractDelegatingCacheLoader |
80 |
| { |
81 |
| |
82 |
| private static final Log log = LogFactory.getLog(AsyncCacheLoader.class); |
83 |
| |
84 |
| private static AtomicInteger threadId = new AtomicInteger(0); |
85 |
| |
86 |
| |
87 |
| |
88 |
| |
89 |
| public static final int DEFAULT_QUEUE_SIZE = 10000; |
90 |
| |
91 |
| private AsyncCacheLoaderConfig config; |
92 |
| private AsyncProcessor processor; |
93 |
| private AtomicBoolean stopped = new AtomicBoolean(true); |
94 |
| private BlockingQueue<Modification> queue = new ArrayBlockingQueue<Modification>(DEFAULT_QUEUE_SIZE); |
95 |
| |
96 |
0
| public AsyncCacheLoader()
|
97 |
| { |
98 |
0
| super(null);
|
99 |
| } |
100 |
| |
101 |
21
| public AsyncCacheLoader(CacheLoader cacheLoader)
|
102 |
| { |
103 |
21
| super(cacheLoader);
|
104 |
| } |
105 |
| |
106 |
21
| public void setConfig(IndividualCacheLoaderConfig base)
|
107 |
| { |
108 |
21
| if (base instanceof AsyncCacheLoaderConfig)
|
109 |
| { |
110 |
0
| config = (AsyncCacheLoaderConfig) base;
|
111 |
| } |
112 |
| else |
113 |
| { |
114 |
21
| config = new AsyncCacheLoaderConfig(base);
|
115 |
| } |
116 |
| |
117 |
21
| if (config.getQueueSize() > 0)
|
118 |
| { |
119 |
1
| queue = new ArrayBlockingQueue<Modification>(config.getQueueSize());
|
120 |
| } |
121 |
| |
122 |
21
| super.setConfig(base);
|
123 |
| } |
124 |
| |
125 |
14
| public Map get(Fqn name) throws Exception
|
126 |
| { |
127 |
14
| try
|
128 |
| { |
129 |
14
| return super.get(name);
|
130 |
| } |
131 |
| catch (IOException e) |
132 |
| { |
133 |
| |
134 |
0
| log.trace(e);
|
135 |
0
| return new HashMap();
|
136 |
| } |
137 |
| } |
138 |
| |
139 |
58
| Object get(Fqn name, Object key) throws Exception
|
140 |
| { |
141 |
58
| if (config.getReturnOld())
|
142 |
| { |
143 |
55
| try
|
144 |
| { |
145 |
55
| Map map = super.get(name);
|
146 |
55
| if (map != null)
|
147 |
| { |
148 |
49
| return map.get(key);
|
149 |
| } |
150 |
| } |
151 |
| catch (IOException e) |
152 |
| { |
153 |
| |
154 |
0
| log.trace(e);
|
155 |
| } |
156 |
| } |
157 |
9
| return null;
|
158 |
| } |
159 |
| |
160 |
58
| public Object put(Fqn name, Object key, Object value) throws Exception
|
161 |
| { |
162 |
58
| if (config.getUseAsyncPut())
|
163 |
| { |
164 |
57
| Object oldValue = get(name, key);
|
165 |
57
| Modification mod = new Modification(Modification.ModificationType.PUT_KEY_VALUE, name, key, value);
|
166 |
57
| enqueue(mod);
|
167 |
57
| return oldValue;
|
168 |
| } |
169 |
| else |
170 |
| { |
171 |
1
| return super.put(name, key, value);
|
172 |
| } |
173 |
| } |
174 |
| |
175 |
1
| public void put(Fqn name, Map attributes) throws Exception
|
176 |
| { |
177 |
1
| if (config.getUseAsyncPut())
|
178 |
| { |
179 |
| |
180 |
0
| Map attrs = (attributes == null ? null : new MapCopy(attributes));
|
181 |
0
| Modification mod = new Modification(Modification.ModificationType.PUT_DATA, name, attrs);
|
182 |
0
| enqueue(mod);
|
183 |
| } |
184 |
| else |
185 |
| { |
186 |
1
| super.put(name, attributes);
|
187 |
| } |
188 |
| } |
189 |
| |
190 |
1
| public void put(List<Modification> modifications) throws Exception
|
191 |
| { |
192 |
1
| if (config.getUseAsyncPut())
|
193 |
| { |
194 |
0
| for (Modification modification : modifications)
|
195 |
| { |
196 |
0
| enqueue(modification);
|
197 |
| } |
198 |
| } |
199 |
| else |
200 |
| { |
201 |
1
| super.put(modifications);
|
202 |
| } |
203 |
| } |
204 |
| |
205 |
1
| public Object remove(Fqn name, Object key) throws Exception
|
206 |
| { |
207 |
1
| Object oldValue = get(name, key);
|
208 |
1
| Modification mod = new Modification(Modification.ModificationType.REMOVE_KEY_VALUE, name, key);
|
209 |
1
| enqueue(mod);
|
210 |
1
| return oldValue;
|
211 |
| } |
212 |
| |
213 |
10
| public void remove(Fqn name) throws Exception
|
214 |
| { |
215 |
10
| Modification mod = new Modification(Modification.ModificationType.REMOVE_NODE, name);
|
216 |
10
| enqueue(mod);
|
217 |
| } |
218 |
| |
219 |
0
| public void removeData(Fqn name) throws Exception
|
220 |
| { |
221 |
0
| Modification mod = new Modification(Modification.ModificationType.REMOVE_DATA, name);
|
222 |
0
| enqueue(mod);
|
223 |
| } |
224 |
| |
225 |
8
| public void start() throws Exception
|
226 |
| { |
227 |
8
| if (log.isInfoEnabled()) log.info("Async cache loader starting: " + this);
|
228 |
8
| stopped.set(false);
|
229 |
8
| super.start();
|
230 |
8
| processor = new AsyncProcessor();
|
231 |
8
| processor.start();
|
232 |
| } |
233 |
| |
234 |
8
| public void stop()
|
235 |
| { |
236 |
8
| stopped.set(true);
|
237 |
8
| if (processor != null)
|
238 |
| { |
239 |
8
| processor.stop();
|
240 |
| } |
241 |
8
| super.stop();
|
242 |
| } |
243 |
| |
244 |
68
| private void enqueue(Modification mod)
|
245 |
| throws CacheException, InterruptedException |
246 |
| { |
247 |
68
| if (stopped.get())
|
248 |
| { |
249 |
1
| throw new CacheException("AsyncCacheLoader stopped; no longer accepting more entries.");
|
250 |
| } |
251 |
67
| if (log.isTraceEnabled())
|
252 |
| { |
253 |
0
| log.trace("Enqueuing modification " + mod);
|
254 |
| } |
255 |
67
| queue.put(mod);
|
256 |
| } |
257 |
| |
258 |
| |
259 |
| |
260 |
| |
261 |
| |
262 |
| |
263 |
| private class AsyncProcessor implements Runnable |
264 |
| { |
265 |
| private Thread t; |
266 |
| |
267 |
| |
268 |
| private final List<Modification> mods = new ArrayList<Modification>(config.getBatchSize()); |
269 |
| |
270 |
8
| public void start()
|
271 |
| { |
272 |
8
| if (t == null || !t.isAlive())
|
273 |
| { |
274 |
8
| t = new Thread(this, "AsyncCacheLoader-" + threadId.getAndIncrement());
|
275 |
8
| t.setDaemon(true);
|
276 |
8
| t.start();
|
277 |
| } |
278 |
| } |
279 |
| |
280 |
8
| public void stop()
|
281 |
| { |
282 |
8
| if (t != null)
|
283 |
| { |
284 |
8
| t.interrupt();
|
285 |
8
| try
|
286 |
| { |
287 |
8
| t.join();
|
288 |
| } |
289 |
| catch (InterruptedException e) |
290 |
| { |
291 |
| } |
292 |
| } |
293 |
8
| if (!queue.isEmpty())
|
294 |
| { |
295 |
0
| log.warn("Async queue not yet empty, possibly interrupted");
|
296 |
| } |
297 |
| } |
298 |
| |
299 |
8
| public void run()
|
300 |
| { |
301 |
8
| while (!Thread.interrupted())
|
302 |
| { |
303 |
59
| try
|
304 |
| { |
305 |
59
| run0();
|
306 |
| } |
307 |
| catch (InterruptedException e) |
308 |
| { |
309 |
2
| break;
|
310 |
| } |
311 |
| } |
312 |
| |
313 |
8
| try
|
314 |
| { |
315 |
0
| if (log.isTraceEnabled()) log.trace("process remaining batch " + mods.size());
|
316 |
8
| put(mods);
|
317 |
0
| if (log.isTraceEnabled()) log.trace("process remaining queued " + queue.size());
|
318 |
8
| while (!queue.isEmpty())
|
319 |
| { |
320 |
4
| run0();
|
321 |
| } |
322 |
| } |
323 |
| catch (InterruptedException e) |
324 |
| { |
325 |
0
| log.trace("remaining interrupted");
|
326 |
| } |
327 |
| } |
328 |
| |
329 |
63
| private void run0() throws InterruptedException
|
330 |
| { |
331 |
63
| log.trace("Checking for modifications");
|
332 |
63
| int i = queue.drainTo(mods, config.getBatchSize());
|
333 |
63
| if (i == 0)
|
334 |
| { |
335 |
46
| Modification m = queue.take();
|
336 |
44
| mods.add(m);
|
337 |
| } |
338 |
| |
339 |
61
| if (log.isTraceEnabled())
|
340 |
| { |
341 |
0
| log.trace("Calling put(List) with " + mods.size() + " modifications");
|
342 |
| } |
343 |
61
| put(mods);
|
344 |
61
| mods.clear();
|
345 |
| } |
346 |
| |
347 |
69
| private void put(List<Modification> mods)
|
348 |
| { |
349 |
69
| try
|
350 |
| { |
351 |
69
| AsyncCacheLoader.super.put(mods);
|
352 |
| } |
353 |
| catch (Exception e) |
354 |
| { |
355 |
0
| if (log.isWarnEnabled()) log.warn("Failed to process async modifications: " + e);
|
356 |
0
| log.debug("Exception: ", e);
|
357 |
| } |
358 |
| } |
359 |
| |
360 |
2
| public String toString()
|
361 |
| { |
362 |
2
| return "TQ t=" + t;
|
363 |
| } |
364 |
| |
365 |
| } |
366 |
| |
367 |
9
| public String toString()
|
368 |
| { |
369 |
9
| return super.toString() +
|
370 |
| " delegate=[" + super.getCacheLoader() + "]" + |
371 |
| " processor=" + processor + |
372 |
| " stopped=" + stopped + |
373 |
| " batchSize=" + config.getBatchSize() + |
374 |
| " returnOld=" + config.getReturnOld() + |
375 |
| " asyncPut=" + config.getUseAsyncPut() + |
376 |
| " queue.remainingCapacity()=" + queue.remainingCapacity() + |
377 |
| " queue.peek()=" + queue.peek(); |
378 |
| } |
379 |
| |
380 |
| } |