Clover coverage report -
Coverage timestamp: Thu Jul 5 2007 20:02:32 EDT
file stats: LOC: 380   Methods: 21
NCLOC: 274   Classes: 2
 
 Source file Conditionals Statements Methods TOTAL
AsyncCacheLoader.java 63.6% 78.1% 90.5% 75.8%
coverage coverage
 1    /**
 2    *
 3    */
 4    package org.jboss.cache.loader;
 5   
 6    import org.apache.commons.logging.Log;
 7    import org.apache.commons.logging.LogFactory;
 8    import org.jboss.cache.CacheException;
 9    import org.jboss.cache.Fqn;
 10    import org.jboss.cache.Modification;
 11    import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
 12    import org.jboss.cache.util.MapCopy;
 13   
 14    import java.io.IOException;
 15    import java.util.ArrayList;
 16    import java.util.HashMap;
 17    import java.util.List;
 18    import java.util.Map;
 19    import java.util.concurrent.ArrayBlockingQueue;
 20    import java.util.concurrent.BlockingQueue;
 21    import java.util.concurrent.atomic.AtomicBoolean;
 22    import java.util.concurrent.atomic.AtomicInteger;
 23   
 24    /**
 25    * The AsyncCacheLoader is a delegating cache loader that extends
 26    * AbstractDelegatingCacheLoader overriding methods to that should not
 27    * just delegate the operation to the underlying cache loader.
 28    * <p/>
 29    * Read operations are done synchronously, while write (CRUD - Create, Remove,
 30    * Update, Delete) operations are done asynchronously. There is no provision
 31    * for exception handling at the moment for problems encountered with the
 32    * underlying CacheLoader during a CRUD operation, and the exception is just
 33    * logged.
 34    * <p/>
 35    * When configuring the CacheLoader, use the following attribute:
 36    * <p/>
 37    * <code>
 38    * &lt;attribute name="CacheLoaderAsynchronous"&gt;true&lt;/attribute&gt;
 39    * </code>
 40    * <p/>
 41    * to define whether cache loader operations are to be asynchronous. If not
 42    * specified, a cache loader operation is assumed synchronous.
 43    * <p/>
 44    * <p/>
 45    * The following additional parameters are available:
 46    * <dl>
 47    * <dt>cache.async.batchSize</dt>
 48    * <dd>Number of modifications to commit in one transaction, default is
 49    * 100. The minimum batch size is 1.</dd>
 50    * <dt>cache.async.pollWait</dt>
 51    * <dd>How long to wait before processing an incomplete batch, in
 52    * milliseconds. Default is 100. Set this to 0 to not wait before processing
 53    * available records.</dd>
 54    * <dt>cache.async.returnOld</dt>
 55    * <dd>If <code>true</code>, this loader returns the old values from {@link
 56    * #put} and {@link #remove} methods. Otherwise, these methods always return
 57    * null. Default is true. <code>false</code> improves the performance of these
 58    * operations.</dd>
 59    * <dt>cache.async.queueSize</dt>
 60    * <dd>Maximum number of entries to enqueue for asynchronous processing.
 61    * Lowering this size may help prevent out-of-memory conditions. It also may
 62    * help to prevent less records lost in the case of JVM failure. Default is
 63    * 10,000 operations.</dd>
 64    * <dt>cache.async.put</dt>
 65    * <dd>If set to false, all {@link #put} operations will be processed
 66    * synchronously, and then only the {@link #remove} operations will be
 67    * processed asynchronously. This mode may be useful for processing
 68    * expiration of messages within a separate thread and keeping other
 69    * operations synchronous for reliability.
 70    * </dd>
 71    * </dl>
 72    * For increased performance for many smaller transactions, use higher values
 73    * for <code>cache.async.batchSize</code> and
 74    * <code>cache.async.pollWait</code>. For larger sized records, use a smaller
 75    * value for <code>cache.async.queueSize</code>.
 76    *
 77    * @author Manik Surtani (manik.surtani@jboss.com)
 78    */
 79    public class AsyncCacheLoader extends AbstractDelegatingCacheLoader
 80    {
 81   
 82    private static final Log log = LogFactory.getLog(AsyncCacheLoader.class);
 83   
 84    private static AtomicInteger threadId = new AtomicInteger(0);
 85   
 86    /**
 87    * Default limit on entries to process asynchronously.
 88    */
 89    public static final int DEFAULT_QUEUE_SIZE = 10000;
 90   
 91    private AsyncCacheLoaderConfig config;
 92    private AsyncProcessor processor;
 93    private AtomicBoolean stopped = new AtomicBoolean(true);
 94    private BlockingQueue<Modification> queue = new ArrayBlockingQueue<Modification>(DEFAULT_QUEUE_SIZE);
 95   
 96  0 public AsyncCacheLoader()
 97    {
 98  0 super(null);
 99    }
 100   
 101  21 public AsyncCacheLoader(CacheLoader cacheLoader)
 102    {
 103  21 super(cacheLoader);
 104    }
 105   
 106  21 public void setConfig(IndividualCacheLoaderConfig base)
 107    {
 108  21 if (base instanceof AsyncCacheLoaderConfig)
 109    {
 110  0 config = (AsyncCacheLoaderConfig) base;
 111    }
 112    else
 113    {
 114  21 config = new AsyncCacheLoaderConfig(base);
 115    }
 116   
 117  21 if (config.getQueueSize() > 0)
 118    {
 119  1 queue = new ArrayBlockingQueue<Modification>(config.getQueueSize());
 120    }
 121   
 122  21 super.setConfig(base);
 123    }
 124   
 125  14 public Map get(Fqn name) throws Exception
 126    {
 127  14 try
 128    {
 129  14 return super.get(name);
 130    }
 131    catch (IOException e)
 132    {
 133    // FileCacheLoader sometimes does this apparently
 134  0 log.trace(e);
 135  0 return new HashMap(); // ?
 136    }
 137    }
 138   
 139  58 Object get(Fqn name, Object key) throws Exception
 140    {
 141  58 if (config.getReturnOld())
 142    {
 143  55 try
 144    {
 145  55 Map map = super.get(name);
 146  55 if (map != null)
 147    {
 148  49 return map.get(key);
 149    }
 150    }
 151    catch (IOException e)
 152    {
 153    // FileCacheLoader sometimes does this apparently
 154  0 log.trace(e);
 155    }
 156    }
 157  9 return null;
 158    }
 159   
 160  58 public Object put(Fqn name, Object key, Object value) throws Exception
 161    {
 162  58 if (config.getUseAsyncPut())
 163    {
 164  57 Object oldValue = get(name, key);
 165  57 Modification mod = new Modification(Modification.ModificationType.PUT_KEY_VALUE, name, key, value);
 166  57 enqueue(mod);
 167  57 return oldValue;
 168    }
 169    else
 170    {
 171  1 return super.put(name, key, value);
 172    }
 173    }
 174   
 175  1 public void put(Fqn name, Map attributes) throws Exception
 176    {
 177  1 if (config.getUseAsyncPut())
 178    {
 179    // JBCACHE-769 -- make a defensive copy
 180  0 Map attrs = (attributes == null ? null : new MapCopy(attributes));
 181  0 Modification mod = new Modification(Modification.ModificationType.PUT_DATA, name, attrs);
 182  0 enqueue(mod);
 183    }
 184    else
 185    {
 186  1 super.put(name, attributes); // Let delegate make its own defensive copy
 187    }
 188    }
 189   
 190  1 public void put(List<Modification> modifications) throws Exception
 191    {
 192  1 if (config.getUseAsyncPut())
 193    {
 194  0 for (Modification modification : modifications)
 195    {
 196  0 enqueue(modification);
 197    }
 198    }
 199    else
 200    {
 201  1 super.put(modifications);
 202    }
 203    }
 204   
 205  1 public Object remove(Fqn name, Object key) throws Exception
 206    {
 207  1 Object oldValue = get(name, key);
 208  1 Modification mod = new Modification(Modification.ModificationType.REMOVE_KEY_VALUE, name, key);
 209  1 enqueue(mod);
 210  1 return oldValue;
 211    }
 212   
 213  10 public void remove(Fqn name) throws Exception
 214    {
 215  10 Modification mod = new Modification(Modification.ModificationType.REMOVE_NODE, name);
 216  10 enqueue(mod);
 217    }
 218   
 219  0 public void removeData(Fqn name) throws Exception
 220    {
 221  0 Modification mod = new Modification(Modification.ModificationType.REMOVE_DATA, name);
 222  0 enqueue(mod);
 223    }
 224   
 225  8 public void start() throws Exception
 226    {
 227  8 if (log.isInfoEnabled()) log.info("Async cache loader starting: " + this);
 228  8 stopped.set(false);
 229  8 super.start();
 230  8 processor = new AsyncProcessor();
 231  8 processor.start();
 232    }
 233   
 234  8 public void stop()
 235    {
 236  8 stopped.set(true);
 237  8 if (processor != null)
 238    {
 239  8 processor.stop();
 240    }
 241  8 super.stop();
 242    }
 243   
 244  68 private void enqueue(Modification mod)
 245    throws CacheException, InterruptedException
 246    {
 247  68 if (stopped.get())
 248    {
 249  1 throw new CacheException("AsyncCacheLoader stopped; no longer accepting more entries.");
 250    }
 251  67 if (log.isTraceEnabled())
 252    {
 253  0 log.trace("Enqueuing modification " + mod);
 254    }
 255  67 queue.put(mod);
 256    }
 257   
 258    /**
 259    * Processes (by batch if possible) a queue of {@link Modification}s.
 260    *
 261    * @author manik surtani
 262    */
 263    private class AsyncProcessor implements Runnable
 264    {
 265    private Thread t;
 266   
 267    // Modifications to process as a single put
 268    private final List<Modification> mods = new ArrayList<Modification>(config.getBatchSize());
 269   
 270  8 public void start()
 271    {
 272  8 if (t == null || !t.isAlive())
 273    {
 274  8 t = new Thread(this, "AsyncCacheLoader-" + threadId.getAndIncrement());
 275  8 t.setDaemon(true);
 276  8 t.start();
 277    }
 278    }
 279   
 280  8 public void stop()
 281    {
 282  8 if (t != null)
 283    {
 284  8 t.interrupt();
 285  8 try
 286    {
 287  8 t.join();
 288    }
 289    catch (InterruptedException e)
 290    {
 291    }
 292    }
 293  8 if (!queue.isEmpty())
 294    {
 295  0 log.warn("Async queue not yet empty, possibly interrupted");
 296    }
 297    }
 298   
 299  8 public void run()
 300    {
 301  8 while (!Thread.interrupted())
 302    {
 303  59 try
 304    {
 305  59 run0();
 306    }
 307    catch (InterruptedException e)
 308    {
 309  2 break;
 310    }
 311    }
 312   
 313  8 try
 314    {
 315  0 if (log.isTraceEnabled()) log.trace("process remaining batch " + mods.size());
 316  8 put(mods);
 317  0 if (log.isTraceEnabled()) log.trace("process remaining queued " + queue.size());
 318  8 while (!queue.isEmpty())
 319    {
 320  4 run0();
 321    }
 322    }
 323    catch (InterruptedException e)
 324    {
 325  0 log.trace("remaining interrupted");
 326    }
 327    }
 328   
 329  63 private void run0() throws InterruptedException
 330    {
 331  63 log.trace("Checking for modifications");
 332  63 int i = queue.drainTo(mods, config.getBatchSize());
 333  63 if (i == 0)
 334    {
 335  46 Modification m = queue.take();
 336  44 mods.add(m);
 337    }
 338   
 339  61 if (log.isTraceEnabled())
 340    {
 341  0 log.trace("Calling put(List) with " + mods.size() + " modifications");
 342    }
 343  61 put(mods);
 344  61 mods.clear();
 345    }
 346   
 347  69 private void put(List<Modification> mods)
 348    {
 349  69 try
 350    {
 351  69 AsyncCacheLoader.super.put(mods);
 352    }
 353    catch (Exception e)
 354    {
 355  0 if (log.isWarnEnabled()) log.warn("Failed to process async modifications: " + e);
 356  0 log.debug("Exception: ", e);
 357    }
 358    }
 359   
 360  2 public String toString()
 361    {
 362  2 return "TQ t=" + t;
 363    }
 364   
 365    }
 366   
 367  9 public String toString()
 368    {
 369  9 return super.toString() +
 370    " delegate=[" + super.getCacheLoader() + "]" +
 371    " processor=" + processor +
 372    " stopped=" + stopped +
 373    " batchSize=" + config.getBatchSize() +
 374    " returnOld=" + config.getReturnOld() +
 375    " asyncPut=" + config.getUseAsyncPut() +
 376    " queue.remainingCapacity()=" + queue.remainingCapacity() +
 377    " queue.peek()=" + queue.peek();
 378    }
 379   
 380    }