JBossCacheAop Memory leak ?
ctof Sep 30, 2005 9:40 AM
Hi,
Before using JBossCache in a production environment, I am trying to test its performance and memory stability.
Below is an example in which a publisher sends a number of instances of the same object as fast as possible, and a subscriber reads the data as fast as possible (via cache notifications).
* First, the CPU is always at 100% (which is expected, since the publisher sends data in a while(true) loop) ... but it may also mean that marshalling/unmarshalling has a significant impact on CPU usage.
* Second, the memory is very stable for the sender, but for the receiver the memory keeps growing ... and forcing a GC does not change anything.
* The number of created threads (total created, not live) is really incredible: more than 2000 after sending 400000 messages.
I have tested with different JGroups configurations (from the JGroups conf examples) without success.
I run this sample under Windows XP (Service Pack 2) with JVM 1.5.0_03 and -Xmx128m.
I monitor the JVM with JConsole.
I start both processes (publisher and subscriber) on the same machine (dual Xeon).
When the maximum memory is reached (around 128 MB), I get no out-of-memory exception, but the publisher and subscriber are suspended (they do nothing).
Any help is welcome; this memory problem is a blocker for using JBossCache in my company.
Christophe
Listing 1: SubscriberTest.java
import javax.swing.JLabel;
import javax.swing.JPanel;

import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.PropertyConfigurator;
import org.jboss.cache.aop.TreeCacheAop;
import org.jdesktop.swingx.JXFrame;

import deai.ft.archi.poc.cache.CacheDataListener;
import deai.ft.archi.poc.cache.TreeCacheListenerAdapter;

/**
 * Receiving side of the benchmark.
 *
 * <p>Starts a {@link TreeCacheAop} configured from {@code PublisherTest.CACHE_CFG},
 * registers itself as a {@link CacheDataListener} through a
 * {@link TreeCacheListenerAdapter}, and counts the modification notifications it
 * receives. A small Swing window shows the running count.
 */
public class SubscriberTest implements CacheDataListener {
    static int cpt = 0;
    TreeCacheAop cache;
    /** Wall-clock time (ms) of the first notification; 0 until one arrives. */
    long startTime = 0;
    static Fqn fqn = Fqn.fromString("/TEST/Instrument");
    /** Number of notifications whose object was read successfully. */
    public int msgReceived = 0;
    static JLabel info = new JLabel("Read ");
    static JLabel cptInfo = new JLabel("");
    TreeCacheListenerAdapter cacheAdapter;

    /**
     * Creates, configures and starts the cache, then subscribes this instance
     * to data notifications.
     *
     * @throws Exception if configuring or starting the cache fails
     */
    public SubscriberTest() throws Exception {
        super();
        cache = new TreeCacheAop();
        PropertyConfigurator config = new PropertyConfigurator();
        config.configure(cache, PublisherTest.CACHE_CFG);
        cache.startService();
        cacheAdapter = new TreeCacheListenerAdapter(cache);
        cacheAdapter.addDataListener(this);
    }

    /**
     * Starts the subscriber and opens the counter window.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            new SubscriberTest();
            JXFrame frame = new JXFrame("Test Read", true);
            JPanel pane = new JPanel();
            pane.add(info);
            pane.add(cptInfo);
            frame.setContentPane(pane);
            frame.pack();
            frame.setLocation(0, 100);
            frame.setVisible(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one modification notification: re-reads the object stored at
     * {@code fqn}, logs its id, refreshes the label every
     * {@code PublisherTest.NB_MSG_MOD} messages, and prints the throughput
     * summary once {@code PublisherTest.NB_MSG_TO_SEND} messages have been counted.
     *
     * @param fqn  node that was modified
     * @param data value supplied by the adapter (not used; the object is read
     *             from the cache again)
     */
    public void dataModified(Fqn fqn, Object data) {
        // The clock starts on the first notification, not at process start.
        if (startTime == 0) {
            startTime = System.currentTimeMillis();
        }
        try {
            PublisherTest.instrument instr = (PublisherTest.instrument) cache.getObject(fqn);
            System.out.println("receive: " + instr.Id);
            if ((msgReceived % PublisherTest.NB_MSG_MOD) == 0) {
                cptInfo.setText("" + msgReceived);
            }
            msgReceived++;
        } catch (CacheException e) {
            // A failed read is reported and not counted.
            e.printStackTrace();
        }
        if (msgReceived == PublisherTest.NB_MSG_TO_SEND) {
            long endTime = System.currentTimeMillis();
            double totalTime = (endTime - startTime);
            double avg = PublisherTest.NB_MSG_TO_SEND / totalTime * 1000;
            System.out.println("Total Read " + msgReceived + " msg, Time: " + totalTime
                    + " ms, avg msg/s read: " + avg);
            cptInfo.setText("" + msgReceived);
        }
    }

    /** No-op: removals are not part of this benchmark. */
    public void dataRemoved(Fqn fqn, Object data) {
    }

    /** No-op: evictions are not part of this benchmark. */
    public void dataEvicted(Fqn fqn, Object data) {
    }
}
File: CacheDataListener.java
import org.jboss.cache.Fqn;

/**
 * Callback interface for data-level cache events forwarded by
 * {@code TreeCacheListenerAdapter}.
 */
public interface CacheDataListener {
    /**
     * Called when the value stored at a node has been modified.
     *
     * @param fqn  node that was modified
     * @param data value read from the cache for that node
     */
    public void dataModified(Fqn fqn, Object data);

    /**
     * Called when a node has been removed.
     *
     * @param fqn  node that was removed
     * @param data associated value; the adapter in this listing passes {@code null}
     */
    public void dataRemoved(Fqn fqn, Object data);

    /**
     * Called when a node has been evicted.
     *
     * @param fqn  node that was evicted
     * @param data associated value; the adapter in this listing passes {@code null}
     */
    public void dataEvicted(Fqn fqn, Object data);
}
File: TreeCacheListenerAdapter.java
import java.util.concurrent.ConcurrentLinkedQueue;

import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
import org.jboss.cache.TreeCacheListener;
import org.jboss.cache.aop.AOPInstance;
import org.jboss.cache.aop.TreeCacheAop;
import org.jgroups.View;

/**
 * Bridges node-level {@link TreeCacheListener} events to the simpler
 * {@link CacheDataListener} callbacks.
 *
 * <p>Node removal, eviction and modification are forwarded; all other
 * {@link TreeCacheListener} events are ignored.
 */
public class TreeCacheListenerAdapter implements TreeCacheListener {
    /** Registered data listeners, notified in insertion order. */
    protected ConcurrentLinkedQueue<CacheDataListener> listenersData =
            new ConcurrentLinkedQueue<CacheDataListener>();
    TreeCacheAop cache;

    /**
     * Creates the adapter and registers it as a listener on {@code cache}.
     *
     * @param cache cache to observe; also used to read modified objects
     */
    public TreeCacheListenerAdapter(TreeCacheAop cache) {
        super();
        this.cache = cache;
        cache.addTreeCacheListener(this);
    }

    /**
     * Registers {@code listener} unless it is already registered.
     *
     * @param listener listener to add
     */
    public void addDataListener(CacheDataListener listener) {
        if (!listenersData.contains(listener)) {
            listenersData.add(listener);
        }
    }

    /**
     * Unregisters {@code listener}; does nothing if it was never added.
     *
     * @param listener listener to remove
     */
    public void removeDataListener(CacheDataListener listener) {
        listenersData.remove(listener);
    }

    public void nodeCreated(Fqn fqn) {
    }

    /** Forwards the removal with a {@code null} payload. */
    public void nodeRemoved(Fqn fqn) {
        notifyDataRemoved(fqn, null);
    }

    public void nodeLoaded(Fqn fqn) {
    }

    /** Forwards the eviction with a {@code null} payload. */
    public void nodeEvicted(Fqn fqn) {
        notifyDataEvicted(fqn, null);
    }

    /** Reads the object now stored at {@code fqn} and forwards it to the data listeners. */
    public void nodeModified(Fqn fqn) {
        try {
            notifyDataModified(fqn, cache.getObject(fqn));
        } catch (CacheException e) {
            // The read failed, so there is nothing to forward; report and carry on.
            e.printStackTrace();
        }
    }

    public void nodeVisited(Fqn fqn) {
    }

    public void cacheStarted(TreeCache cache) {
    }

    public void cacheStopped(TreeCache cache) {
    }

    public void viewChange(View new_view) {
    }

    /**
     * Notifies every registered listener that {@code fqn} was modified.
     *
     * <p>Nothing is forwarded when {@code data} is {@code null}, a {@link Class},
     * an {@link AOPInstance}, or the {@code TreeCacheAop.DUMMY} string
     * (presumably cache bookkeeping values rather than user objects — confirm
     * against the JBossCache AOP documentation).
     *
     * @param fqn  node that was modified
     * @param data value read from the cache for that node; may be {@code null}
     */
    public void notifyDataModified(Fqn fqn, Object data) {
        if (data == null) {
            return;
        }
        boolean skipped = (data instanceof Class)
                || (data instanceof AOPInstance)
                || (data instanceof String && data.equals(TreeCacheAop.DUMMY));
        if (skipped) {
            return;
        }
        for (CacheDataListener listener : listenersData) {
            listener.dataModified(fqn, data);
        }
    }

    /**
     * Notifies every registered listener that {@code fqn} was removed.
     *
     * @param fqn  node that was removed
     * @param data payload passed through unchanged
     */
    public void notifyDataRemoved(Fqn fqn, Object data) {
        for (CacheDataListener listener : listenersData) {
            listener.dataRemoved(fqn, data);
        }
    }

    /**
     * Notifies every registered listener that {@code fqn} was evicted.
     *
     * @param fqn  node that was evicted
     * @param data payload passed through unchanged
     */
    public void notifyDataEvicted(Fqn fqn, Object data) {
        for (CacheDataListener listener : listenersData) {
            listener.dataEvicted(fqn, data);
        }
    }
}
Listing 2: PublisherTest.java
import java.io.Serializable; import javax.swing.JLabel; import javax.swing.JPanel; import org.jboss.cache.CacheException; import org.jboss.cache.Fqn; import org.jboss.cache.PropertyConfigurator; import org.jboss.cache.aop.TreeCacheAop; import org.jdesktop.swingx.JXFrame; public class PublisherTest { static int cpt = 0; public static long NB_MSG_TO_SEND = 100000; public static long NB_MSG_MOD = 100; /** * */ static JLabel info = new JLabel("Send "); static JLabel cptInfo= new JLabel(""); static String CACHE_CFG = fast-service-nothread-test.xml"; public static class instrument implements Serializable { long Id; String label; double price; String desc; int foo; float bar; public instrument(long id, float bar, String desc, int foo, String label, double price) { super(); // TODO Auto-generated constructor stub this.bar = bar; this.desc = desc; this.foo = foo; Id = id; this.label = label; this.price = price; } } private static void benchSendMsg(TreeCacheAop cache) throws CacheException { System.out.println("Start Send Bench"); Fqn fqn = Fqn.fromString("/TEST/Instrument"); long startTime = System.currentTimeMillis(); for (long i = 0 ; i < NB_MSG_TO_SEND ; i++) { cache.putObject(fqn, new instrument(i, (float)(i*0.5), "aaa", (int)i, "aaaa", (double)i*2)); if ((i % NB_MSG_MOD) == 0) cptInfo.setText(""+i); if ((i % 1000) == 0) try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } cptInfo.setText(""+NB_MSG_TO_SEND); long endTime = System.currentTimeMillis(); double totalTime = (endTime - startTime); double avg = NB_MSG_TO_SEND / totalTime * 1000; System.out.println("Total Send "+NB_MSG_TO_SEND+"ms Time: "+totalTime+ " avg msg/s send: "+avg); } /** * @param args */ public static void main(String[] args) { TreeCacheAop dataCache; JXFrame frame = new JXFrame("Test Send", true); JPanel pane = new JPanel(); pane.add(info); pane.add(cptInfo); frame.setContentPane(pane); frame.pack(); frame.setVisible(true); try { 
dataCache = new TreeCacheAop(); PropertyConfigurator config = new PropertyConfigurator(); config.configure(dataCache, CACHE_CFG); dataCache.startService(); benchSendMsg(dataCache); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
Config-file