Concurrency problems - cache puts lost
akluge Nov 24, 2009 4:26 PM
Hi,
While running some concurrency tests in which multiple threads do simultaneous puts
against a cache, some of the puts appear to be lost. I run a set of five threads to do
puts, then run a set of five threads to do gets. Often, not all of the keys that were put
into the cache are present in it. I see this with 3.1.0 and also with the newest
stable version, 3.2.1.
Running the code below produces messages such as:
24 Nov 2009 14:44:50,135 ERROR [] com.m1.test.local.ConcurrencyTest Missing value for Key1.
The trace shows that the get was done, and that it returned null.
24 Nov 2009 14:44:50,134 TRACE [] org.jboss.cache.interceptors.InvocationContextInterceptor Invoked with command GetKeyValueCommand{fqn=/BigNode, key=Key1, sendNodeEvent=true} and InvocationContext [InvocationContext{transaction=null, globalTransaction=null, transactionContext=null, optionOverrides=Option{failSilently=false, cacheModeLocal=false, dataVersion=null, suppressLocking=false, lockAcquisitionTimeout=-1, forceDataGravitation=false, skipDataGravitation=false, forceAsynchronous=false, forceSynchronous=false, suppressPersistence=false, suppressEventNotification=false}, originLocal=true, bypassUnmarshalling=false}]
24 Nov 2009 14:44:50,134 TRACE [] org.jboss.cache.interceptors.CallInterceptor Executing command: GetKeyValueCommand{fqn=/BigNode, key=Key1, sendNodeEvent=true}.
24 Nov 2009 14:44:50,134 TRACE [] org.jboss.cache.commands.read.GetKeyValueCommand Found value null
However, earlier in the run I had put the key into the cache.
24 Nov 2009 14:44:40,831 INFO [] com.m1.test.local.ConcurrencyTest Putting Key1
And the trace indicates that a value was put for the key:
24 Nov 2009 14:44:40,829 TRACE [] org.jboss.cache.interceptors.InvocationContextInterceptor Invoked with command PutKeyValueCommand{fqn=/BigNode, dataVersion=null, globalTransaction=null, key=Key1, value=This is a test.} and InvocationContext [InvocationContext{transaction=null, globalTransaction=null, transactionContext=null, optionOverrides=Option{failSilently=false, cacheModeLocal=false, dataVersion=null, suppressLocking=false, lockAcquisitionTimeout=-1, forceDataGravitation=false, skipDataGravitation=false, forceAsynchronous=false, forceSynchronous=false, suppressPersistence=false, suppressEventNotification=false}, originLocal=true, bypassUnmarshalling=false}]
These log records are all from the same run of the test code. Since the get happens after the put, I would expect the corresponding value to be returned.
I have appended the test, which is a reasonably simple and self-contained case.
Is there anything else, perhaps in terms of MVCC options I should try?
Thanks,
Alex
package com.m1.test.local;
import org.jboss.cache.Cache;
import org.jboss.cache.config.Configuration;
import java.util.concurrent.CyclicBarrier;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * Reproduces apparently lost puts in a local JBoss Cache instance under concurrent writes.
 *
 * <p>The test runs {@code ITERATIONS} write rounds. In each round {@code NTHREADS} writer
 * threads are released together by a shared {@link CyclicBarrier}. Each writer puts its own
 * disjoint range of {@code KEYS_PER_THREAD} keys, all with the same value, into the single
 * node {@code /BigNode}. Only after every writer of a round has terminated does the next round
 * start. Once all writes are done, the same rounds are repeated with reader threads. Readers
 * log an error for each key whose value reads back as {@code null}. Because reads start only
 * after all writes have completed, such an error means a put did not survive; it is not a read
 * that raced ahead of its write.
 *
 * <p>The constructor never returns. After the reads it sleeps forever, keeping the JVM and the
 * cache alive (presumably so the process can be inspected with external tools; stop it by
 * killing the process).
 */
public class ConcurrencyTest
{
    private static final Fqn<String> FQN = Fqn.fromString("BigNode");
    private static final Log LOG = LogFactory.getLog(ConcurrencyTest.class);
    /** Number of worker threads per round; also the number of barrier parties. */
    private static final int NTHREADS = 5;
    /** Number of write rounds, and afterwards of read rounds. */
    private static final int ITERATIONS = 10;
    /** Number of consecutive key ids each thread handles in one round. */
    private static final int KEYS_PER_THREAD = 100;
    /** Key-id offset between rounds (ids 500-999, 1500-1999, ... are never used). */
    private static final int KEY_STRIDE_PER_ITERATION = 1000;
    private static final String VALUE = "This is a test.";
    private final CyclicBarrier barrier = new CyclicBarrier(NTHREADS);
    private final Thread[] threads = new Thread[NTHREADS];

    /**
     * Creates a cache, runs all write rounds and then all read rounds against it, and then
     * blocks forever.
     *
     * @throws Throwable if the cache cannot be created or started, or if the calling thread is
     *         interrupted while waiting for workers or while sleeping
     */
    public ConcurrencyTest()
        throws Throwable
    {
        Cache<Object, Object> cache = createCache();
        for (int iteration = 0; iteration < ITERATIONS; iteration++)
        {
            for (int j = 0; j < NTHREADS; j++)
            {
                threads[j] = new WriteThread(cache, barrier, iteration, j);
            }
            startAndAwait(threads);
        }
        for (int iteration = 0; iteration < ITERATIONS; iteration++)
        {
            for (int j = 0; j < NTHREADS; j++)
            {
                threads[j] = new ReadThread(cache, barrier, iteration, j);
            }
            startAndAwait(threads);
        }
        // Keep the JVM (and the cache) alive indefinitely.
        while (true)
        {
            Thread.sleep(3600000);
        }
    }

    /**
     * Creates, initialises and starts a cache with the factory's default configuration.
     *
     * @return the started cache
     * @throws Exception if the cache cannot be created or started
     */
    public Cache<Object, Object> createCache()
        throws Exception
    {
        Cache<Object, Object> cache = new DefaultCacheFactory().createCache();
        cache.create();
        cache.start();
        return cache;
    }

    /**
     * Starts every thread in {@code batch} and waits, without a timeout, until all of them
     * have terminated.
     *
     * <p>The join must not be timed. A timed join such as {@code join(500)} can give up while
     * workers are still busy (100 logged puts can easily take longer than that). Reads could
     * then begin before the writes they check had finished, producing spurious "missing value"
     * errors.
     *
     * @param batch the threads to run; none of them may have been started yet
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void startAndAwait(Thread[] batch)
        throws InterruptedException
    {
        for (Thread thread : batch)
        {
            thread.start();
        }
        for (Thread thread : batch)
        {
            thread.join();
        }
    }

    /**
     * A worker that waits on the shared barrier and then processes its own range of
     * {@code KEYS_PER_THREAD} consecutive keys named {@code "Key<id>"}. The range starts at
     * {@code KEY_STRIDE_PER_ITERATION * iteration + KEYS_PER_THREAD * index}.
     */
    private abstract static class KeyRangeThread extends Thread
    {
        protected final Cache<Object, Object> cache;
        private final CyclicBarrier barrier;
        private final int firstId;
        private final String failureMessage;

        /**
         * @param cache          the cache to operate on
         * @param barrier        barrier shared by all threads of the same round
         * @param iteration      zero-based round number
         * @param index          zero-based index of this thread within its round
         * @param failureMessage message logged if the thread fails with an exception
         */
        KeyRangeThread(Cache<Object, Object> cache, CyclicBarrier barrier,
                       int iteration, int index, String failureMessage)
        {
            this.cache = cache;
            this.barrier = barrier;
            this.firstId = KEY_STRIDE_PER_ITERATION * iteration + KEYS_PER_THREAD * index;
            this.failureMessage = failureMessage;
        }

        /** Waits for the whole round to be ready, then processes each key of the range. */
        @Override
        public void run()
        {
            try
            {
                // Release all threads of the round at once so that their operations overlap.
                barrier.await();
                for (int id = firstId; id < firstId + KEYS_PER_THREAD; id++)
                {
                    processKey("Key" + id);
                }
            }
            catch (InterruptedException interrupted)
            {
                // Restore the interrupt flag rather than silently clearing it.
                Thread.currentThread().interrupt();
                LOG.error(failureMessage, interrupted);
            }
            catch (Exception exception)
            {
                LOG.error(failureMessage, exception);
            }
        }

        /**
         * Performs this worker's cache operation for one key.
         *
         * @param key the key to process
         * @throws Exception if the cache operation fails
         */
        protected abstract void processKey(String key)
            throws Exception;
    }

    /** Puts {@code VALUE} under each key of its range, logging every put. */
    private static final class WriteThread extends KeyRangeThread
    {
        WriteThread(Cache<Object, Object> cache, CyclicBarrier barrier,
                    int iteration, int writer)
        {
            super(cache, barrier, iteration, writer, "Write thread failed.");
        }

        @Override
        protected void processKey(String key)
            throws Exception
        {
            cache.put(FQN, key, VALUE);
            LOG.info("Putting " + key);
        }
    }

    /** Reads each key of its range and logs an error for every key whose value is null. */
    private static final class ReadThread extends KeyRangeThread
    {
        ReadThread(Cache<Object, Object> cache, CyclicBarrier barrier,
                   int iteration, int reader)
        {
            super(cache, barrier, iteration, reader, "ReadThread failed.");
        }

        @Override
        protected void processKey(String key)
            throws Exception
        {
            Object result = cache.get(FQN, key);
            if (result == null)
            {
                LOG.error("Missing value for " + key + ".");
            }
        }
    }

    /**
     * Runs the test. Never returns normally (see the class description).
     *
     * @param args ignored
     * @throws Throwable if the test setup fails
     */
    public static void main(String[] args)
        throws Throwable
    {
        new ConcurrencyTest();
    }
}
Some other details:
$ java -version
java version "1.6.0_16"
Java(TM) SE Runtime Environment (build 1.6.0_16-b01)
Java HotSpot(TM) 64-Bit Server VM (build 14.2-b01, mixed mode)
$ uname -a
Linux aklugelnx 2.6.28-16-generic #55-Ubuntu SMP Tue Oct 20 19:48:32 UTC 2009 x86_64 GNU/Linux
The only JVM option I use is -Xmx2g to ensure enough space in the cache.
I would welcome suggestions, and even requests for more details or some additional tests to further pin down why I don't see what I expect. I'd be happy to find that it is a configuration option I missed.
Thanks,
Alex