1 |
| package org.jboss.cache.loader.tcp; |
2 |
| |
3 |
| import org.apache.commons.logging.Log; |
4 |
| import org.apache.commons.logging.LogFactory; |
5 |
| import org.jboss.cache.Cache; |
6 |
| import org.jboss.cache.CacheException; |
7 |
| import org.jboss.cache.CacheSPI; |
8 |
| import org.jboss.cache.DefaultCacheFactory; |
9 |
| import org.jboss.cache.Fqn; |
10 |
| import org.jboss.cache.Modification; |
11 |
| import org.jboss.cache.Node; |
12 |
| import org.jboss.cache.NodeSPI; |
13 |
| import org.jboss.cache.jmx.CacheJmxWrapperMBean; |
14 |
| import org.jboss.cache.loader.DelegatingCacheLoader; |
15 |
| |
16 |
| import java.io.BufferedInputStream; |
17 |
| import java.io.BufferedOutputStream; |
18 |
| import java.io.IOException; |
19 |
| import java.io.ObjectInputStream; |
20 |
| import java.io.ObjectOutputStream; |
21 |
| import java.net.InetAddress; |
22 |
| import java.net.ServerSocket; |
23 |
| import java.net.Socket; |
24 |
| import java.net.SocketException; |
25 |
| import java.net.UnknownHostException; |
26 |
| import java.util.ArrayList; |
27 |
| import java.util.Collections; |
28 |
| import java.util.HashMap; |
29 |
| import java.util.LinkedList; |
30 |
| import java.util.List; |
31 |
| import java.util.Map; |
32 |
| import java.util.Set; |
33 |
| |
34 |
| |
35 |
| |
36 |
| |
37 |
| |
38 |
| |
39 |
| |
40 |
| |
41 |
| public class TcpCacheServer implements TcpCacheServerMBean |
42 |
| { |
43 |
| private ServerSocket srv_sock; |
44 |
| private InetAddress bind_addr = null; |
45 |
| private int port = 7500; |
46 |
| private CacheSPI cache; |
47 |
| private CacheJmxWrapperMBean wrapper; |
48 |
| private String config; |
49 |
| private boolean running = true; |
50 |
| private final List<Connection> conns = Collections.synchronizedList(new LinkedList<Connection>()); |
51 |
| |
52 |
| |
53 |
| |
54 |
| boolean daemon = true; |
55 |
| static Log log = LogFactory.getLog(TcpCacheServer.class); |
56 |
| |
57 |
| |
58 |
4
| public TcpCacheServer()
|
59 |
| { |
60 |
| } |
61 |
| |
62 |
0
| public String getBindAddress()
|
63 |
| { |
64 |
0
| return bind_addr != null ? bind_addr.toString() : "n/a";
|
65 |
| } |
66 |
| |
67 |
4
| public void setBindAddress(String bind_addr) throws UnknownHostException
|
68 |
| { |
69 |
4
| if (bind_addr != null)
|
70 |
| { |
71 |
4
| this.bind_addr = InetAddress.getByName(bind_addr);
|
72 |
| } |
73 |
| } |
74 |
| |
75 |
0
| public int getPort()
|
76 |
| { |
77 |
0
| return port;
|
78 |
| } |
79 |
| |
80 |
4
| public void setPort(int port)
|
81 |
| { |
82 |
4
| this.port = port;
|
83 |
| } |
84 |
| |
85 |
0
| public String getConfig()
|
86 |
| { |
87 |
0
| return config;
|
88 |
| } |
89 |
| |
90 |
2
| public void setConfig(String config)
|
91 |
| { |
92 |
2
| this.config = config;
|
93 |
| } |
94 |
| |
95 |
3
| public Cache getCache()
|
96 |
| { |
97 |
3
| return cache;
|
98 |
| } |
99 |
| |
100 |
1
| public void setCache(CacheSPI cache)
|
101 |
| { |
102 |
1
| this.cache = cache;
|
103 |
| } |
104 |
| |
105 |
1
| public void setCacheJmxWrapper(CacheJmxWrapperMBean wrapper)
|
106 |
| { |
107 |
1
| this.wrapper = wrapper;
|
108 |
| } |
109 |
| |
110 |
4
| public void start() throws Exception
|
111 |
| { |
112 |
4
| if (cache == null)
|
113 |
| { |
114 |
| |
115 |
3
| if (wrapper != null)
|
116 |
| { |
117 |
1
| cache = (CacheSPI) wrapper.getCache();
|
118 |
| |
119 |
1
| if (cache == null)
|
120 |
| { |
121 |
0
| throw new CacheException("cache cannot be obtained from CacheJmxWrapperMBean;" +
|
122 |
| " be sure start() is invoked on wrapper before it is invoked on the" + |
123 |
| " TcpCacheServer"); |
124 |
| } |
125 |
| } |
126 |
2
| else if (config != null)
|
127 |
| { |
128 |
2
| cache = (CacheSPI) DefaultCacheFactory.getInstance().createCache(this.config);
|
129 |
| } |
130 |
| } |
131 |
| |
132 |
4
| if (cache == null)
|
133 |
| { |
134 |
0
| throw new CacheException("cache reference is not set");
|
135 |
| } |
136 |
| |
137 |
| |
138 |
4
| srv_sock = new ServerSocket(port, 10, bind_addr);
|
139 |
4
| log.info("TcpCacheServer listening on : " + srv_sock.getInetAddress() + ":" + srv_sock.getLocalPort());
|
140 |
| |
141 |
4
| running = true;
|
142 |
| |
143 |
4
| Thread serverThread = new Thread("TcpCacheServer")
|
144 |
| { |
145 |
4
| public void run()
|
146 |
| { |
147 |
4
| try
|
148 |
| { |
149 |
68
| while (running)
|
150 |
| { |
151 |
68
| Socket client_sock = srv_sock.accept();
|
152 |
64
| Connection conn = new Connection(client_sock, cache);
|
153 |
64
| conns.add(conn);
|
154 |
64
| conn.start();
|
155 |
| } |
156 |
| } |
157 |
| catch (SocketException se) |
158 |
| { |
159 |
3
| if (!running)
|
160 |
| { |
161 |
| |
162 |
| |
163 |
3
| log.info("Shutting down TcpCacheServer");
|
164 |
| } |
165 |
| else |
166 |
| { |
167 |
0
| log.error("Caught exception! Shutting down server thread.", se);
|
168 |
| } |
169 |
| } |
170 |
| catch (IOException e) |
171 |
| { |
172 |
0
| log.error("Caught exception! Shutting down server thread.", e);
|
173 |
| } |
174 |
| } |
175 |
| }; |
176 |
4
| serverThread.setDaemon(daemon);
|
177 |
4
| serverThread.start();
|
178 |
| |
179 |
| } |
180 |
| |
181 |
3
| public void stop()
|
182 |
| { |
183 |
3
| running = false;
|
184 |
3
| synchronized(conns)
|
185 |
| { |
186 |
| |
187 |
| |
188 |
3
| List<Connection> copy = new ArrayList<Connection>(conns);
|
189 |
3
| for (Connection conn : copy)
|
190 |
| { |
191 |
3
| conn.close();
|
192 |
| } |
193 |
3
| conns.clear();
|
194 |
| } |
195 |
| |
196 |
3
| if (srv_sock != null)
|
197 |
| { |
198 |
3
| try
|
199 |
| { |
200 |
3
| srv_sock.close();
|
201 |
3
| srv_sock = null;
|
202 |
| } |
203 |
| catch (IOException e) |
204 |
| { |
205 |
| |
206 |
| } |
207 |
| } |
208 |
| } |
209 |
| |
210 |
| |
211 |
0
| public String getConnections()
|
212 |
| { |
213 |
0
| synchronized(conns)
|
214 |
| { |
215 |
0
| StringBuffer sb = new StringBuffer();
|
216 |
0
| sb.append(conns.size()).append(" connections:\n");
|
217 |
0
| for (Connection c : conns)
|
218 |
| { |
219 |
0
| sb.append(c).append("\n");
|
220 |
| } |
221 |
0
| return sb.toString();
|
222 |
| } |
223 |
| } |
224 |
| |
225 |
| |
226 |
4
| public void create()
|
227 |
| { |
228 |
| } |
229 |
| |
230 |
0
| public void destroy()
|
231 |
| { |
232 |
| } |
233 |
| |
234 |
| |
235 |
| private class Connection implements Runnable |
236 |
| { |
237 |
| private Socket sock = null; |
238 |
| private ObjectInputStream input = null; |
239 |
| private ObjectOutputStream output = null; |
240 |
| private CacheSPI c; |
241 |
| private Thread t = null; |
242 |
| |
243 |
64
| public Connection(Socket sock, CacheSPI cache) throws IOException
|
244 |
| { |
245 |
64
| this.sock = sock;
|
246 |
| |
247 |
64
| output = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
|
248 |
64
| output.flush();
|
249 |
| |
250 |
64
| input = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
|
251 |
| |
252 |
64
| c = cache;
|
253 |
| } |
254 |
| |
255 |
| |
256 |
64
| public void start()
|
257 |
| { |
258 |
64
| t = new Thread(this, "TcpCacheServer.Connection");
|
259 |
64
| t.setDaemon(true);
|
260 |
64
| t.start();
|
261 |
| } |
262 |
| |
263 |
63
| public void close()
|
264 |
| { |
265 |
63
| t = null;
|
266 |
63
| try
|
267 |
| { |
268 |
63
| if (output != null) output.close();
|
269 |
| } |
270 |
| catch (Throwable th) |
271 |
| { |
272 |
| |
273 |
| } |
274 |
63
| try
|
275 |
| { |
276 |
63
| if (input != null) input.close();
|
277 |
| } |
278 |
| catch (Throwable th) |
279 |
| { |
280 |
| |
281 |
| } |
282 |
63
| try
|
283 |
| { |
284 |
63
| if (sock != null) sock.close();
|
285 |
| } |
286 |
| catch (Throwable th) |
287 |
| { |
288 |
| |
289 |
| } |
290 |
| |
291 |
| |
292 |
63
| conns.remove(this);
|
293 |
| } |
294 |
| |
295 |
64
| public void run()
|
296 |
| { |
297 |
64
| int op;
|
298 |
64
| Fqn fqn;
|
299 |
64
| Object key, val, retval;
|
300 |
64
| NodeSPI n;
|
301 |
64
| boolean flag;
|
302 |
| |
303 |
64
| while (t != null && Thread.currentThread().equals(t))
|
304 |
| { |
305 |
12863
| try
|
306 |
| { |
307 |
12863
| op = input.readInt();
|
308 |
| } |
309 |
| catch (IOException e) |
310 |
| { |
311 |
60
| log.debug("Client closed socket");
|
312 |
60
| close();
|
313 |
60
| break;
|
314 |
| } |
315 |
| |
316 |
12803
| try
|
317 |
| { |
318 |
12803
| output.reset();
|
319 |
12803
| switch (op)
|
320 |
| { |
321 |
2044
| case DelegatingCacheLoader.delegateGetChildrenNames:
|
322 |
2044
| fqn = (Fqn) input.readObject();
|
323 |
2044
| Node node = c.getRoot().getChild(fqn);
|
324 |
2044
| Set<Object> children = node == null ? Collections.emptySet() : node.getChildrenNames();
|
325 |
2044
| output.writeObject(children);
|
326 |
2044
| break;
|
327 |
0
| case DelegatingCacheLoader.delegateGetKey:
|
328 |
0
| fqn = (Fqn) input.readObject();
|
329 |
0
| key = input.readObject();
|
330 |
0
| retval = c.get(fqn, key);
|
331 |
0
| output.writeObject(retval);
|
332 |
0
| break;
|
333 |
2288
| case DelegatingCacheLoader.delegateGet:
|
334 |
2288
| fqn = (Fqn) input.readObject();
|
335 |
2288
| n = (NodeSPI) c.getRoot().getChild(fqn);
|
336 |
2288
| if (n == null)
|
337 |
| { |
338 |
820
| output.writeObject(n);
|
339 |
820
| break;
|
340 |
| } |
341 |
1468
| Map map = n.getDataDirect();
|
342 |
0
| if (map == null) map = new HashMap();
|
343 |
1468
| output.writeObject(map);
|
344 |
1468
| break;
|
345 |
71
| case DelegatingCacheLoader.delegateExists:
|
346 |
71
| fqn = (Fqn) input.readObject();
|
347 |
71
| flag = c.getRoot().hasChild(fqn);
|
348 |
71
| output.writeObject(flag);
|
349 |
71
| break;
|
350 |
2135
| case DelegatingCacheLoader.delegatePutKeyVal:
|
351 |
2135
| fqn = (Fqn) input.readObject();
|
352 |
2135
| key = input.readObject();
|
353 |
2135
| val = input.readObject();
|
354 |
2135
| retval = c.put(fqn, key, val);
|
355 |
2135
| output.writeObject(retval);
|
356 |
2135
| break;
|
357 |
2030
| case DelegatingCacheLoader.delegatePut:
|
358 |
2030
| fqn = (Fqn) input.readObject();
|
359 |
2030
| map = (Map) input.readObject();
|
360 |
2030
| c.put(fqn, map);
|
361 |
2030
| output.writeObject(Boolean.TRUE);
|
362 |
2030
| break;
|
363 |
| |
364 |
61
| case DelegatingCacheLoader.putList:
|
365 |
61
| int length = input.readInt();
|
366 |
61
| retval = Boolean.TRUE;
|
367 |
61
| if (length > 0)
|
368 |
| { |
369 |
61
| Modification mod;
|
370 |
61
| List<Modification> mods = new ArrayList<Modification>(length);
|
371 |
61
| for (int i = 0; i < length; i++)
|
372 |
| { |
373 |
70
| mod = new Modification();
|
374 |
70
| mod.readExternal(input);
|
375 |
70
| mods.add(mod);
|
376 |
| } |
377 |
61
| try
|
378 |
| { |
379 |
61
| handleModifications(mods);
|
380 |
| } |
381 |
| catch (Exception ex) |
382 |
| { |
383 |
0
| retval = ex;
|
384 |
| } |
385 |
| } |
386 |
61
| output.writeObject(retval);
|
387 |
61
| break;
|
388 |
2024
| case DelegatingCacheLoader.delegateRemoveKey:
|
389 |
2024
| fqn = (Fqn) input.readObject();
|
390 |
2024
| key = input.readObject();
|
391 |
2024
| retval = c.remove(fqn, key);
|
392 |
2024
| output.writeObject(retval);
|
393 |
2024
| break;
|
394 |
2147
| case DelegatingCacheLoader.delegateRemove:
|
395 |
2147
| fqn = (Fqn) input.readObject();
|
396 |
2147
| c.removeNode(fqn);
|
397 |
2147
| output.writeObject(Boolean.TRUE);
|
398 |
2147
| break;
|
399 |
3
| case DelegatingCacheLoader.delegateRemoveData:
|
400 |
3
| fqn = (Fqn) input.readObject();
|
401 |
3
| node = c.getRoot().getChild(fqn);
|
402 |
3
| if (node != null)
|
403 |
| { |
404 |
3
| node.clearData();
|
405 |
3
| output.writeObject(true);
|
406 |
| } |
407 |
| else |
408 |
| { |
409 |
0
| output.writeObject(false);
|
410 |
| } |
411 |
| |
412 |
3
| break;
|
413 |
0
| case DelegatingCacheLoader.delegateLoadEntireState:
|
414 |
0
| ObjectOutputStream os = (ObjectOutputStream) input.readObject();
|
415 |
| |
416 |
0
| if (c.getCacheLoaderManager() != null)
|
417 |
| { |
418 |
0
| c.getCacheLoaderManager().getCacheLoader().loadEntireState(os);
|
419 |
| } |
420 |
0
| output.writeObject(Boolean.TRUE);
|
421 |
0
| break;
|
422 |
0
| case DelegatingCacheLoader.delegateStoreEntireState:
|
423 |
0
| ObjectInputStream is = (ObjectInputStream) input.readObject();
|
424 |
0
| if (c.getCacheLoaderManager() != null)
|
425 |
| { |
426 |
0
| c.getCacheLoaderManager().getCacheLoader().storeEntireState(is);
|
427 |
| } |
428 |
0
| output.writeObject(Boolean.TRUE);
|
429 |
0
| break;
|
430 |
0
| default:
|
431 |
0
| log.error("Operation " + op + " unknown");
|
432 |
0
| break;
|
433 |
| } |
434 |
12803
| output.flush();
|
435 |
| } |
436 |
| catch (Exception e) |
437 |
| { |
438 |
0
| log.debug(e, e);
|
439 |
0
| try
|
440 |
| { |
441 |
0
| output.writeObject(e);
|
442 |
0
| output.flush();
|
443 |
| } |
444 |
| catch (IOException e1) |
445 |
| { |
446 |
0
| log.error(e1, e1);
|
447 |
| } |
448 |
| } |
449 |
| } |
450 |
| } |
451 |
| |
452 |
| |
453 |
0
| public String toString()
|
454 |
| { |
455 |
0
| StringBuffer sb = new StringBuffer();
|
456 |
0
| if (sock != null)
|
457 |
| { |
458 |
0
| sb.append(sock.getRemoteSocketAddress());
|
459 |
| } |
460 |
0
| return sb.toString();
|
461 |
| } |
462 |
| |
463 |
61
| protected void handleModifications(List<Modification> modifications) throws CacheException
|
464 |
| { |
465 |
| |
466 |
61
| for (Modification m : modifications)
|
467 |
| { |
468 |
70
| switch (m.getType())
|
469 |
| { |
470 |
53
| case PUT_DATA:
|
471 |
53
| c.put(m.getFqn(), m.getData());
|
472 |
53
| break;
|
473 |
0
| case PUT_DATA_ERASE:
|
474 |
0
| c.put(m.getFqn(), m.getData());
|
475 |
0
| break;
|
476 |
10
| case PUT_KEY_VALUE:
|
477 |
10
| c.put(m.getFqn(), m.getKey(), m.getValue());
|
478 |
10
| break;
|
479 |
2
| case REMOVE_DATA:
|
480 |
2
| Node n = c.getRoot().getChild(m.getFqn());
|
481 |
2
| if (n != null) n.clearData();
|
482 |
2
| break;
|
483 |
2
| case REMOVE_KEY_VALUE:
|
484 |
2
| c.remove(m.getFqn(), m.getKey());
|
485 |
2
| break;
|
486 |
3
| case REMOVE_NODE:
|
487 |
3
| c.removeNode(m.getFqn());
|
488 |
3
| break;
|
489 |
0
| case MOVE:
|
490 |
0
| c.move(m.getFqn(), m.getFqn2());
|
491 |
0
| break;
|
492 |
0
| default:
|
493 |
0
| log.error("modification type " + m.getType() + " not known");
|
494 |
0
| break;
|
495 |
| } |
496 |
| } |
497 |
| } |
498 |
| |
499 |
| |
500 |
| } |
501 |
| |
502 |
| |
503 |
0
| public static void main(String[] args) throws Exception
|
504 |
| { |
505 |
0
| String bind_addr = null;
|
506 |
0
| int port = 7500;
|
507 |
0
| TcpCacheServer server;
|
508 |
0
| String config = null;
|
509 |
| |
510 |
0
| for (int i = 0; i < args.length; i++)
|
511 |
| { |
512 |
0
| if (args[i].equals("-bind_addr"))
|
513 |
| { |
514 |
0
| bind_addr = args[++i];
|
515 |
0
| continue;
|
516 |
| } |
517 |
0
| if (args[i].equals("-port"))
|
518 |
| { |
519 |
0
| port = Integer.parseInt(args[++i]);
|
520 |
0
| continue;
|
521 |
| } |
522 |
0
| if (args[i].equals("-config"))
|
523 |
| { |
524 |
0
| config = args[++i];
|
525 |
0
| continue;
|
526 |
| } |
527 |
0
| help();
|
528 |
0
| return;
|
529 |
| } |
530 |
0
| server = new TcpCacheServer();
|
531 |
0
| server.daemon = false;
|
532 |
0
| server.setBindAddress(bind_addr);
|
533 |
0
| server.setPort(port);
|
534 |
0
| server.setConfig(config);
|
535 |
0
| server.create();
|
536 |
0
| server.start();
|
537 |
| } |
538 |
| |
539 |
| |
540 |
0
| private static void help()
|
541 |
| { |
542 |
0
| System.out.println("TcpCacheServer [-bind_addr <address>] [-port <port>] [-config <config file>] [-help]");
|
543 |
| } |
544 |
| } |