Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 289   Methods: 23
NCLOC: 228   Classes: 2
 
 Source file Conditionals Statements Methods TOTAL
SingletonStoreCacheLoader.java 42.1% 67.1% 60.9% 59%
coverage coverage
 1    package org.jboss.cache.loader;
 2   
 3    import org.apache.commons.logging.Log;
 4    import org.apache.commons.logging.LogFactory;
 5    import org.jboss.cache.Fqn;
 6    import org.jboss.cache.Modification;
 7    import org.jboss.cache.NodeSPI;
 8    import org.jboss.cache.notifications.annotation.CacheListener;
 9    import org.jboss.cache.notifications.annotation.CacheStarted;
 10    import org.jboss.cache.notifications.annotation.CacheStopped;
 11    import org.jboss.cache.notifications.annotation.ViewChanged;
 12    import org.jboss.cache.notifications.event.Event;
 13    import org.jboss.cache.notifications.event.ViewChangedEvent;
 14    import org.jgroups.Address;
 15    import org.jgroups.View;
 16   
 17    import java.io.ObjectInputStream;
 18    import java.util.Collection;
 19    import java.util.List;
 20    import java.util.Map;
 21    import java.util.Set;
 22    import java.util.Vector;
 23   
 24    /**
 25    * SingletonStoreCacheLoader is a delegating cache loader used for situations when only one node should interact with
 26    * the underlying store. The coordinator of the cluster will be responsible for the underlying CacheLoader.
 27    * SingletonStoreCacheLoader is a simply facade to a real CacheLoader implementation. It always delegates reads to the
 28    * real CacheLoader.
 29    * <p/>
 30    * Writes are forwarded only if this SingletonStoreCacheLoader is currently the cordinator. This avoid having all
 31    * CacheLoaders in a cluster writing the same data to the same underlying store. Although not incorrect (e.g. a DB
 32    * will just discard additional INSERTs for the same key, and throw an exception), this will avoid a lot of
 33    * redundant work.<br/>
 34    * <p/>
 35    * Whenever the current coordinator dies (or leaves), the second in line will take over. That SingletonStoreCacheLoader
 36    * will then pass writes through to its underlying CacheLoader. Optionally, when a new coordinator takes over the
 37    * Singleton, it can push the in-memory state to the cache cacheLoader.
 38    *
 39    * @author Bela Ban
 40    * @author <a href="mailto:galder.zamarreno@jboss.com">Galder Zamarreno</a>
 41    */
 42    public class SingletonStoreCacheLoader extends AbstractDelegatingCacheLoader
 43    {
 44    private static final Log log = LogFactory.getLog(SingletonStoreCacheLoader.class);
 45    private Address localAddress;
 46    private boolean active;// only active if coordinator
 47    private boolean pushStateWhenCoordinator;
 48    private Thread pushStateThread;
 49    private Object cacheListener;
 50   
 51  11 public SingletonStoreCacheLoader(CacheLoader cacheLoader, boolean pushConfiguration)
 52    {
 53  11 super(cacheLoader);
 54  11 pushStateWhenCoordinator = pushConfiguration;
 55  11 cacheListener = new SingletonStoreListener();
 56    }
 57   
 58  10 public Object getCacheListener()
 59    {
 60  10 return cacheListener;
 61    }
 62   
 63  6 protected void activeStatusChanged(boolean newActiveState)
 64    {
 65  6 active = newActiveState;
 66  6 log.debug("changed mode: " + this);
 67  6 if (active && pushStateWhenCoordinator)
 68    {
 69  4 if (pushStateThread == null || !pushStateThread.isAlive())
 70    {
 71  3 pushStateThread = createPushStateThread();
 72  3 pushStateThread.setName("InMemoryToCacheLoaderPusher");
 73  3 pushStateThread.start();
 74    }
 75    else
 76    {
 77  1 try
 78    {
 79  1 log.debug("joining currently running state push thread");
 80  1 pushStateThread.join();
 81    }
 82    catch (InterruptedException e)
 83    {
 84  0 log.error("joining existing push state thread was interrupted", e);
 85    }
 86    }
 87    }
 88    }
 89   
 90  2 protected Thread createPushStateThread()
 91    {
 92  2 Thread t = new Thread(new Runnable()
 93    {
 94  2 public void run()
 95    {
 96  2 log.debug("start pushing in-memory state to cache cacheLoader");
 97  2 try
 98    {
 99  2 pushState(cache.getRoot());
 100  2 log.debug("in-memory state passed to cache cacheLoader successfully");
 101    }
 102    catch (Exception e)
 103    {
 104  0 log.error("unable to finish pushing the state", e);
 105    }
 106    }
 107    });
 108  2 t.setDaemon(true);
 109  2 return t;
 110    }
 111   
 112  18 private boolean isCoordinator(View newView)
 113    {
 114  18 if (newView != null && localAddress != null)
 115    {
 116  12 Vector mbrs = newView.getMembers();
 117  12 if (mbrs != null)
 118    {
 119  12 if (mbrs.size() > 0 && localAddress.equals(mbrs.firstElement()))
 120    {
 121    /* This node is the coordinator */
 122  8 return true;
 123    }
 124    }
 125   
 126  4 return false;
 127    }
 128   
 129    /* Invalid new view, so previous value returned */
 130  6 return active;
 131    }
 132   
 133  18 private void pushState(NodeSPI node) throws Exception
 134    {
 135    /* Put the node's data first */
 136  18 Set keys = node.getKeysDirect();
 137  18 Fqn fqn = node.getFqn();
 138   
 139  18 for (Object aKey : keys)
 140    {
 141  18 Object value = cache.get(fqn, aKey);
 142  18 put(fqn, aKey, value);
 143    }
 144   
 145    /* Navigates to the children */
 146  18 Collection<NodeSPI> children = node.getChildrenDirect();
 147  18 for (NodeSPI aChildren : children)
 148    {
 149    //Map.Entry entry = (Map.Entry) aChildren;
 150  16 pushState(aChildren);
 151    }
 152    }
 153   
 154  61 public Object put(Fqn name, Object key, Object value) throws Exception
 155    {
 156  61 if (active)
 157    {
 158  35 return super.put(name, key, value);
 159    }
 160   
 161  26 return null;
 162    }
 163   
 164  0 public void put(Fqn name, Map attributes) throws Exception
 165    {
 166  0 if (active)
 167    {
 168  0 super.put(name, attributes);
 169    }
 170    }
 171   
 172  0 public void put(List<Modification> modifications) throws Exception
 173    {
 174  0 if (active)
 175    {
 176  0 super.put(modifications);
 177    }
 178    }
 179   
 180  0 public Object remove(Fqn fqn, Object key) throws Exception
 181    {
 182  0 if (active)
 183    {
 184  0 return super.remove(fqn, key);
 185    }
 186   
 187  0 return null;
 188    }
 189   
 190  0 public void remove(Fqn fqn) throws Exception
 191    {
 192  0 if (active)
 193    {
 194  0 super.remove(fqn);
 195    }
 196    }
 197   
 198  0 public void removeData(Fqn fqn) throws Exception
 199    {
 200  0 if (active)
 201    {
 202  0 super.removeData(fqn);
 203    }
 204    }
 205   
 206  0 public void prepare(Object tx, List<Modification> modifications, boolean one_phase) throws Exception
 207    {
 208  0 if (active)
 209    {
 210  0 super.prepare(tx, modifications, one_phase);
 211    }
 212    }
 213   
 214  0 public void commit(Object tx) throws Exception
 215    {
 216  0 if (active)
 217    {
 218  0 super.commit(tx);
 219    }
 220    }
 221   
 222  0 public void rollback(Object tx)
 223    {
 224  0 if (active)
 225    {
 226  0 super.rollback(tx);
 227    }
 228    }
 229   
 230  4 public void storeEntireState(ObjectInputStream is) throws Exception
 231    {
 232  4 if (active)
 233    {
 234  0 super.storeEntireState(is);
 235    }
 236    }
 237   
 238  0 public void storeState(Fqn subtree, ObjectInputStream is) throws Exception
 239    {
 240  0 if (active)
 241    {
 242  0 super.storeState(subtree, is);
 243    }
 244    }
 245   
 246  4 public Thread getPushStateThread()
 247    {
 248  4 return pushStateThread;
 249    }
 250   
 251  6 public String toString()
 252    {
 253  6 return "loc_addr=" + localAddress + ", active=" + active;
 254    }
 255   
 256    /**
 257    * Cache listener that reacts to cluster topology changes to find out whether a new coordinator is elected.
 258    * SingletonStoreCacheLoader reacts to these changes in order to decide which node should interact with the
 259    * underlying cache store.
 260    */
 261    @CacheListener
 262    public class SingletonStoreListener
 263    {
 264  6 @CacheStarted
 265    public void cacheStarted(Event e)
 266    {
 267  6 localAddress = cache.getLocalAddress();
 268  6 active = cache.getRPCManager().isCoordinator();
 269  0 if (log.isDebugEnabled()) log.debug("cache started: " + this);
 270    }
 271   
 272  6 @CacheStopped
 273    public void cacheStopped(Event e)
 274    {
 275  0 if (log.isDebugEnabled()) log.debug("cache stopped: " + this);
 276    }
 277   
 278  18 @ViewChanged
 279    public void viewChange(ViewChangedEvent event)
 280    {
 281  18 boolean tmp = isCoordinator(event.getNewView());
 282   
 283  18 if (active != tmp)
 284    {
 285  4 activeStatusChanged(tmp);
 286    }
 287    }
 288    }
 289    }