Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 197   Methods: 11
NCLOC: 115   Classes: 2
 
 Source file Conditionals Statements Methods TOTAL
ReplicationQueue.java 0% 0% 0% 0%
coverage
 1    /*
 2    * JBoss, the OpenSource J2EE webOS
 3    *
 4    * Distributable under LGPL license.
 5    * See terms of license at gnu.org.
 6    */
 7    package org.jboss.cache;
 8   
 9   
 10    import org.apache.commons.logging.Log;
 11    import org.apache.commons.logging.LogFactory;
 12    import org.jboss.cache.marshall.MethodCall;
 13    import org.jboss.cache.marshall.MethodDeclarations;
 14   
 15    import java.util.ArrayList;
 16    import java.util.LinkedList;
 17    import java.util.List;
 18    import java.util.Timer;
 19    import java.util.TimerTask;
 20   
 21   
 22    /**
 23    * Periodically (or when certain size is exceeded) takes elements and replicates them.
 24    *
 25    * @author <a href="mailto:bela@jboss.org">Bela Ban</a> May 24, 2003
 26    * @version $Revision: 1.15 $
 27    */
 28    public class ReplicationQueue
 29    {
 30   
 31    private static Log log = LogFactory.getLog(ReplicationQueue.class);
 32   
 33    private CacheImpl cache = null;
 34   
 35    /**
 36    * We flush every 5 seconds. Inactive if -1 or 0
 37    */
 38    private long interval = 5000;
 39   
 40    /**
 41    * Max elements before we flush
 42    */
 43    private long max_elements = 500;
 44   
 45    /**
 46    * Holds the replication jobs: LinkedList<MethodCall>
 47    */
 48    private final List<MethodCall> elements = new LinkedList<MethodCall>();
 49   
 50    /**
 51    * For periodical replication
 52    */
 53    private Timer timer = null;
 54   
 55    /**
 56    * The timer task, only calls flush() when executed by Timer
 57    */
 58    private MyTask task = null;
 59   
 60  0 public ReplicationQueue()
 61    {
 62    }
 63   
 64    /**
 65    * Constructs a new ReplicationQueue.
 66    */
 67  0 public ReplicationQueue(CacheImpl cache, long interval, long max_elements)
 68    {
 69  0 this.cache = cache;
 70  0 this.interval = interval;
 71  0 this.max_elements = max_elements;
 72    }
 73   
 74    /**
 75    * Returns the flush interval in milliseconds.
 76    */
 77  0 public long getInterval()
 78    {
 79  0 return interval;
 80    }
 81   
 82    /**
 83    * Sets the flush interval in milliseconds.
 84    */
 85  0 public void setInterval(long interval)
 86    {
 87  0 this.interval = interval;
 88  0 stop();
 89  0 start();
 90    }
 91   
 92    /**
 93    * Returns the maximum number of elements to hold.
 94    * If the maximum number is reached, flushes in the calling thread.
 95    */
 96  0 public long getMax_elements()
 97    {
 98  0 return max_elements;
 99    }
 100   
 101    /**
 102    * Sets the maximum number of elements to hold.
 103    */
 104  0 public void setMax_elements(long max_elements)
 105    {
 106  0 this.max_elements = max_elements;
 107    }
 108   
 109    /**
 110    * Starts the asynchronous flush queue.
 111    */
 112  0 public synchronized void start()
 113    {
 114  0 if (interval > 0)
 115    {
 116  0 if (task == null)
 117  0 task = new MyTask();
 118  0 if (timer == null)
 119    {
 120  0 timer = new Timer(true);
 121  0 timer.schedule(task,
 122    500, // delay before initial flush
 123    interval); // interval between flushes
 124    }
 125    }
 126    }
 127   
 128    /**
 129    * Stops the asynchronous flush queue.
 130    */
 131  0 public synchronized void stop()
 132    {
 133  0 if (task != null)
 134    {
 135  0 task.cancel();
 136  0 task = null;
 137    }
 138  0 if (timer != null)
 139    {
 140  0 timer.cancel();
 141  0 timer = null;
 142    }
 143    }
 144   
 145   
 146    /**
 147    * Adds a new method call.
 148    */
 149  0 public void add(MethodCall job)
 150    {
 151  0 if (job == null)
 152  0 throw new NullPointerException("job is null");
 153  0 synchronized (elements)
 154    {
 155  0 elements.add(job);
 156  0 if (elements.size() >= max_elements)
 157  0 flush();
 158    }
 159    }
 160   
 161    /**
 162    * Flushes existing method calls.
 163    */
 164  0 public void flush()
 165    {
 166  0 List<MethodCall> l;
 167  0 synchronized (elements)
 168    {
 169  0 if (log.isTraceEnabled())
 170  0 log.trace("flush(): flushing repl queue (num elements=" + elements.size() + ")");
 171  0 l = new ArrayList<MethodCall>(elements);
 172  0 elements.clear();
 173    }
 174   
 175  0 if (l.size() > 0)
 176    {
 177  0 try
 178    {
 179    // send to all live nodes in the cluster
 180  0 cache.getRPCManager().callRemoteMethods(null, MethodDeclarations.replicateAllMethod, new Object[]{l}, false, true, 5000);
 181    }
 182    catch (Throwable t)
 183    {
 184  0 log.error("failed replicating " + l.size() + " elements in replication queue", t);
 185    }
 186    }
 187    }
 188   
 189    class MyTask extends TimerTask
 190    {
 191  0 public void run()
 192    {
 193  0 flush();
 194    }
 195    }
 196   
 197    }