4 Replies Latest reply on Jan 26, 2006 2:11 PM by John Lin

    JBossCacheAop Memory leak ?

    Christophe Lallement Newbie

      Hi,
      Before using JBossCache in a production environment, I am testing its performance and memory stability.

      Below is an example in which a publisher sends objects of the same type as fast as possible and a subscriber reads the data as fast as possible (via cache notifications).

      * First, the CPU is always at 100%. That is expected (the publisher sends data in a while(true) loop), but it may also mean that marshalling/unmarshalling has a significant impact on CPU usage.

      * Second, memory is very stable for the sender, but on the receiver it keeps growing, and forcing a GC changes nothing.


      * Third, the number of threads created (not live) is remarkably high: more than 2000 after sending 400000 messages.

      I have tested with different JGroups configurations (from the JGroups conf examples) without success.

      I run this sample under Windows XP (Service Pack 2) with JVM 1.5.0_03 and -Xmx128m.
      I monitor the JVM with JConsole.
      I start both processes (publisher and subscriber) on the same machine (dual Xeon).
      When the maximum memory is reached (around 128 MB), I get no out-of-memory exception, but the publisher and subscriber hang (they do nothing).
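
      To log the same figures JConsole shows (heap in use, live threads, total threads started) from inside each process, something like the following could be added to the subscriber. This is a minimal sketch using only the standard java.lang.management API available since Java 5; the class name JvmSampler and its sample method are made up for illustration.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper (not part of JBossCache): samples heap and thread figures. */
class JvmSampler {

    /**
     * Returns one line with used/committed heap in MB, the number of live
     * threads, and the number of threads started since the JVM was launched.
     * If forceGc is true, a GC is requested first so that "used" is closer
     * to the amount of memory that is really still referenced.
     */
    static String sample(boolean forceGc) {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        if (forceGc) {
            memory.gc(); // same effect as System.gc(): a request, not a guarantee
        }
        MemoryUsage heap = memory.getHeapMemoryUsage();
        long usedMb = heap.getUsed() / (1024 * 1024);
        long committedMb = heap.getCommitted() / (1024 * 1024);
        return "heapUsedMB=" + usedMb
                + " heapCommittedMB=" + committedMb
                + " liveThreads=" + threads.getThreadCount()
                + " startedThreads=" + threads.getTotalStartedThreadCount();
    }

    public static void main(String[] args) throws InterruptedException {
        // Print three samples, one second apart.
        for (int i = 0; i < 3; i++) {
            System.out.println(sample(true));
            Thread.sleep(1000);
        }
    }
}
```

      Calling JvmSampler.sample(true) every few thousand messages from dataModified would show whether the used heap after a forced GC really keeps climbing, and whether startedThreads grows while liveThreads stays flat.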

      Any help is welcome; this memory problem is blocking the use of JBossCache in my company.

      Christophe


      Listing 1: SubscriberTest.java

      import javax.swing.JLabel;
      import javax.swing.JPanel;
      
      import org.jboss.cache.CacheException;
      import org.jboss.cache.Fqn;
      import org.jboss.cache.PropertyConfigurator;
      import org.jboss.cache.aop.TreeCacheAop;
      import org.jdesktop.swingx.JXFrame;
      
      import deai.ft.archi.poc.cache.CacheDataListener;
      import deai.ft.archi.poc.cache.TreeCacheListenerAdapter;
      
      public class SubscriberTest implements CacheDataListener
      {
          static int cpt = 0;
          TreeCacheAop cache;
          long startTime = 0;
          static Fqn fqn = Fqn.fromString("/TEST/Instrument");
          public int msgReceived = 0;
          static JLabel info = new JLabel("Read ");
          static JLabel cptInfo = new JLabel("");
          TreeCacheListenerAdapter cacheAdapter;

          public SubscriberTest() throws Exception
          {
              super();
              cache = new TreeCacheAop();
              PropertyConfigurator config = new PropertyConfigurator();
              config.configure(cache, PublisherTest.CACHE_CFG);
              cache.startService();
              cacheAdapter = new TreeCacheListenerAdapter(cache);
              cacheAdapter.addDataListener(this);
          }

          public static void main(String[] args)
          {
              try
              {
                  new SubscriberTest();
                  JXFrame frame = new JXFrame("Test Read", true);
                  JPanel pane = new JPanel();
                  pane.add(info);
                  pane.add(cptInfo);
                  frame.setContentPane(pane);
                  frame.pack();
                  frame.setLocation(0, 100);
                  frame.setVisible(true);
              }
              catch (Exception e) {
                  e.printStackTrace();
              }
          }

          public void dataModified(Fqn fqn, Object data) {
              if (startTime == 0)
                  startTime = System.currentTimeMillis();

              try
              {
                  PublisherTest.instrument instr = (PublisherTest.instrument)cache.getObject(fqn);
                  System.out.println("receive: "+instr.Id);
                  if ((msgReceived % PublisherTest.NB_MSG_MOD) == 0)
                      cptInfo.setText(""+msgReceived);

                  msgReceived++;
              }
              catch (CacheException e)
              {
                  e.printStackTrace();
              }
              if (msgReceived == PublisherTest.NB_MSG_TO_SEND)
              {
                  long endTime = System.currentTimeMillis();
                  double totalTime = (endTime - startTime);
                  double avg = PublisherTest.NB_MSG_TO_SEND / totalTime * 1000;
                  System.out.println("Total Read "+msgReceived+" msg, Time: "+totalTime+ " ms, avg msg/s read: "+avg);
                  cptInfo.setText(""+msgReceived);
              }
          }

          public void dataRemoved(Fqn fqn, Object data) {}
          public void dataEvicted(Fqn fqn, Object data) {}
      }
      
      File: CacheDataListener.java
      import org.jboss.cache.Fqn;
      
      public interface CacheDataListener
      {
          public void dataModified(Fqn fqn, Object data);
          public void dataRemoved(Fqn fqn, Object data);
          public void dataEvicted(Fqn fqn, Object data);
      }
      
      File: TreeCacheListenerAdapter.java
      import java.util.concurrent.ConcurrentLinkedQueue;
      
      import org.jboss.cache.CacheException;
      import org.jboss.cache.Fqn;
      import org.jboss.cache.TreeCache;
      import org.jboss.cache.TreeCacheListener;
      import org.jboss.cache.aop.AOPInstance;
      import org.jboss.cache.aop.TreeCacheAop;
      import org.jgroups.View;
      
      public class TreeCacheListenerAdapter
          implements TreeCacheListener
      {
          protected ConcurrentLinkedQueue<CacheDataListener> listenersData = new ConcurrentLinkedQueue<CacheDataListener>();
          TreeCacheAop cache;

          /**
           * Registers this adapter as a listener on the given cache.
           */
          public TreeCacheListenerAdapter(TreeCacheAop cache)
          {
              super();
              this.cache = cache;
              cache.addTreeCacheListener(this);
          }

          public void addDataListener(CacheDataListener listener)
          {
              if (!listenersData.contains(listener))
                  listenersData.add(listener);
          }

          public void removeDataListener(CacheDataListener listener)
          {
              listenersData.remove(listener);
          }

          public void nodeCreated(Fqn fqn) { }

          public void nodeRemoved(Fqn fqn) {
              notifyDataRemoved(fqn, null);
          }

          public void nodeLoaded(Fqn fqn) { }

          public void nodeEvicted(Fqn fqn) {
              notifyDataEvicted(fqn, null);
          }

          public void nodeModified(Fqn fqn)
          {
              try
              {
                  notifyDataModified(fqn, cache.getObject(fqn));
              }
              catch (CacheException e)
              {
                  e.printStackTrace();
              }
          }

          public void nodeVisited(Fqn fqn) { }

          public void cacheStarted(TreeCache cache) { }

          public void cacheStopped(TreeCache cache) { }

          public void viewChange(View new_view) { }

          public void notifyDataModified(Fqn fqn, Object data) {
              if (data != null) {
                  // Do not forward Class, AOPInstance or DUMMY values to the listeners
                  if ((data instanceof Class) || (data instanceof AOPInstance) || (data instanceof String && data.equals(TreeCacheAop.DUMMY)))
                      //&& ((AOPInstance) data).get() == null))
                      return;
                  for (CacheDataListener listener : listenersData)
                      listener.dataModified(fqn, data);
              }
          }

          public void notifyDataRemoved(Fqn fqn, Object data)
          {
              for (CacheDataListener listener : listenersData)
                  listener.dataRemoved(fqn, data);
          }

          public void notifyDataEvicted(Fqn fqn, Object data)
          {
              for (CacheDataListener listener : listenersData)
                  listener.dataEvicted(fqn, data);
          }

      }
      


      Listing 2: PublisherTest.java
      import java.io.Serializable;
      
      import javax.swing.JLabel;
      import javax.swing.JPanel;
      
      import org.jboss.cache.CacheException;
      import org.jboss.cache.Fqn;
      import org.jboss.cache.PropertyConfigurator;
      import org.jboss.cache.aop.TreeCacheAop;
      import org.jdesktop.swingx.JXFrame;
      
      public class PublisherTest
      {
          static int cpt = 0;
          public static long NB_MSG_TO_SEND = 100000;
          public static long NB_MSG_MOD = 100;

          static JLabel info = new JLabel("Send ");
          static JLabel cptInfo = new JLabel("");
          // Cache configuration file passed to PropertyConfigurator
          static String CACHE_CFG = "fast-service-nothread-test.xml";

          public static class instrument implements Serializable {
              long Id;
              String label;
              double price;
              String desc;
              int foo;
              float bar;

              public instrument(long id, float bar, String desc, int foo, String label, double price)
              {
                  super();
                  this.bar = bar;
                  this.desc = desc;
                  this.foo = foo;
                  Id = id;
                  this.label = label;
                  this.price = price;
              }
          }

          private static void benchSendMsg(TreeCacheAop cache) throws CacheException
          {
              System.out.println("Start Send Bench");
              Fqn fqn = Fqn.fromString("/TEST/Instrument");
              long startTime = System.currentTimeMillis();
              for (long i = 0 ; i < NB_MSG_TO_SEND ; i++)
              {
                  cache.putObject(fqn, new instrument(i, (float)(i*0.5), "aaa", (int)i, "aaaa", (double)i*2));
                  if ((i % NB_MSG_MOD) == 0)
                      cptInfo.setText(""+i);
                  // Pause for 2 s every 1000 messages (including i == 0) to throttle the sender
                  if ((i % 1000) == 0)
                  {
                      try
                      {
                          Thread.sleep(2000);
                      }
                      catch (InterruptedException e)
                      {
                          e.printStackTrace();
                      }
                  }
              }
              cptInfo.setText(""+NB_MSG_TO_SEND);
              long endTime = System.currentTimeMillis();
              double totalTime = (endTime - startTime);
              double avg = NB_MSG_TO_SEND / totalTime * 1000;
              System.out.println("Total Send "+NB_MSG_TO_SEND+" msg, Time: "+totalTime+ " ms, avg msg/s send: "+avg);
          }

          /**
           * @param args
           */
          public static void main(String[] args)
          {
              TreeCacheAop dataCache;
              JXFrame frame = new JXFrame("Test Send", true);
              JPanel pane = new JPanel();
              pane.add(info);
              pane.add(cptInfo);
              frame.setContentPane(pane);
              frame.pack();
              frame.setVisible(true);

              try
              {
                  dataCache = new TreeCacheAop();
                  PropertyConfigurator config = new PropertyConfigurator();
                  config.configure(dataCache, CACHE_CFG);
                  dataCache.startService();

                  benchSendMsg(dataCache);
              }
              catch (Exception e)
              {
                  e.printStackTrace();
              }
          }
      }
      



      Config-file


        • 1. Re: JBossCacheAop Memory leak ?
          Ben Wang Master

          Did you regulate your sender's send rate? That is, did you add some sleep? If you didn't, you will need a flow control protocol in the JGroups stack. Otherwise, the sender will just flood the network and GC won't run in time.
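
          For illustration, regulating the send rate on the sender side can be sketched as below. The publisher listing in this thread sleeps 2000 ms every 1000 messages, which caps the average at roughly 500 msg/s but still sends each batch of 1000 as a burst; pacing per message spreads the load evenly. The class SendPacer and its method pauseMillis are hypothetical names made up for this sketch, and nothing here is a JBossCache or JGroups API.

```java
/** Hypothetical pacing helper; not part of JBossCache or JGroups. */
class SendPacer {

    /**
     * Returns how long the sender should sleep so that `sent` messages over
     * `elapsedMillis` do not exceed `maxPerSecond`. Returns 0 when the sender
     * is at or below the target rate.
     */
    static long pauseMillis(long sent, long elapsedMillis, long maxPerSecond) {
        // Earliest point in time (ms since start) at which `sent` messages are allowed.
        long earliestMillis = sent * 1000 / maxPerSecond;
        return Math.max(0, earliestMillis - elapsedMillis);
    }

    public static void main(String[] args) throws InterruptedException {
        long maxPerSecond = 100;   // assumed target rate for the demo
        long start = System.currentTimeMillis();
        for (long i = 1; i <= 50; i++) {
            // The real test would call cache.putObject(...) here.
            long pause = pauseMillis(i, System.currentTimeMillis() - start, maxPerSecond);
            if (pause > 0) {
                Thread.sleep(pause);
            }
        }
        System.out.println("Sent 50 messages in "
                + (System.currentTimeMillis() - start) + " ms");
    }
}
```

          This only limits what the sender produces; unlike a flow control protocol, it does not react to how fast the receiver actually consumes messages.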

          I assume you are using repl_async?

          -Ben

          • 2. Re: JBossCacheAop Memory leak ?
            Christophe Lallement Newbie

            Yes, I use async.

            I don't use a flow control protocol (I don't know whether it is supported in this release of JBoss (1.2.4 / JGroups 2.2.8)),
            but as you can see in my publisher test class, every 1000 messages (via modulo)
            I call a sleep of 2000 ms (I have also tested without it).
            What is very strange is that it is not the publisher but the subscriber whose memory grows.
            Stranger still, and more suspect for a production release, is that the memory never returns to a "normal" level.
            (I mean: it is fine for the JVM to allocate a lot of memory to handle a burst of messages, but once all messages have been sent by the publisher
            (for example, I tested with 100 000 messages), the memory stays at the same level, and a forced GC has no effect.)

            Today I will try to test with TreeCache only (without the AOP features).

            Christophe

            <?xml version="1.0" encoding="UTF-8"?>
            
            <!-- ===================================================================== -->
            <!-- -->
            <!-- Sample TreeCache Service Configuration -->
            <!-- -->
            <!-- ===================================================================== -->
            
            <server>
             <!--
            
             <classpath codebase="./lib" archives="jboss-cache.jar, jgroups.jar"/>
            
            
             --><!-- ==================================================================== -->
             <!-- Defines TreeCache configuration -->
             <!-- ==================================================================== -->
            
             <mbean code="org.jboss.cache.TreeCache" name="jboss.cache:service=TreeCache">
            
             <depends>jboss:service=Naming</depends>
             <depends>jboss:service=TransactionManager</depends>
            
             <!--
             Configure the TransactionManager
             -->
             <attribute name="TransactionManagerLookupClass">org.jboss.cache.DummyTransactionManagerLookup</attribute>
            
             <!--
             Isolation level : SERIALIZABLE
             REPEATABLE_READ (default)
             READ_COMMITTED
             READ_UNCOMMITTED
             NONE
             -->
             <attribute name="IsolationLevel">REPEATABLE_READ</attribute>
            
             <!--
             Valid modes are LOCAL, REPL_ASYNC and REPL_SYNC
             -->
             <attribute name="CacheMode">REPL_ASYNC</attribute>
            
             <!--
             Just used for async repl: use a replication queue
             -->
             <attribute name="UseReplQueue">false</attribute>
            
             <!--
             Replication interval for replication queue (in ms)
             -->
             <attribute name="ReplQueueInterval">0</attribute>
            
             <!--
             Max number of elements which trigger replication
             -->
             <attribute name="ReplQueueMaxElements">0</attribute>
            
             <!-- Name of cluster. Needs to be the same for all clusters, in order
             to find each other
             -->
             <attribute name="ClusterName">**TEST**</attribute>
            
             <!-- JGroups protocol stack properties. Can also be a URL,
             e.g. file:/home/bela/default.xml
             <attribute name="ClusterProperties"></attribute>
             -->
            
             <attribute name="ClusterConfig">
             <config>
             <UDP mcast_send_buf_size="10000000" mcast_port="6667" ucast_recv_buf_size="10000000"
             mcast_addr="239.255.8.15" bind_to_all_interfaces="true" loopback="false" mcast_recv_buf_size="10000000"
             max_bundle_size="64000" max_bundle_timeout="30" use_incoming_packet_handler="false"
             use_outgoing_packet_handler="true" ucast_send_buf_size="10000000" ip_ttl="32" enable_bundling="true" />
             <PING timeout="2000" down_thread="false" num_initial_members="3" />
             <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" />
             <FD_SOCK down_thread="false" />
             <VERIFY_SUSPECT timeout="1500" down_thread="false" />
             <pbcast.NAKACK max_xmit_size="60000" down_thread="false" use_mcast_xmit="true" gc_lag="50"
             retransmit_timeout="300,600,1200,2400,4800" discard_delivered_msgs="true"/>
             <UNICAST timeout="300,600,1200,2400,3600" down_thread="false" />
             <pbcast.STABLE stability_delay="1000" desired_avg_gossip="5000" down_thread="false" max_bytes="250000" />
             <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000"
             shun="true" />
             <FC max_credits="1000000" down_thread="false" min_threshold="0.10" />
             <FRAG frag_size="60000" down_thread="false" up_thread="true" />
             <COMPRESS down_thread="false" min_size="500" compression_level="3" up_thread="true" />
             <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" />
             </config>
            
             </attribute>
            
             <!--
             Whether or not to fetch state on joining a cluster
             -->
             <attribute name="FetchStateOnStartup">true</attribute>
             <attribute name="CacheLoaderFetchTransientState">true</attribute>
            
             <!--
             The max amount of time (in milliseconds) we wait until the
             initial state (ie. the contents of the cache) are retrieved from
             existing members in a clustered environment
             -->
             <attribute name="InitialStateRetrievalTimeout">5000</attribute>
            
             <!--
             Number of milliseconds to wait until all responses for a
             synchronous call have been received.
             -->
             <attribute name="SyncReplTimeout">20000</attribute>
            
             <!-- Max number of milliseconds to wait for a lock acquisition -->
             <attribute name="LockAcquisitionTimeout">15000</attribute>
            
            
             <!-- Name of the eviction policy class. -->
             <attribute name="EvictionPolicyClass"></attribute>
            
             <!--
             Indicate whether to use marshalling or not. Set this to true if you are running under a scoped
             class loader, e.g., inside an application server. Default is "false".
             -->
             <attribute name="UseMarshalling">false</attribute>
            
             </mbean>
            
            
             <!-- Uncomment to get a graphical view of the TreeCache MBean above --><!--
             <mbean code="org.jboss.cache.TreeCacheView" name="jboss.cache:service=TreeCacheView">
             <depends>jboss.cache:service=TreeCache</depends>
             <attribute name="CacheService">jboss.cache:service=TreeCache</attribute>
             </mbean>
             -->
            </server>
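
            The stack above does list an FC (flow control) element with max_credits="1000000" and min_threshold="0.10". As a rough illustration of what credit-based flow control does, here is a simplified single-threaded model. It is not the JGroups implementation: the class CreditModel and its methods are made up for this sketch, and the reading of the two parameters (max_credits as a byte budget per sender, min_threshold as the fraction of that budget at which the receiver returns credits) is an assumption to check against the documentation of the JGroups release in use.

```java
/** Simplified model of credit-based flow control; NOT the JGroups FC code. */
class CreditModel {
    final long maxCredits;          // bytes a sender may send before it must wait
    final long minCredits;          // remaining budget at which the receiver replenishes
    long senderCredits;             // bytes the sender may still send
    long receivedSinceReplenish;    // bytes consumed by the receiver since the last replenish

    CreditModel(long maxCredits, double minThreshold) {
        this.maxCredits = maxCredits;
        this.minCredits = Math.round(maxCredits * minThreshold);
        this.senderCredits = maxCredits;
    }

    /** Sender side: returns false (the sender would block) if too few credits remain. */
    boolean trySend(int bytes) {
        if (bytes > senderCredits) {
            return false;
        }
        senderCredits -= bytes;
        return true;
    }

    /** Receiver side: once the sender's budget is nearly used up, hand the credits back. */
    void onReceived(int bytes) {
        receivedSinceReplenish += bytes;
        if (maxCredits - receivedSinceReplenish <= minCredits) {
            senderCredits += receivedSinceReplenish;
            receivedSinceReplenish = 0;
        }
    }

    public static void main(String[] args) {
        // Values taken from the FC element in the configuration above.
        CreditModel fc = new CreditModel(1000000, 0.10);
        int sent = 0;
        // The receiver never consumes anything here, so the sender must stop eventually.
        while (fc.trySend(1000)) {
            sent++;
        }
        System.out.println("Sender blocked after " + sent + " messages of 1000 bytes");
    }
}
```

            In this model a sender that outruns the receiver is stopped once its byte budget is used up, instead of queueing an unbounded amount of data on the receiving side.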



            • 3. Re: JBossCacheAop Memory leak ?
              Ben Wang Master

              OK, if you still find that a problem, can you open up a JBossCache Jira issue and attach your test case?

              If it is aop-specific, please assign it to me and I will take a look.

              -Ben

              • 4. Re: JBossCacheAop Memory leak ?
                John Lin Newbie

                Hi Chris,

                Have you done the testing for TreeCache without AOP?

                -John