Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 544   Methods: 24
NCLOC: 468   Classes: 2
 
 Source file Conditionals Statements Methods TOTAL
TcpCacheServer.java 46.4% 66.5% 66.7% 63%
coverage coverage
 1    package org.jboss.cache.loader.tcp;
 2   
 3    import org.apache.commons.logging.Log;
 4    import org.apache.commons.logging.LogFactory;
 5    import org.jboss.cache.Cache;
 6    import org.jboss.cache.CacheException;
 7    import org.jboss.cache.CacheSPI;
 8    import org.jboss.cache.DefaultCacheFactory;
 9    import org.jboss.cache.Fqn;
 10    import org.jboss.cache.Modification;
 11    import org.jboss.cache.Node;
 12    import org.jboss.cache.NodeSPI;
 13    import org.jboss.cache.jmx.CacheJmxWrapperMBean;
 14    import org.jboss.cache.loader.DelegatingCacheLoader;
 15   
 16    import java.io.BufferedInputStream;
 17    import java.io.BufferedOutputStream;
 18    import java.io.IOException;
 19    import java.io.ObjectInputStream;
 20    import java.io.ObjectOutputStream;
 21    import java.net.InetAddress;
 22    import java.net.ServerSocket;
 23    import java.net.Socket;
 24    import java.net.SocketException;
 25    import java.net.UnknownHostException;
 26    import java.util.ArrayList;
 27    import java.util.Collections;
 28    import java.util.HashMap;
 29    import java.util.LinkedList;
 30    import java.util.List;
 31    import java.util.Map;
 32    import java.util.Set;
 33   
 34    /**
 35    * TCP-IP based CacheServer, setCache TcpDelegatingCacheLoader with host and port of this server
 36    *
 37    * @author Bela Ban
 38    * @author Brian Stansberry
 39    * @version $Id: TcpCacheServer.java,v 1.29 2007/05/22 20:12:40 bstansberry Exp $
 40    */
 41    public class TcpCacheServer implements TcpCacheServerMBean
 42    {
 43    private ServerSocket srv_sock;
 44    private InetAddress bind_addr = null;
 45    private int port = 7500;
 46    private CacheSPI cache;
 47    private CacheJmxWrapperMBean wrapper;
 48    private String config;
 49    private boolean running = true;
 50    private final List<Connection> conns = Collections.synchronizedList(new LinkedList<Connection>());
 51    /**
 52    * whether or not to start the server thread as a daemon. Should be false if started from the command line, true if started as an MBean.
 53    */
 54    boolean daemon = true;
 55    static Log log = LogFactory.getLog(TcpCacheServer.class);
 56   
 57   
 58  4 public TcpCacheServer()
 59    {
 60    }
 61   
 62  0 public String getBindAddress()
 63    {
 64  0 return bind_addr != null ? bind_addr.toString() : "n/a";
 65    }
 66   
 67  4 public void setBindAddress(String bind_addr) throws UnknownHostException
 68    {
 69  4 if (bind_addr != null)
 70    {
 71  4 this.bind_addr = InetAddress.getByName(bind_addr);
 72    }
 73    }
 74   
 75  0 public int getPort()
 76    {
 77  0 return port;
 78    }
 79   
 80  4 public void setPort(int port)
 81    {
 82  4 this.port = port;
 83    }
 84   
 85  0 public String getConfig()
 86    {
 87  0 return config;
 88    }
 89   
 90  2 public void setConfig(String config)
 91    {
 92  2 this.config = config;
 93    }
 94   
 95  3 public Cache getCache()
 96    {
 97  3 return cache;
 98    }
 99   
 100  1 public void setCache(CacheSPI cache)
 101    {
 102  1 this.cache = cache;
 103    }
 104   
 105  1 public void setCacheJmxWrapper(CacheJmxWrapperMBean wrapper)
 106    {
 107  1 this.wrapper = wrapper;
 108    }
 109   
 110  4 public void start() throws Exception
 111    {
 112  4 if (cache == null)
 113    {
 114    // cache not directly set; get from wrapper or create from config
 115  3 if (wrapper != null)
 116    {
 117  1 cache = (CacheSPI) wrapper.getCache();
 118   
 119  1 if (cache == null)
 120    {
 121  0 throw new CacheException("cache cannot be obtained from CacheJmxWrapperMBean;" +
 122    " be sure start() is invoked on wrapper before it is invoked on the" +
 123    " TcpCacheServer");
 124    }
 125    }
 126  2 else if (config != null)
 127    {
 128  2 cache = (CacheSPI) DefaultCacheFactory.getInstance().createCache(this.config);
 129    }
 130    }
 131   
 132  4 if (cache == null)
 133    {
 134  0 throw new CacheException("cache reference is not set");
 135    }
 136   
 137   
 138  4 srv_sock = new ServerSocket(port, 10, bind_addr);
 139  4 log.info("TcpCacheServer listening on : " + srv_sock.getInetAddress() + ":" + srv_sock.getLocalPort());
 140   
 141  4 running = true;
 142   
 143  4 Thread serverThread = new Thread("TcpCacheServer")
 144    {
 145  4 public void run()
 146    {
 147  4 try
 148    {
 149  68 while (running)
 150    {
 151  68 Socket client_sock = srv_sock.accept();
 152  64 Connection conn = new Connection(client_sock, cache);
 153  64 conns.add(conn);
 154  64 conn.start();
 155    }
 156    }
 157    catch (SocketException se)
 158    {
 159  3 if (!running)
 160    {
 161    // this is because of the stop() lifecycle method being called.
 162    // ignore.
 163  3 log.info("Shutting down TcpCacheServer");
 164    }
 165    else
 166    {
 167  0 log.error("Caught exception! Shutting down server thread.", se);
 168    }
 169    }
 170    catch (IOException e)
 171    {
 172  0 log.error("Caught exception! Shutting down server thread.", e);
 173    }
 174    }
 175    };
 176  4 serverThread.setDaemon(daemon);
 177  4 serverThread.start();
 178   
 179    }
 180   
 181  3 public void stop()
 182    {
 183  3 running = false;
 184  3 synchronized(conns)
 185    {
 186    // Connection.close() removes conn from the list,
 187    // so copy off the list first to avoid ConcurrentModificationException
 188  3 List<Connection> copy = new ArrayList<Connection>(conns);
 189  3 for (Connection conn : copy)
 190    {
 191  3 conn.close();
 192    }
 193  3 conns.clear();
 194    }
 195   
 196  3 if (srv_sock != null)
 197    {
 198  3 try
 199    {
 200  3 srv_sock.close();
 201  3 srv_sock = null;
 202    }
 203    catch (IOException e)
 204    {
 205    // nada
 206    }
 207    }
 208    }
 209   
 210   
 211  0 public String getConnections()
 212    {
 213  0 synchronized(conns)
 214    {
 215  0 StringBuffer sb = new StringBuffer();
 216  0 sb.append(conns.size()).append(" connections:\n");
 217  0 for (Connection c : conns)
 218    {
 219  0 sb.append(c).append("\n");
 220    }
 221  0 return sb.toString();
 222    }
 223    }
 224   
 225   
 226  4 public void create()
 227    {
 228    }
 229   
 230  0 public void destroy()
 231    {
 232    }
 233   
 234   
 235    private class Connection implements Runnable
 236    {
 237    private Socket sock = null;
 238    private ObjectInputStream input = null;
 239    private ObjectOutputStream output = null;
 240    private CacheSPI c;
 241    private Thread t = null;
 242   
 243  64 public Connection(Socket sock, CacheSPI cache) throws IOException
 244    {
 245  64 this.sock = sock;
 246   
 247  64 output = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
 248  64 output.flush();
 249   
 250  64 input = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
 251   
 252  64 c = cache;
 253    }
 254   
 255   
 256  64 public void start()
 257    {
 258  64 t = new Thread(this, "TcpCacheServer.Connection");
 259  64 t.setDaemon(true);
 260  64 t.start();
 261    }
 262   
 263  63 public void close()
 264    {
 265  63 t = null;
 266  63 try
 267    {
 268  63 if (output != null) output.close();
 269    }
 270    catch (Throwable th)
 271    {
 272    // nada
 273    }
 274  63 try
 275    {
 276  63 if (input != null) input.close();
 277    }
 278    catch (Throwable th)
 279    {
 280    // nada
 281    }
 282  63 try
 283    {
 284  63 if (sock != null) sock.close();
 285    }
 286    catch (Throwable th)
 287    {
 288    // nada
 289    }
 290   
 291    // remove self from connections list
 292  63 conns.remove(this);
 293    }
 294   
 295  64 public void run()
 296    {
 297  64 int op;
 298  64 Fqn fqn;
 299  64 Object key, val, retval;
 300  64 NodeSPI n;
 301  64 boolean flag;
 302   
 303  64 while (t != null && Thread.currentThread().equals(t))
 304    {
 305  12863 try
 306    {
 307  12863 op = input.readInt();
 308    }
 309    catch (IOException e)
 310    {
 311  60 log.debug("Client closed socket");
 312  60 close();
 313  60 break;
 314    }
 315   
 316  12803 try
 317    {
 318  12803 output.reset();
 319  12803 switch (op)
 320    {
 321  2044 case DelegatingCacheLoader.delegateGetChildrenNames:
 322  2044 fqn = (Fqn) input.readObject();
 323  2044 Node node = c.getRoot().getChild(fqn);
 324  2044 Set<Object> children = node == null ? Collections.emptySet() : node.getChildrenNames();
 325  2044 output.writeObject(children);
 326  2044 break;
 327  0 case DelegatingCacheLoader.delegateGetKey:
 328  0 fqn = (Fqn) input.readObject();
 329  0 key = input.readObject();
 330  0 retval = c.get(fqn, key);
 331  0 output.writeObject(retval);
 332  0 break;
 333  2288 case DelegatingCacheLoader.delegateGet:
 334  2288 fqn = (Fqn) input.readObject();
 335  2288 n = (NodeSPI) c.getRoot().getChild(fqn);
 336  2288 if (n == null)
 337    {// node doesn't exist - return null
 338  820 output.writeObject(n);
 339  820 break;
 340    }
 341  1468 Map map = n.getDataDirect();
 342  0 if (map == null) map = new HashMap();
 343  1468 output.writeObject(map);
 344  1468 break;
 345  71 case DelegatingCacheLoader.delegateExists:
 346  71 fqn = (Fqn) input.readObject();
 347  71 flag = c.getRoot().hasChild(fqn);
 348  71 output.writeObject(flag);
 349  71 break;
 350  2135 case DelegatingCacheLoader.delegatePutKeyVal:
 351  2135 fqn = (Fqn) input.readObject();
 352  2135 key = input.readObject();
 353  2135 val = input.readObject();
 354  2135 retval = c.put(fqn, key, val);
 355  2135 output.writeObject(retval);
 356  2135 break;
 357  2030 case DelegatingCacheLoader.delegatePut:
 358  2030 fqn = (Fqn) input.readObject();
 359  2030 map = (Map) input.readObject();
 360  2030 c.put(fqn, map);
 361  2030 output.writeObject(Boolean.TRUE);
 362  2030 break;
 363   
 364  61 case DelegatingCacheLoader.putList:
 365  61 int length = input.readInt();
 366  61 retval = Boolean.TRUE;
 367  61 if (length > 0)
 368    {
 369  61 Modification mod;
 370  61 List<Modification> mods = new ArrayList<Modification>(length);
 371  61 for (int i = 0; i < length; i++)
 372    {
 373  70 mod = new Modification();
 374  70 mod.readExternal(input);
 375  70 mods.add(mod);
 376    }
 377  61 try
 378    {
 379  61 handleModifications(mods);
 380    }
 381    catch (Exception ex)
 382    {
 383  0 retval = ex;
 384    }
 385    }
 386  61 output.writeObject(retval);
 387  61 break;
 388  2024 case DelegatingCacheLoader.delegateRemoveKey:
 389  2024 fqn = (Fqn) input.readObject();
 390  2024 key = input.readObject();
 391  2024 retval = c.remove(fqn, key);
 392  2024 output.writeObject(retval);
 393  2024 break;
 394  2147 case DelegatingCacheLoader.delegateRemove:
 395  2147 fqn = (Fqn) input.readObject();
 396  2147 c.removeNode(fqn);
 397  2147 output.writeObject(Boolean.TRUE);
 398  2147 break;
 399  3 case DelegatingCacheLoader.delegateRemoveData:
 400  3 fqn = (Fqn) input.readObject();
 401  3 node = c.getRoot().getChild(fqn);
 402  3 if (node != null)
 403    {
 404  3 node.clearData();
 405  3 output.writeObject(true);
 406    }
 407    else
 408    {
 409  0 output.writeObject(false);
 410    }
 411   
 412  3 break;
 413  0 case DelegatingCacheLoader.delegateLoadEntireState:
 414  0 ObjectOutputStream os = (ObjectOutputStream) input.readObject();
 415   
 416  0 if (c.getCacheLoaderManager() != null)
 417    {
 418  0 c.getCacheLoaderManager().getCacheLoader().loadEntireState(os);
 419    }
 420  0 output.writeObject(Boolean.TRUE);
 421  0 break;
 422  0 case DelegatingCacheLoader.delegateStoreEntireState:
 423  0 ObjectInputStream is = (ObjectInputStream) input.readObject();
 424  0 if (c.getCacheLoaderManager() != null)
 425    {
 426  0 c.getCacheLoaderManager().getCacheLoader().storeEntireState(is);
 427    }
 428  0 output.writeObject(Boolean.TRUE);
 429  0 break;
 430  0 default:
 431  0 log.error("Operation " + op + " unknown");
 432  0 break;
 433    }
 434  12803 output.flush();
 435    }
 436    catch (Exception e)
 437    {
 438  0 log.debug(e, e);
 439  0 try
 440    {
 441  0 output.writeObject(e);
 442  0 output.flush();
 443    }
 444    catch (IOException e1)
 445    {
 446  0 log.error(e1, e1);
 447    }
 448    }
 449    }
 450    }
 451   
 452   
 453  0 public String toString()
 454    {
 455  0 StringBuffer sb = new StringBuffer();
 456  0 if (sock != null)
 457    {
 458  0 sb.append(sock.getRemoteSocketAddress());
 459    }
 460  0 return sb.toString();
 461    }
 462   
 463  61 protected void handleModifications(List<Modification> modifications) throws CacheException
 464    {
 465   
 466  61 for (Modification m : modifications)
 467    {
 468  70 switch (m.getType())
 469    {
 470  53 case PUT_DATA:
 471  53 c.put(m.getFqn(), m.getData());
 472  53 break;
 473  0 case PUT_DATA_ERASE:
 474  0 c.put(m.getFqn(), m.getData());
 475  0 break;
 476  10 case PUT_KEY_VALUE:
 477  10 c.put(m.getFqn(), m.getKey(), m.getValue());
 478  10 break;
 479  2 case REMOVE_DATA:
 480  2 Node n = c.getRoot().getChild(m.getFqn());
 481  2 if (n != null) n.clearData();
 482  2 break;
 483  2 case REMOVE_KEY_VALUE:
 484  2 c.remove(m.getFqn(), m.getKey());
 485  2 break;
 486  3 case REMOVE_NODE:
 487  3 c.removeNode(m.getFqn());
 488  3 break;
 489  0 case MOVE:
 490  0 c.move(m.getFqn(), m.getFqn2());
 491  0 break;
 492  0 default:
 493  0 log.error("modification type " + m.getType() + " not known");
 494  0 break;
 495    }
 496    }
 497    }
 498   
 499   
 500    }
 501   
 502   
 503  0 public static void main(String[] args) throws Exception
 504    {
 505  0 String bind_addr = null;
 506  0 int port = 7500;
 507  0 TcpCacheServer server;
 508  0 String config = null;
 509   
 510  0 for (int i = 0; i < args.length; i++)
 511    {
 512  0 if (args[i].equals("-bind_addr"))
 513    {
 514  0 bind_addr = args[++i];
 515  0 continue;
 516    }
 517  0 if (args[i].equals("-port"))
 518    {
 519  0 port = Integer.parseInt(args[++i]);
 520  0 continue;
 521    }
 522  0 if (args[i].equals("-config"))
 523    {
 524  0 config = args[++i];
 525  0 continue;
 526    }
 527  0 help();
 528  0 return;
 529    }
 530  0 server = new TcpCacheServer();
 531  0 server.daemon = false;
 532  0 server.setBindAddress(bind_addr);
 533  0 server.setPort(port);
 534  0 server.setConfig(config);
 535  0 server.create();
 536  0 server.start();
 537    }
 538   
 539   
 540  0 private static void help()
 541    {
 542  0 System.out.println("TcpCacheServer [-bind_addr <address>] [-port <port>] [-config <config file>] [-help]");
 543    }
 544    }