JBossCacheAop Memory leak ?
ctof Sep 30, 2005 9:40 AM
Hi,
Before using JBossCache in a production environment, I am trying to test its performance and memory stability.
Below you can find an example in which a publisher sends the same kind of object as fast as possible, and a subscriber reads the data as fast as possible (using cache notifications).
* First, the CPU is always at 100% (which is expected, since the publisher sends data in a while(true) loop) ... but it may also mean that marshalling/unmarshalling has a significant impact on CPU usage.
* Second, memory is very stable for the sender, but for the receiver memory keeps growing ... and forcing a GC does not change anything.
* Third, the number of threads created (not live) is incredibly high: more than 2000 after sending 400000 messages.
I have tested with different JGroups configurations (from the JGroups conf examples) without success.
I run this sample under Windows XP (Service Pack 2) with JVM 1.5.0_03 and -Xmx128m.
I monitor the JVM with JConsole.
I start both processes (publisher and subscriber) on the same machine (dual Xeon).
When the maximum memory is reached (around 128 MB), I get no OutOfMemoryError, but the publisher and the subscriber hang (they do nothing).
Any help is welcome; this memory problem is blocking the adoption of JBossCache in my company.
Christophe
Listing 1: SubscriberTest.java
import javax.swing.JLabel;
import javax.swing.JPanel;
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.PropertyConfigurator;
import org.jboss.cache.aop.TreeCacheAop;
import org.jdesktop.swingx.JXFrame;
import deai.ft.archi.poc.cache.CacheDataListener;
import deai.ft.archi.poc.cache.TreeCacheListenerAdapter;
/**
 * Receiving side of the benchmark: starts a TreeCacheAop configured from
 * {@code PublisherTest.CACHE_CFG}, registers itself as a data listener and
 * counts the {@code PublisherTest.instrument} objects it is notified about.
 */
public class SubscriberTest implements CacheDataListener
{
    static int cpt = 0;
    TreeCacheAop cache;
    /** Wall-clock time (ms) of the first accepted notification; 0 until then. */
    long startTime = 0;
    static Fqn fqn = Fqn.fromString("/TEST/Instrument");
    /** Number of instrument notifications accepted so far. */
    public int msgReceived = 0;
    static JLabel info = new JLabel("Read ");
    static JLabel cptInfo = new JLabel("");
    TreeCacheListenerAdapter cacheAdapter;

    /**
     * Creates, configures and starts the cache, then subscribes this object
     * to data notifications through a TreeCacheListenerAdapter.
     *
     * @throws Exception if configuring or starting the cache fails
     */
    public SubscriberTest() throws Exception
    {
        cache = new TreeCacheAop();
        PropertyConfigurator config = new PropertyConfigurator();
        config.configure(cache, PublisherTest.CACHE_CFG);
        cache.startService();
        cacheAdapter = new TreeCacheListenerAdapter(cache);
        cacheAdapter.addDataListener(this);
    }

    /**
     * Starts the subscriber and shows a small window with the running count.
     *
     * @param args unused
     */
    public static void main(String[] args)
    {
        try
        {
            // The instance stays reachable through the listener registered in the constructor.
            new SubscriberTest();
            JXFrame frame = new JXFrame("Test Read", true);
            JPanel pane = new JPanel();
            pane.add(info);
            pane.add(cptInfo);
            frame.setContentPane(pane);
            frame.pack();
            frame.setLocation(0, 100);
            frame.setVisible(true);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

    /**
     * Counts one received instrument and, once {@code PublisherTest.NB_MSG_TO_SEND}
     * have been counted, prints the throughput summary.
     *
     * The object handed in by the adapter is used directly: the adapter already
     * obtained it with {@code cache.getObject(fqn)}, so reading the cache a second
     * time here only doubled the work per notification and could observe a
     * different (or null) value than the one that triggered the event.
     *
     * @param fqn  node that was modified
     * @param data object read from that node by the adapter; anything that is not
     *             a {@code PublisherTest.instrument} (including null) is ignored
     */
    public void dataModified(Fqn fqn, Object data)
    {
        if (!(data instanceof PublisherTest.instrument))
        {
            return;
        }
        PublisherTest.instrument instr = (PublisherTest.instrument) data;

        if (startTime == 0)
        {
            startTime = System.currentTimeMillis();
        }

        System.out.println("receive: " + instr.Id);
        if ((msgReceived % PublisherTest.NB_MSG_MOD) == 0)
        {
            showCount(msgReceived);
        }
        msgReceived++;

        // Only evaluated right after a successful increment, so the summary is printed once.
        if (msgReceived == PublisherTest.NB_MSG_TO_SEND)
        {
            long endTime = System.currentTimeMillis();
            double totalTime = (endTime - startTime);
            double avg = PublisherTest.NB_MSG_TO_SEND / totalTime * 1000;
            System.out.println("Total Read " + msgReceived + " msg, Time: " + totalTime + " ms, avg msg/s read: " + avg);
            showCount(msgReceived);
        }
    }

    /**
     * Displays {@code count} in the counter label. The update is posted to the
     * Swing event dispatch thread because this class is called back by the cache
     * adapter rather than by Swing (NOTE(review): presumably on a cache/JGroups
     * thread - confirm).
     */
    private static void showCount(final int count)
    {
        // Fully qualified so that the listing's import block does not need to change.
        javax.swing.SwingUtilities.invokeLater(new Runnable()
        {
            public void run()
            {
                cptInfo.setText("" + count);
            }
        });
    }

    /** Removal notifications are ignored by this benchmark. */
    public void dataRemoved(Fqn fqn, Object data) {}

    /** Eviction notifications are ignored by this benchmark. */
    public void dataEvicted(Fqn fqn, Object data) {}
}
File: CacheDataListener.java
import org.jboss.cache.Fqn;
/**
 * Callback contract for objects that want to be told about data changes
 * at a given cache node.
 */
public interface CacheDataListener
{
    /**
     * Called when the data stored under {@code fqn} has been modified.
     *
     * @param fqn  node concerned
     * @param data object associated with the event
     */
    void dataModified(Fqn fqn, Object data);

    /**
     * Called when the node {@code fqn} has been removed.
     *
     * @param fqn  node concerned
     * @param data object associated with the event (TreeCacheListenerAdapter passes null)
     */
    void dataRemoved(Fqn fqn, Object data);

    /**
     * Called when the node {@code fqn} has been evicted.
     *
     * @param fqn  node concerned
     * @param data object associated with the event (TreeCacheListenerAdapter passes null)
     */
    void dataEvicted(Fqn fqn, Object data);
}
File: TreeCacheListenerAdapter.java
import java.util.concurrent.ConcurrentLinkedQueue;
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
import org.jboss.cache.TreeCacheListener;
import org.jboss.cache.aop.AOPInstance;
import org.jboss.cache.aop.TreeCacheAop;
import org.jgroups.View;
/**
 * Registers itself as a TreeCacheListener on a TreeCacheAop and forwards
 * modification, removal and eviction events to CacheDataListener instances.
 */
public class TreeCacheListenerAdapter implements TreeCacheListener
{
    /** Listeners that receive the forwarded events. */
    protected ConcurrentLinkedQueue<CacheDataListener> listenersData = new ConcurrentLinkedQueue<CacheDataListener>();

    /** Cache being observed; also read back when a node is modified. */
    TreeCacheAop cache;

    /**
     * Stores the cache and registers this adapter as one of its listeners.
     *
     * @param cache cache to observe
     */
    public TreeCacheListenerAdapter(TreeCacheAop cache)
    {
        this.cache = cache;
        cache.addTreeCacheListener(this);
    }

    /**
     * Registers a listener unless it is already registered.
     *
     * @param listener listener to add
     */
    public void addDataListener(CacheDataListener listener)
    {
        if (listenersData.contains(listener))
        {
            return;
        }
        listenersData.add(listener);
    }

    /**
     * Unregisters a listener.
     *
     * @param listener listener to remove
     */
    public void removeDataListener(CacheDataListener listener)
    {
        listenersData.remove(listener);
    }

    /** Not forwarded. */
    public void nodeCreated(Fqn fqn) { }

    /** Forwarded as a removal with no data object. */
    public void nodeRemoved(Fqn fqn)
    {
        notifyDataRemoved(fqn, null);
    }

    /** Not forwarded. */
    public void nodeLoaded(Fqn fqn) { }

    /** Forwarded as an eviction with no data object. */
    public void nodeEvicted(Fqn fqn)
    {
        notifyDataEvicted(fqn, null);
    }

    /**
     * Reads the object stored under the modified node and forwards it;
     * a CacheException is printed and the event is dropped.
     */
    public void nodeModified(Fqn fqn)
    {
        try
        {
            Object current = cache.getObject(fqn);
            notifyDataModified(fqn, current);
        }
        catch (CacheException e)
        {
            e.printStackTrace();
        }
    }

    /** Not forwarded. */
    public void nodeVisited(Fqn fqn) { }

    /** Not forwarded. */
    public void cacheStarted(TreeCache cache) { }

    /** Not forwarded. */
    public void cacheStopped(TreeCache cache) { }

    /** Not forwarded. */
    public void viewChange(View new_view) { }

    /**
     * Passes a modification on to every listener, except when {@code data} is
     * null, a Class, an AOPInstance, or the String equal to TreeCacheAop.DUMMY.
     *
     * @param fqn  node that was modified
     * @param data object read from that node
     */
    public void notifyDataModified(Fqn fqn, Object data)
    {
        if (data == null)
        {
            return;
        }
        boolean filtered = (data instanceof Class)
                || (data instanceof AOPInstance)
                || (data instanceof String && data.equals(TreeCacheAop.DUMMY));
        if (filtered)
        {
            return;
        }
        for (CacheDataListener target : listenersData)
        {
            target.dataModified(fqn, data);
        }
    }

    /**
     * Passes a removal on to every listener.
     *
     * @param fqn  node that was removed
     * @param data object handed to the listeners as-is
     */
    public void notifyDataRemoved(Fqn fqn, Object data)
    {
        for (CacheDataListener target : listenersData)
        {
            target.dataRemoved(fqn, data);
        }
    }

    /**
     * Passes an eviction on to every listener.
     *
     * @param fqn  node that was evicted
     * @param data object handed to the listeners as-is
     */
    public void notifyDataEvicted(Fqn fqn, Object data)
    {
        for (CacheDataListener target : listenersData)
        {
            target.dataEvicted(fqn, data);
        }
    }
}
Listing 2: PublisherTest.java
import java.io.Serializable;
import javax.swing.JLabel;
import javax.swing.JPanel;
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.PropertyConfigurator;
import org.jboss.cache.aop.TreeCacheAop;
import org.jdesktop.swingx.JXFrame;
public class PublisherTest
{
static int cpt = 0;
public static long NB_MSG_TO_SEND = 100000;
public static long NB_MSG_MOD = 100;
/**
*
*/
static JLabel info = new JLabel("Send ");
static JLabel cptInfo= new JLabel("");
static String CACHE_CFG = fast-service-nothread-test.xml";
public static class instrument implements Serializable {
long Id;
String label;
double price;
String desc;
int foo;
float bar;
public instrument(long id, float bar, String desc, int foo, String label, double price)
{
super();
// TODO Auto-generated constructor stub
this.bar = bar;
this.desc = desc;
this.foo = foo;
Id = id;
this.label = label;
this.price = price;
}
}
private static void benchSendMsg(TreeCacheAop cache) throws CacheException
{
System.out.println("Start Send Bench");
Fqn fqn = Fqn.fromString("/TEST/Instrument");
long startTime = System.currentTimeMillis();
for (long i = 0 ; i < NB_MSG_TO_SEND ; i++)
{
cache.putObject(fqn, new instrument(i, (float)(i*0.5), "aaa", (int)i, "aaaa", (double)i*2));
if ((i % NB_MSG_MOD) == 0)
cptInfo.setText(""+i);
if ((i % 1000) == 0)
try
{
Thread.sleep(2000);
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
cptInfo.setText(""+NB_MSG_TO_SEND);
long endTime = System.currentTimeMillis();
double totalTime = (endTime - startTime);
double avg = NB_MSG_TO_SEND / totalTime * 1000;
System.out.println("Total Send "+NB_MSG_TO_SEND+"ms Time: "+totalTime+ " avg msg/s send: "+avg);
}
/**
* @param args
*/
public static void main(String[] args)
{
TreeCacheAop dataCache;
JXFrame frame = new JXFrame("Test Send", true);
JPanel pane = new JPanel();
pane.add(info);
pane.add(cptInfo);
frame.setContentPane(pane);
frame.pack();
frame.setVisible(true);
try
{
dataCache = new TreeCacheAop();
PropertyConfigurator config = new PropertyConfigurator();
config.configure(dataCache, CACHE_CFG);
dataCache.startService();
benchSendMsg(dataCache);
}
catch (Exception e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Config-file