3 Replies Latest reply on May 10, 2005 11:59 PM by ben.wang

    Shallow synchronous replication with TreeCacheAop

    monocongo

      In my application I have an MBean which uses the TreeCacheAop service (MBean) for caching. I am getting synchronous replication only when a new object is added to the cache. So when I add a new object to the cache it gets replicated on all servers of the cluster, but when I update member fields of the cached objects the changes only take effect in the cached objects on the server/VM where the update was made. So the replication doesn't go as deep as I was expecting -- is it even possible to have synchronous replication of each change made to cached objects?

      I don't really understand AOP or the process of "aspectizing" objects for the TreeCacheAop, but I suspect that this is where my trouble lies. In the jboss-aop.xml I have the following entry for the class of objects that I'm storing in the TreeCacheAop:

      <prepare expr="field(* $instanceof{com.mycom.grover.bean.UserActivity}->*)" />

      Does this look kosher? Is there something else I need to do in order to get a deeper level of replication?

      Thanks in advance for any suggestions or insight.


      --James

        • 1. Re: Shallow synchronous replication with TreeCacheAop

          Looks good to me. Have you tried to go through the tutorial first. It has a gui that shows you what's been replicated. So you can see it right away.

          It should replicate. That's the whole promise of JBossCacheAop. :-)

          -Ben

          • 2. Re: Shallow synchronous replication with TreeCacheAop
            monocongo

            Yes I agree it should be replicating, the problem is that it isn't. I must be doing something wrong, but for the life of me I can't see what it is. I've gone through all of the tutorials and reference pages, and my configuration seems correct. The synchronous replication of the cached objects in the TreeCacheAop just doesn't happen as advertised. When a new object is added to the TreeCacheAop it's correctly replicated into the other TreeCacheAops on all nodes of the cluster, but any subsequent modifications of the cached objects are not replicated, and everything quickly becomes out of sync.

            Below is a summary of my configuration and code, hopefully someone can see where I'm going awry:

            The class which uses the TreeCacheAop is UserActivityManager, which is a Simple MBean. This class has methods which are invoked by Servlets and other MBeans to update UserActivity JavaBean objects which are being cached. As mentioned in the original post these objects are being "aspectized" by virtue of the entry in jboss-aop.xml. The jboss-service.xml included in my SAR is as follows:

            <?xml version="1.0" encoding="UTF-8"?>
            
            <server>
            
             <mbean code="org.jboss.cache.aop.TreeCacheAop"
             name="jboss.cache:service=TreeCacheAop">
             <depends>jboss:service=Naming</depends>
             <depends>jboss:service=TransactionManager</depends>
             <attribute name="TransactionManagerLookupClass">org.jboss.cache.JBossTransactionManagerLookup</attribute>
             <attribute name="IsolationLevel">REPEATABLE_READ</attribute>
             <attribute name="CacheMode">REPL_SYNC</attribute>
             <attribute name="UseReplQueue">false</attribute>
             <attribute name="ReplQueueInterval">0</attribute>
             <attribute name="ReplQueueMaxElements">0</attribute>
             <attribute name="ClusterName">TreeCache-Cluster</attribute>
             <attribute name="ClusterConfig">
             <config>
             <UDP mcast_addr="228.1.2.83"
             mcast_port="45556"
             ip_ttl="64"
             ip_mcast="true"
             mcast_send_buf_size="150000"
             mcast_recv_buf_size="80000"
             ucast_send_buf_size="150000"
             ucast_recv_buf_size="80000"
             loopback="false"/>
             <PING timeout="2000"
             num_initial_members="3"
             up_thread="false"
             down_thread="false"/>
             <MERGE2 min_interval="10000"
             max_interval="20000"/>
             <FD_SOCK/>
             <VERIFY_SUSPECT timeout="1500"
             up_thread="false"
             down_thread="false"/>
             <pbcast.NAKACK gc_lag="50"
             retransmit_timeout="600,1200,2400,4800"
             up_thread="false"
             down_thread="false"/>
             <pbcast.STABLE desired_avg_gossip="20000"
             up_thread="false"
             down_thread="false"/>
             <UNICAST timeout="600,1200,2400"
             window_size="100"
             min_threshold="10"
             down_thread="false"/>
             <FRAG frag_size="8192"
             down_thread="false"
             up_thread="false"/>
             <pbcast.GMS join_timeout="5000"
             join_retry_timeout="2000"
             shun="true"
             print_local_addr="true"/>
             <pbcast.STATE_TRANSFER up_thread="true"
             down_thread="true"/>
             </config>
             </attribute>
             <attribute name="FetchStateOnStartup">true</attribute>
             <attribute name="InitialStateRetrievalTimeout">15000</attribute>
             <attribute name="SyncReplTimeout">10000</attribute>
             <attribute name="LockAcquisitionTimeout">30000</attribute>
             <attribute name="EvictionPolicyClass"></attribute>
             </mbean>
            
             <mbean code="com.mycom.grover.mbean.UserActivityManager"
             name="grover.management:service=UserActivityManager">
             <depends>jboss.cache:service=TreeCacheAop</depends>
             </mbean>
            
            </server>


            A typical method of the UserActivityManager MBean which accesses the cached UserActivity objects and updates a couple of Date properties:
            /**
             * Updates the user activity information for the specified user to indicate the last time
             * the user accessed the system (heartbeat) and the last time the user sent an IOI message.
             *
             * @param userId the user's ID
             * @throws MBeanException
             */
             public void updateMessageSentTime (String userId)
             throws MBeanException
             {
             // handle a null/blank parameter
             if ((userId == null) || (userId.trim().equals("")))
             {
             // log the error and throw a new Exception
             this.logger.error("Required user ID parameter was either null or blank");
             throw new MBeanException(new IllegalArgumentException("Required user ID parameter was either null or blank"));
             }
            
            
             // create and begin a transaction
             UserTransaction userTransaction;
             try
             {
             userTransaction = (UserTransaction) this.jndiContext.lookup("UserTransaction");
             userTransaction.begin();
             }
             catch (Exception e)
             {
             // log the error and throw an Exception
             this.logger.error("Unable to create and begin a UserTransaction", e);
             throw new MBeanException(e, "Unable to create and begin a UserTransaction");
             }
            
            
             try
             {
             // get the UserActivity for the user, creating one if one doesn't already exist
             UserActivity userActivity = getUserActivity(userId, new Boolean(true));
            
             // set the heartbeat and message sent times
             userActivity.setHeartbeatTime(new Date());
             userActivity.setMessageSentTime(new Date());
            
             // commit the transaction
             userTransaction.commit();
             }
             catch (Exception e)
             {
             try
             {
             // rollback the exception
             userTransaction.setRollbackOnly();
             }
             catch (SystemException e2)
             {
             // log the error
             this.logger.error("Unable to set the UserTransaction for rollback", e2);
             }
            
             // log the error and throw an Exception
             this.logger.error("Unable to update the user activity information for the user with ID " + userId, e);
             throw new MBeanException(e, "Unable to update the user activity information for the user with ID " + userId);
             }
             }
            
            
             /**
             * Gets the cached UserActivity object for the specified user, or null if a
             * corresponding UserActivity object for the specified user is not found and
             * not requested to be created (as specified by the create parameter).
             *
             * NO TRANSACTION
             *
             * @param userId the user's ID
             * @param create whether or not we should create a new UserActivity object for the
             * user if one doesn't already exist
             * @return a dynamic proxy for the user's cached UserActivity object, or null if a
             * UserActivity object for the specified user is not found and the create
             * argument is either null or false
             * @throws MBeanException
             */
             private UserActivity getUserActivity (String userId,
             Boolean create)
             throws MBeanException
             {
             // handle a null/blank parameter
             if ((userId == null) || (userId.trim().equals("")))
             {
             // log the error and throw a new Exception
             this.logger.error("Required user ID parameter was either null or blank");
             throw new MBeanException(new IllegalArgumentException("Required user ID parameter was either null or blank"));
             }
            
            
             // get the corresponding fully qualified name of the UserActivity object for the user
             // (this will be null if the user's activity information doesn't yet exist)
             String fullyQualifiedName = getFullyQualifiedName(userId);
            
            
             // get the proxy for the cached UserActivity for the user if it already exists
             if (fullyQualifiedName != null)
             {
             try
             {
             // get the dynamic proxy for the UserActivity object which is cached for the user
             UserActivity userActivity =
             (UserActivity) this.mbeanServer.invoke(this.cacheServiceName,
             "getObject",
             new Object[] {fullyQualifiedName},
             new String[] {String.class.getName()});
            
             // return the proxy for the user's cached UserActivity object
             return userActivity;
             }
             catch (Exception e)
             {
             // log the error and throw an Exception
             this.logger.error("Unable to get the cached UserActivity object for the user with ID " + userId, e);
             throw new MBeanException(e, "Unable to get the cached UserActivity object for the user with ID " + userId);
             }
             }
             else if ((create != null) && create.booleanValue())
             {
             // get the user's Preferences
             Preferences preferences = null;
             try
             {
             // get the PreferencesAccess EJB
             PreferencesAccessHome preferencesAccessHome =
             (PreferencesAccessHome) ServiceLocator.getInstance().getRemoteHome("ejb/PreferencesAccess", PreferencesAccessHome.class);
             PreferencesAccess preferencesAccess = preferencesAccessHome.create();
            
             // get the user's Preferences Data Transfer Object
             preferences = preferencesAccess.getPreferences(userId);
             }
             catch (Exception e)
             {
             // log the error and throw a new Exception
             this.logger.error("Unable to get Preferences for the user with ID " + userId, e);
             throw new MBeanException(e, "Unable to get Preferences for the user with ID " + userId);
             }
            
            
             // get the cache node name to use based on the user type
             String nodeName;
             if (preferences.isHimUser())
             {
             nodeName = NODE_HIM_USERS;
             }
             else if (preferences.isWebUser())
             {
             nodeName = NODE_WEB_USERS;
             }
             else
             {
             // log the error and throw a new Exception
             this.logger.error("Unable to determine the user type (HIM or WEB) for the user with ID " + userId);
             throw new MBeanException(new Exception("Unable to determine the user type (HIM or WEB) for the user with ID " + userId));
             }
            
             // create the fully qualified name using the node name and user ID
             fullyQualifiedName = nodeName + "/" + userId;
            
             try
             {
             // create a UserActivity object and set the login time
             UserActivity userActivity = new UserActivity(preferences);
             userActivity.setLoginTime(new Date());
            
             // add the UserActivity to the TreeCache
             this.mbeanServer.invoke(this.cacheServiceName,
             "putObject",
             new Object[] {fullyQualifiedName, userActivity},
             new String[] {String.class.getName(), Object.class.getName()});
            
             // get the dynamic proxy for the UserActivity object which is cached for the user
             userActivity = (UserActivity) this.mbeanServer.invoke(this.cacheServiceName,
             "getObject",
             new Object[] {fullyQualifiedName},
             new String[] {String.class.getName()});
            
             // return the dynamic proxy for the user's cached UserActivity object
             return userActivity;
             }
             catch (Exception e)
             {
             // log the error and throw an Exception
             this.logger.error("Unable to cache a UserActivity object for the user with ID " + userId +
             " using the fully qualified name " + fullyQualifiedName);
             throw new MBeanException(new Exception("Unable to cache a UserActivity object for the user with ID " + userId +
             " using the fully qualified name " + fullyQualifiedName));
             }
             }
             else
             {
             // a corresponding UserActivity object wasn't found in the cache,
             // and we don't want to create and cache a new one
             return null;
             }
             }
            
            
             /**
             * Get the fully qualified name for the UserActivity object for a user with
             * the specified user ID. If the user has no cached UserActivity record
             * (doesn't exist) then a null string is returned.
             *
             * NO TRANSACTION
             *
             *@param userId the user's ID
             *@return the fully qualified name of the user's UserActivity record,
             * or null if one doesn't yet exist
             *@throws MBeanException
             */
             private String getFullyQualifiedName (String userId)
             throws MBeanException
             {
             // build fully qualified names using the node names and the user ID
             String himName = NODE_HIM_USERS + "/" + userId;
             String webName = NODE_WEB_USERS + "/" + userId;
            
             try
             {
             // see if a UserActivity object exists for the user under the HIM node
             Boolean userExists =
             (Boolean) this.mbeanServer.invoke(this.cacheServiceName,
             "exists",
             new Object[] {himName},
             new String[] {String.class.getName()});
            
             if (userExists.booleanValue())
             {
             // the fully qualified name using the HIM node is correct
             return himName;
             }
             else
             {
             // see if a UserActivity object exists for the user under the WEB node
             userExists = (Boolean) this.mbeanServer.invoke(this.cacheServiceName,
             "exists",
             new Object[] {webName},
             new String[] {String.class.getName()});
             if (userExists.booleanValue())
             {
             // the fully qualified name using the WEB node is correct
             return webName;
             }
             else
             {
             // a UserActivity object doesn't yet exist for the user
             // and hence no corresponding fully qualified name is available
             return null;
             }
             }
             }
             catch (Exception e)
             {
             // log the error and throw a new Exception
             this.logger.warn("Unable to find the fully qualified name for user with ID " + userId, e);
             throw new MBeanException(e, "Unable to find the fully qualified name for user with ID " + userId);
             }
             }


            As you can see in the updateMessageSentTime() method the entire process of updating the cached UserActivity object's properties is done within a UserTransaction. All other methods which access the cached UserActivity objects follow the same pattern.

            Any suggestions would be greatly appreciated!


            --James

            • 3. Re: Shallow synchronous replication with TreeCacheAop

              James,

              If you can come up with a simple JUnit test case demontrating what you do, I will be glad to take a look.

              Meanwhile, I have couple of exmples junit tests on collection api using TreeCacheAop in the distro. Can you check them out as well.

              -Ben