2 Replies Latest reply on Dec 4, 2008 6:02 AM by oferunipier

    problems with replication of huge amount of data

    oferunipier

      During my cache population phase, the loaded object graph is sometimes large (~20 MB across about 5300 tree put operations), and replication to the other nodes then fails with a ClassCastException. In the logs I found the toString of the failed 'replicate' JBCMethodCall, and roughly the last 2000 _put operations in it look like they could indeed lead to a ClassCastException. An invalid _put operation looks like this:

      ,_put(org.jboss.cache.aop.AOPInstance@1c72479
      , /broker_objs/TransientOfferList/id/81021/properties/offerlist/120, __jboss:internal:class__, class com.unipier.broker.rm.data.gen.OfferDO, true)

      whereas a valid _put operation should have a GlobalTransaction as its first parameter.

      (A valid _put operation looks like this: ,_put(GlobalTransaction:<172.25.72.74:7840>:58, /broker_objs/TransientOfferList/id/81021/properties/offerlist/120, AOPInstance, org.jboss.cache.aop.AOPInstance@1c72479, true))

      Where is the problem? Could it be related to the configuration of the protocol stack, for example the buffer sizes?

      The stack trace and the protocol stack are below.

      The full stack trace is:

      01 Dec 2008 12:59:05,484 ERROR [UpHandler (STATE_TRANSFER)] MethodCall : exception in invoke()
      java.lang.IllegalArgumentException: java.lang.ClassCastException@6ee5d6e
      at sun.reflect.GeneratedMethodAccessor1092.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      at java.lang.reflect.Method.invoke(Method.java:585)
      at org.jgroups.blocks.MethodCall.invoke(MethodCall.java:330)
      at org.jboss.cache.interceptors.CallInterceptor.invoke(CallInterceptor.java:52)
      at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:68)
      at org.jboss.cache.interceptors.PessimisticLockInterceptor.invoke(PessimisticLockInterceptor.java:193)
      at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:68)
      at org.jboss.cache.interceptors.UnlockInterceptor.invoke(UnlockInterceptor.java:32)
      at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:68)
      at org.jboss.cache.interceptors.ReplicationInterceptor.invoke(ReplicationInterceptor.java:39)
      at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:68)
      at org.jboss.cache.interceptors.TxInterceptor.replayModifications(TxInterceptor.java:554)
      at org.jboss.cache.interceptors.TxInterceptor.handlePessimisticPrepare(TxInterceptor.java:458)
      at org.jboss.cache.interceptors.TxInterceptor.handleRemotePrepare(TxInterceptor.java:324)
      at org.jboss.cache.interceptors.TxInterceptor.invoke(TxInterceptor.java:131)
      at org.jboss.cache.TreeCache.invokeMethod(TreeCache.java:5899)
      at org.jboss.cache.TreeCache._replicate(TreeCache.java:5176)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
      at java.lang.reflect.Method.invoke(Method.java:585)
      at org.jgroups.blocks.MethodCall.invoke(MethodCall.java:330)
      at org.jgroups.blocks.RpcDispatcher.handle(RpcDispatcher.java:281)
      at org.jgroups.blocks.RequestCorrelator.handleRequest(RequestCorrelator.java:650)
      at org.jgroups.blocks.RequestCorrelator.receiveMessage(RequestCorrelator.java:535)
      at org.jgroups.blocks.RequestCorrelator.receive(RequestCorrelator.java:358)
      at org.jgroups.blocks.MessageDispatcher$ProtocolAdapter.up(MessageDispatcher.java:775)
      at org.jgroups.JChannel.up(JChannel.java:1091)
      at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:377)
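
The top frames of the trace (MethodCall.invoke calling Method.invoke, which throws IllegalArgumentException) are consistent with a reflective call that receives an argument of the wrong type. Below is a minimal sketch of that failure mode using only the JDK. The class ReflectiveMismatchDemo and the types Tx and Other are hypothetical stand-ins for the target method, GlobalTransaction and AOPInstance. It is not JBoss Cache code.

```java
import java.lang.reflect.Method;

// Hypothetical demo class, not part of JBoss Cache or JGroups.
final class ReflectiveMismatchDemo {

    // Stand-ins for GlobalTransaction and for any unrelated type.
    static final class Tx {}
    static final class Other {}

    // Stand-in for a target method whose first parameter must be a Tx.
    static String put(Tx tx, String fqn) {
        return "put " + fqn;
    }

    // Invokes put(...) reflectively with the given first argument and
    // reports either the return value or the kind of failure.
    static String invokePut(Object firstArg) {
        try {
            Method m = ReflectiveMismatchDemo.class
                    .getDeclaredMethod("put", Tx.class, String.class);
            return (String) m.invoke(null, firstArg, "/a/b");
        } catch (IllegalArgumentException e) {
            // Thrown by Method.invoke when an argument cannot be
            // converted to the declared parameter type.
            return "IllegalArgumentException";
        } catch (ReflectiveOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(invokePut(new Tx()));    // correct first argument
        System.out.println(invokePut(new Other())); // wrong first argument
    }
}
```

So the exception itself only says that some argument had the wrong type. The toString of the method call is what shows which argument was wrong.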


      The protocol stack is:

      <TCP start_port="7840"
      loopback="true"
      recv_buf_size="20000000"
      send_buf_size="640000"
      discard_incompatible_packets="true"
      max_bundle_size="64000"
      max_bundle_timeout="30"
      use_incoming_packet_handler="true"
      use_outgoing_packet_handler="false"
      down_thread="false" up_thread="false"
      enable_bundling="true"
      use_send_queues="false"
      sock_conn_timeout="300"
      skip_suspected_members="true"/>
      <TCPPING timeout="10000"
      down_thread="true" up_thread="true"
      initial_hosts="10.231.9.86[7840],10.231.9.85[7840]"
      port_range="1"
      num_initial_members="3"/>
      <MERGE2 max_interval="100000"
      down_thread="false" up_thread="false" min_interval="20000"/>
      <FD_SOCK down_thread="false" up_thread="false"/>
      <FD timeout="60000" max_tries="4" down_thread="true" up_thread="true" shun="true"/>
      <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
      <pbcast.NAKACK max_xmit_size="60000"
      use_mcast_xmit="false" gc_lag="0"
      retransmit_timeout="600,1200,2400,4800"
      down_thread="true" up_thread="true"
      discard_delivered_msgs="true"/>
      <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
      down_thread="false" up_thread="false"
      max_bytes="400000"/>
      <pbcast.GMS print_local_addr="true" join_timeout="5000"
      down_thread="true" up_thread="true"
      join_retry_timeout="2000" shun="true"
      view_bundling="true"/>
      <FC max_credits="2000000" down_thread="false" up_thread="false"
      min_threshold="0.10"/>
      <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
      <pbcast.STATE_TRANSFER down_thread="true" up_thread="true" use_flush="false"/>

        • 1. Re: problems with replication of huge amount of data
          oferunipier

          The bug is in JBoss Cache 1.4.1 SP9.
          I have a sample JUnit test, developed as part of the distribution tests.
          The code and the replSyncTCP-service.xml file are below.
          Put the class in the test directory under the org.jboss.cache.aop package.
          Put the attached replSyncTCP-service.xml file, along with the other configuration files, in /etc/META-INF.
          Run the test with the JVM parameters -Xms128m -Xmx256m.


          
          
          package org.jboss.cache.aop;
          
          import java.util.ArrayList;
          import java.util.HashMap;
          import java.util.HashSet;
          import java.util.List;
          import java.util.Map;
          import java.util.Set;
          
          import javax.transaction.NotSupportedException;
          import javax.transaction.SystemException;
          import javax.transaction.Transaction;
          import javax.transaction.TransactionManager;
          
          import org.apache.commons.logging.Log;
          import org.apache.commons.logging.LogFactory;
          import org.jboss.aop.Advised;
          import org.jboss.cache.DummyTransactionManagerLookup;
          import org.jboss.cache.Fqn;
          import org.jboss.cache.PropertyConfigurator;
          import org.jboss.cache.aop.test.Address;
          import org.jboss.cache.aop.test.Person;
          import org.jboss.cache.transaction.DummyTransactionManager;
          
          import junit.framework.TestCase;
          
          public class MassReplicationTest extends TestCase {
           private static Log log = LogFactory.getLog(MassReplicationTest.class);
          
           PojoCache cache1;
           PojoCache cache2;
          
           private int objectsCount = 25;
           private int itemsCount = 500;
          
           public MassReplicationTest(String name) {
           super(name);
           }
          
           protected void setUp() throws Exception {
           log.info("setUp");
           cache1 = createCache();
           cache2 = createCache();
          
           if (!(new Address() instanceof Advised)) {
           log.error("Address is not an instance of Advised");
           throw new RuntimeException("Address must be advised!");
           }
          
           if (!(new Person() instanceof Advised)) {
           log.error("Person is not an instance of Advised");
           throw new RuntimeException("Person must be advised!");
           }
           }
          
           protected void tearDown() throws Exception {
           log.info("tearDown");
           stopCache(cache1);
           stopCache(cache2);
           }
          
           public void testMassReplication() throws Exception {
           log.info("testMassReplication total objects "+objectsCount+", each object items count "+itemsCount);
           Transaction tx = startTransaction();
           for (int i=0;i<objectsCount;i++)
           cache1.putObject(Fqn.fromString("/objects/"+i), createPerson("joe "+i,itemsCount));
           tx.commit();
           }
          
           private Object createPerson(String name,int richness) {
           log.info("creating person "+name);
           Address addr1 = new Address();
           addr1.setStreet("101 Oakview Dr");
           addr1.setCity("Anytown");
           addr1.setZip(11111);
          
           Person p = new Person();
           p.setName(name);
           p.setAge(20);
           p.setAddress(addr1);
          
           Set skills = new HashSet();
           Map hobbies = new HashMap();
           List medications = new ArrayList();
           List languages = new ArrayList();
           for (int i=0;i<richness;i++) {
           skills.add("skill."+i);
           hobbies.put("hobbie."+i,"Biking");
           medications.add("medication."+i);
           languages.add("language."+i);
           }
          
           p.setSkills(skills);
           p.setHobbies(hobbies);
           p.setLanguages(languages);
           p.setMedication(medications);
          
           return p;
           }
          
           private PojoCache createCache() throws Exception {
           log.info("creating cache");
          
           PojoCache cache = createCache(true, false);
           cache.setTransactionManagerLookup(new DummyTransactionManagerLookup());
           Transaction tx = startTransaction();
           cache.activateRegion("/objects");
           tx.commit();
           return cache;
           }
          
           protected PojoCache createCache(boolean useMarshalling,boolean inactiveOnStartup) throws Exception {
           PojoCache tree = new PojoCache();
           PropertyConfigurator config = new PropertyConfigurator();
           String configFile = "META-INF/replSyncTCP-service.xml";
           config.configure(tree, configFile); // read in the replSyncTCP xml
           //tree.setDeadlockDetection(sync);
           tree.setClusterName("MassReplication");
           tree.setReplicationVersion("1.4.0.GA");
           // Use a long timeout to facilitate setting debugger breakpoints
           tree.setInitialStateRetrievalTimeout(60000);
           if (useMarshalling)
           {
           tree.setUseMarshalling(true);
           tree.setInactiveOnStartup(inactiveOnStartup);
           }
          
           tree.createService();
           tree.startService();
          
           return tree;
           }
          
           private Transaction startTransaction() throws SystemException, NotSupportedException {
           DummyTransactionManager mgr=DummyTransactionManager.getInstance();
           mgr.begin();
           return mgr.getTransaction();
           }
          
           protected void stopCache(PojoCache cache) {
           if (cache != null)
           {
           try
           {
           TransactionManager m = cache.getTransactionManager();
           try
           {
           if (m!= null && m.getTransaction() != null)
           {
           m.rollback();
           }
           }
           catch (Exception e)
           {
           // ignored: rollback during teardown is best effort
           }
           cache.stopService();
           cache.destroyService();
           }
           catch (Exception e)
           {
           log.error("Exception stopping cache " + e.getMessage(), e);
           }
           }
           }
          }//class
          
          
          
          


          replSyncTCP-service.xml

          
          <?xml version="1.0" encoding="UTF-8"?>
          
          <!-- ===================================================================== -->
          <!-- -->
          <!-- Sample TreeCache Service Configuration -->
          <!-- -->
          <!-- ===================================================================== -->
          
          <server>
          
           <classpath codebase="./lib" archives="jboss-cache.jar, jgroups.jar"/>
          
          
           <!-- ==================================================================== -->
           <!-- Defines TreeCache configuration -->
           <!-- ==================================================================== -->
          
           <mbean code="org.jboss.cache.TreeCache"
           name="jboss.cache:service=TreeCache">
          
           <depends>jboss:service=Naming</depends>
           <depends>jboss:service=TransactionManager</depends>
          
           <!--
           Configure the TransactionManager
           -->
           <attribute name="TransactionManagerLookupClass">org.jboss.cache.DummyTransactionManagerLookup</attribute>
          
           <!--
           Isolation level : SERIALIZABLE
           REPEATABLE_READ (default)
           READ_COMMITTED
           READ_UNCOMMITTED
           NONE
           -->
           <attribute name="IsolationLevel">REPEATABLE_READ</attribute>
          
           <!--
           Valid modes are LOCAL
           REPL_ASYNC
           REPL_SYNC
           INVALIDATION_ASYNC
           INVALIDATION_SYNC
           -->
           <attribute name="CacheMode">REPL_SYNC</attribute>
          
           <!--
           Just used for async repl: use a replication queue
           -->
           <attribute name="UseReplQueue">false</attribute>
          
           <!--
           Replication interval for replication queue (in ms)
           -->
           <attribute name="ReplQueueInterval">0</attribute>
          
           <!--
           Max number of elements which trigger replication
           -->
           <attribute name="ReplQueueMaxElements">0</attribute>
          
           <!-- Name of cluster. Needs to be the same for all clusters, in order
           to find each other
           -->
           <attribute name="ClusterName">TreeCache-Cluster</attribute>
          
           <!-- JGroups protocol stack properties. Can also be a URL,
           e.g. file:/home/bela/default.xml
           <attribute name="ClusterProperties"></attribute>
           -->
          
           <attribute name="ClusterConfig">
           <config>
           <!-- ofer: i drop the UDP section here, take it from the zip -->
           <!-- ofer: add tcp-->
           <TCP start_port="7840"
           loopback="true"
           recv_buf_size="20000000"
           send_buf_size="640000"
           discard_incompatible_packets="true"
           max_bundle_size="64000"
           max_bundle_timeout="30"
           use_incoming_packet_handler="true"
           use_outgoing_packet_handler="false"
           down_thread="false" up_thread="false"
           enable_bundling="true"
           use_send_queues="false"
           sock_conn_timeout="300"
           skip_suspected_members="true"/>
           <TCPPING timeout="10000"
           down_thread="true" up_thread="true"
           initial_hosts="10.231.8.187[7840],10.231.8.187[7840]"
           port_range="1"
           num_initial_members="3"/>
           <MERGE2 max_interval="100000"
           down_thread="false" up_thread="false" min_interval="20000"/>
           <FD_SOCK down_thread="false" up_thread="false"/>
           <FD timeout="60000" max_tries="4" down_thread="true" up_thread="true" shun="true"/>
           <VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
           <pbcast.NAKACK max_xmit_size="60000"
           use_mcast_xmit="false" gc_lag="0"
           retransmit_timeout="600,1200,2400,4800"
           down_thread="true" up_thread="true"
           discard_delivered_msgs="true"/>
           <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
           down_thread="false" up_thread="false"
           max_bytes="400000"/>
           <pbcast.GMS print_local_addr="true" join_timeout="5000"
           down_thread="true" up_thread="true"
           join_retry_timeout="2000" shun="true"
           view_bundling="true"/>
           <FC max_credits="2000000" down_thread="false" up_thread="false"
           min_threshold="0.10"/>
           <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
           <pbcast.STATE_TRANSFER down_thread="true" up_thread="true" use_flush="false"/>
           </config>
           </attribute>
          
          
           <!--
           Whether or not to fetch state on joining a cluster
           NOTE this used to be called FetchStateOnStartup and has been renamed to be more descriptive.
           -->
           <attribute name="FetchInMemoryState">true</attribute>
          
           <!--
           The max amount of time (in milliseconds) we wait until the
           initial state (ie. the contents of the cache) are retrieved from
           existing members in a clustered environment
           -->
           <attribute name="InitialStateRetrievalTimeout">15000</attribute>
          
           <!--
           Number of milliseconds to wait until all responses for a
           synchronous call have been received.
           -->
           <attribute name="SyncReplTimeout">15000</attribute>
          
           <!-- Max number of milliseconds to wait for a lock acquisition -->
           <attribute name="LockAcquisitionTimeout">10000</attribute>
          
           <!-- Name of the eviction policy class. -->
           <attribute name="EvictionPolicyClass"></attribute>
          
           <!--
           Indicate whether to use region based marshalling or not. Set this to true if you are running under a scoped
           class loader, e.g., inside an application server. Default is "false".
           -->
           <attribute name="UseRegionBasedMarshalling">true</attribute>
           </mbean>
          
          
           <!-- Uncomment to get a graphical view of the TreeCache MBean above -->
           <!-- <mbean code="org.jboss.cache.TreeCacheView" name="jboss.cache:service=TreeCacheView">-->
           <!-- <depends>jboss.cache:service=TreeCache</depends>-->
           <!-- <attribute name="CacheService">jboss.cache:service=TreeCache</attribute>-->
           <!-- </mbean>-->
          
          
          </server>
          


          • 2. Re: problems with replication of huge amount of data
            oferunipier

            This is a JBoss Cache bug related to https://jira.jboss.org/jira/browse/JBCACHE-1211: when marshalling objects to bytes, TreeCacheMarshaller140 manages a references map (over GlobalTransaction, Fqn, Serializable and String) keyed by a short, so it supports only 2^15 references. In a huge replication with more than 2^15 references the key is no longer unique, so on unmarshalling a wrong reference is fetched from the map, hence the ClassCastException (in MassReplicationTest, an Fqn was fetched instead of a GlobalTransaction).
            The fix is to use Integer as the key for references.
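
To illustrate why a 16-bit reference id stops being unique, here is a minimal sketch using only the JDK stream classes. It assumes the pre-fix marshaller wrote each id with writeShort and read it back with readShort. That assumption has not been checked against the 1.4.1 SP9 source. The class ShortRefDemo and its methods are hypothetical and not part of JBoss Cache.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical demo class, not part of JBoss Cache.
final class ShortRefDemo {

    // Writes the id as a 16-bit value and reads it back,
    // mimicking a reference id stored in a short field.
    static int roundTripAsShort(int refId) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeShort(refId); // only the low 16 bits are written
            out.flush();
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()));
            return in.readShort(); // read back as a signed 16-bit value
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Same round trip with a 32-bit field, as in the fixed code.
    static int roundTripAsInt(int refId) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(refId);
            out.flush();
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()));
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ids = {5, 32767, 32768, 65536 + 5};
        for (int id : ids) {
            System.out.println(id + " -> as short: " + roundTripAsShort(id)
                    + ", as int: " + roundTripAsInt(id));
        }
    }
}
```

In this sketch id 32768 comes back as -32768 and id 65541 comes back as 5, the same value as the unrelated reference 5. With a 32-bit field every id survives the round trip unchanged.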

            see fixed code below:


            /*
             * JBoss, Home of Professional Open Source
             *
             * Distributable under LGPL license.
             * See terms of license at gnu.org.
             */
            package org.jboss.cache.marshall;
            
            import org.apache.commons.logging.Log;
            import org.apache.commons.logging.LogFactory;
            import org.jboss.cache.CacheException;
            import org.jboss.cache.Fqn;
            import org.jboss.cache.GlobalTransaction;
            import org.jgroups.Address;
            import org.jgroups.blocks.MethodCall;
            import org.jgroups.stack.IpAddress;
            
            import java.io.IOException;
            import java.io.ObjectInputStream;
            import java.io.ObjectOutputStream;
            import java.io.Serializable;
            import java.util.ArrayList;
            import java.util.Collection;
            import java.util.HashMap;
            import java.util.HashSet;
            import java.util.Iterator;
            import java.util.LinkedList;
            import java.util.List;
            import java.util.Map;
            import java.util.Set;
            import java.util.TreeMap;
            import java.util.TreeSet;
            
            /**
             * An enhanced marshaller for RPC calls between TreeCache instances.
             *
             * @author <a href="mailto:manik@jboss.org">Manik Surtani (manik@jboss.org)</a>
             */
            public class TreeCacheMarshaller140 extends Marshaller
            {
             // logger
             private static Log log = LogFactory.getLog(TreeCacheMarshaller140.class);
            
             // magic numbers
             protected static final int MAGICNUMBER_METHODCALL = 1;
             protected static final int MAGICNUMBER_FQN = 2;
             protected static final int MAGICNUMBER_GTX = 3;
             protected static final int MAGICNUMBER_IPADDRESS = 4;
             protected static final int MAGICNUMBER_ARRAY_LIST = 5;
             protected static final int MAGICNUMBER_INTEGER = 6;
             protected static final int MAGICNUMBER_LONG = 7;
             protected static final int MAGICNUMBER_BOOLEAN = 8;
             protected static final int MAGICNUMBER_STRING = 9;
             protected static final int MAGICNUMBER_LINKED_LIST = 10;
             protected static final int MAGICNUMBER_HASH_MAP = 11;
             protected static final int MAGICNUMBER_TREE_MAP = 12;
             protected static final int MAGICNUMBER_HASH_SET = 13;
             protected static final int MAGICNUMBER_TREE_SET = 14;
             protected static final int MAGICNUMBER_NULL = 99;
             protected static final int MAGICNUMBER_SERIALIZABLE = 100;
             protected static final int MAGICNUMBER_REF = 101;
            
             public TreeCacheMarshaller140(RegionManager manager, boolean defaultInactive, boolean useRegionBasedMarshalling)
             {
             init(manager, defaultInactive, useRegionBasedMarshalling);
             if (useRegionBasedMarshalling)
             {
             log.debug("Using region based marshalling logic : marshalling Fqn as a String first for every call.");
             }
             }
            
             // -------- Marshaller interface
            
             public void objectToStream(Object o, ObjectOutputStream out) throws Exception
             {
             if (log.isTraceEnabled()) log.trace("Marshalling object " + o);
             Map refMap = new HashMap();
            
             if (useRegionBasedMarshalling)
             {
             // we first marshall the Fqn as a String (ugh!)
            
             // Just cast o to JBCMethodCall -- in the off chance its not
             // due to a subclass passing in something else, we'll just
             // deal with the CCE
             JBCMethodCall call = null;
             try
             {
             call = (JBCMethodCall) o;
             String fqnAsString = extractFqnAsString(call);
             marshallObject(fqnAsString, out, refMap);
             }
             catch (ClassCastException cce)
             {
             if (call == null)
             {
             log.debug("Received non-JBCMethodCall " + o +" -- cannot extract Fqn so using null");
             marshallObject(null, out, refMap);
             }
             else
             throw cce;
             }
             }
            
             marshallObject(o, out, refMap);
             }
            
             public Object objectFromStream(ObjectInputStream in) throws Exception
             {
             Object retValue;
             Map refMap = new HashMap();
            
             if (useRegionBasedMarshalling)
             {
             // first unmarshall the fqn as a String
             // This may be null if the call being unmarshalled is
             // not region-based
             String fqn = (String) unmarshallObject(in, refMap);
             try
             {
             Region region = null;
             if (fqn != null)
             {
             region = findRegion(fqn);
             }
             retValue = region == null ? unmarshallObject(in, refMap) : unmarshallObject(in, region.getClassLoader(), refMap);
             if (region != null && region.isQueueing())
             {
             Object originalRetValue = retValue;
             if (log.isDebugEnabled())
             {
             log.debug("Received call on an ququing Fqn region (" + fqn + "). Calling enqueueMethodCallMethod");
             }
             retValue = MethodCallFactory.create(MethodDeclarations.enqueueMethodCallMethod, new Object[]{fqn, originalRetValue});
             }
             }
             catch (InactiveRegionException e)
             {
             if (log.isDebugEnabled())
             {
             log.debug("Received call on an inactive Fqn region (" + fqn + "). Calling notifyCallOnInactiveMetod");
             }
             retValue = MethodCallFactory.create(MethodDeclarations.notifyCallOnInactiveMethod, new Object[]{fqn});
             }
             }
             else
             {
             retValue = unmarshallObject(in, refMap);
             }
            
             return retValue;
             }
            
             private Region findRegion(String fqn) throws InactiveRegionException
             {
             Region region;
             // obtain a region from RegionManager, if not, will use default.
             region = fqn == null ? null : getRegion(fqn);
            
             if (region != null)
             {
             // If the region has been marked inactive, we still have
             // to return a MethodCall or RpcDispatcher will log an Error.
             // So, return a call to the TreeCache "_notifyCallOnInactive" method
             if (region.getStatus() == Region.STATUS_INACTIVE)
             {
             throw new InactiveRegionException();
             }
            
             }
             else if (defaultInactive)
             {
             // No region but default inactive means region is inactive
             throw new InactiveRegionException();
             }
            
             return region;
             }
            
             private String extractFqnAsString(JBCMethodCall call) throws Exception
             {
             String fqnAsString;
             switch (call.getMethodId())
             {
             case MethodDeclarations.replicateMethod_id:
             fqnAsString = extractFqnFromMethodCall(call);
             break;
             case MethodDeclarations.replicateAllMethod_id:
             fqnAsString = extractFqnFromListOfMethodCall(call);
             break;
             case MethodDeclarations.dispatchRpcCallMethod_id:
             JBCMethodCall call2 = (JBCMethodCall) call.getArgs()[1];
             fqnAsString = extractFqn(call2);
             break;
             default:
             fqnAsString = extractFqn(call);
             }
            
             return fqnAsString;
             }
            
             // --------- Marshalling methods
            
             private void marshallObject(Object o, ObjectOutputStream out, Map refMap) throws Exception
             {
             if (o == null)
             {
             out.writeByte(MAGICNUMBER_NULL);
             }
             else if (refMap.containsKey(o)) // see if this object has been marshalled before.
             {
             out.writeByte(MAGICNUMBER_REF);
             out.writeInt(((Integer) refMap.get(o)).intValue());
             }
             else if (o instanceof JBCMethodCall)
             {
             // first see if this is a 'known' method call.
             JBCMethodCall call = (JBCMethodCall) o;
            
             if (call.getMethodId() > -1)
             {
             out.writeByte(MAGICNUMBER_METHODCALL);
             marshallMethodCall(call, out, refMap);
             }
             else
             {
             // treat this as a serializable object
            // if (log.isWarnEnabled()) log.warn("Treating method call " + call + " as a normal Serializable object, not attempting to marshall with method ids.");
            //
            // int refId = createReference(o, refMap);
            // out.writeByte(MAGICNUMBER_SERIALIZABLE);
            // out.writeShort(refId);
            // out.writeObject(call);
             throw new IllegalArgumentException("MethodCall "+call+" does not have a valid method id. Was this method call created with MethodCallFactory?");
             }
             }
             else if (o instanceof Fqn)
             {
             int refId = createReference(o, refMap);
             out.writeByte(MAGICNUMBER_FQN);
             out.writeInt(refId);
             marshallFqn((Fqn) o, out, refMap);
             }
             else if (o instanceof GlobalTransaction)
             {
             int refId = createReference(o, refMap);
             out.writeByte(MAGICNUMBER_GTX);
             out.writeInt(refId);
             marshallGlobalTransaction((GlobalTransaction) o, out, refMap);
             }
             else if (o instanceof IpAddress)
             {
             out.writeByte(MAGICNUMBER_IPADDRESS);
             marshallIpAddress((IpAddress) o, out);
             }
             else if (o.getClass().equals(ArrayList.class))
             {
             out.writeByte(MAGICNUMBER_ARRAY_LIST);
             marshallCollection((Collection) o, out, refMap);
             }
             else if (o.getClass().equals(LinkedList.class))
             {
             out.writeByte(MAGICNUMBER_LINKED_LIST);
             marshallCollection((Collection) o, out, refMap);
             }
             else if (o.getClass().equals(HashMap.class))
             {
             out.writeByte(MAGICNUMBER_HASH_MAP);
             marshallMap((Map) o, out, refMap);
             }
             else if (o.getClass().equals(TreeMap.class))
             {
             out.writeByte(MAGICNUMBER_TREE_MAP);
             marshallMap((Map) o, out, refMap);
             }
             else if (o.getClass().equals(HashSet.class))
             {
             out.writeByte(MAGICNUMBER_HASH_SET);
             marshallCollection((Collection) o, out, refMap);
             }
             else if (o.getClass().equals(TreeSet.class))
             {
             out.writeByte(MAGICNUMBER_TREE_SET);
             marshallCollection((Collection) o, out, refMap);
             }
             else if (o instanceof Boolean)
             {
             out.writeByte(MAGICNUMBER_BOOLEAN);
             out.writeBoolean(((Boolean) o).booleanValue());
             }
             else if (o instanceof Integer)
             {
             out.writeByte(MAGICNUMBER_INTEGER);
             out.writeInt(((Integer) o).intValue());
             }
             else if (o instanceof Long)
             {
             out.writeByte(MAGICNUMBER_LONG);
             out.writeLong(((Long) o).longValue());
             }
             else if (o instanceof String)
             {
             int refId = createReference(o, refMap);
             out.writeByte(MAGICNUMBER_STRING);
             out.writeInt(refId);
             out.writeUTF((String) o);
             }
             else if (o instanceof Serializable || ObjectSerializationFactory.useJBossSerialization())
             {
             int refId = createReference(o, refMap);
             if (log.isTraceEnabled()) log.trace("Warning: using object serialization for " + o.getClass());
             out.writeByte(MAGICNUMBER_SERIALIZABLE);
             out.writeInt(refId);
             out.writeObject(o);
             }
             else
             {
             throw new Exception("Don't know how to marshall object of type " + o.getClass());
             }
             }
            
             private int createReference(Object o, Map refMap)
             {
             int reference = refMap.size();
             refMap.put(o, new Integer(reference));
             return reference;
             }
            
             private void marshallMethodCall(JBCMethodCall methodCall, ObjectOutputStream out, Map refMap) throws Exception
             {
             out.writeShort(methodCall.getMethodId());
             Object[] args = methodCall.getArgs();
             byte numArgs = (byte) (args == null ? 0 : args.length);
             out.writeByte(numArgs);
            
             for (int i = 0; i < numArgs; i++)
             {
             marshallObject(args[i], out, refMap);
             }
             }
            
             private void marshallGlobalTransaction(GlobalTransaction globalTransaction, ObjectOutputStream out, Map refMap) throws Exception
             {
             out.writeLong(globalTransaction.getId());
             marshallObject(globalTransaction.getAddress(), out, refMap);
             }
            
            
             private void marshallFqn(Fqn fqn, ObjectOutputStream out, Map refMap) throws Exception
             {
             boolean isRoot = fqn.isRoot();
             out.writeBoolean(isRoot);
             if (!isRoot)
             {
             out.writeShort(fqn.size());
             for (int i = 0; i < fqn.size(); i++)
             {
             marshallObject(fqn.get(i), out, refMap);
             }
             }
             }
            
             private void marshallIpAddress(IpAddress ipAddress, ObjectOutputStream out) throws Exception
             {
             ipAddress.writeExternal(out);
             }
            
             private void marshallCollection(Collection c, ObjectOutputStream out, Map refMap) throws Exception
             {
             out.writeInt(c.size());
             Iterator i = c.iterator();
             while (i.hasNext())
             {
             marshallObject(i.next(), out, refMap);
             }
             }
            
             private void marshallMap(Map m, ObjectOutputStream out, Map refMap) throws Exception
             {
             out.writeInt(m.size());
             Iterator i = m.keySet().iterator();
             while (i.hasNext())
             {
             Object key = i.next();
             marshallObject(key, out, refMap);
             marshallObject(m.get(key), out, refMap);
             }
             }
            
             // --------- Unmarshalling methods
            
             private Object unmarshallObject(ObjectInputStream in, ClassLoader loader, Map refMap) throws Exception
             {
             if (loader == null)
             {
             return unmarshallObject(in, refMap);
             }
             else
             {
             Thread currentThread = Thread.currentThread();
             ClassLoader old = currentThread.getContextClassLoader();
             try
             {
             currentThread.setContextClassLoader(loader);
             return unmarshallObject(in, refMap);
             }
             finally
             {
             currentThread.setContextClassLoader(old);
             }
             }
             }
            
             private Object unmarshallObject(ObjectInputStream in, Map refMap) throws Exception
             {
             byte magicNumber = in.readByte();
             Integer reference;
             Object retVal;
             switch (magicNumber)
             {
             case MAGICNUMBER_NULL:
             return null;
             case MAGICNUMBER_REF:
             reference = new Integer(in.readInt());
             if (!refMap.containsKey(reference))
             {
             throw new IOException("Unable to locate object reference " + reference + " in byte stream!");
             }
             return refMap.get(reference);
             case MAGICNUMBER_SERIALIZABLE:
             reference = new Integer(in.readInt());
             retVal = in.readObject();
             refMap.put(reference, retVal);
             return retVal;
             case MAGICNUMBER_METHODCALL:
             retVal = unmarshallMethodCall(in, refMap);
             return retVal;
             case MAGICNUMBER_FQN:
             reference = new Integer(in.readInt());
             retVal = unmarshallFqn(in, refMap);
             refMap.put(reference, retVal);
             return retVal;
             case MAGICNUMBER_GTX:
             reference = new Integer(in.readInt());
             retVal = unmarshallGlobalTransaction(in, refMap);
             refMap.put(reference, retVal);
             return retVal;
             case MAGICNUMBER_IPADDRESS:
             retVal = unmarshallIpAddress(in);
             return retVal;
             case MAGICNUMBER_ARRAY_LIST:
             return unmarshallArrayList(in, refMap);
             case MAGICNUMBER_LINKED_LIST:
             return unmarshallLinkedList(in, refMap);
             case MAGICNUMBER_HASH_MAP:
             return unmarshallHashMap(in, refMap);
             case MAGICNUMBER_TREE_MAP:
             return unmarshallTreeMap(in, refMap);
             case MAGICNUMBER_HASH_SET:
             return unmarshallHashSet(in, refMap);
             case MAGICNUMBER_TREE_SET:
             return unmarshallTreeSet(in, refMap);
             case MAGICNUMBER_BOOLEAN:
             return in.readBoolean() ? Boolean.TRUE : Boolean.FALSE;
             case MAGICNUMBER_INTEGER:
             return new Integer(in.readInt());
             case MAGICNUMBER_LONG:
             retVal = new Long(in.readLong());
             return retVal;
             case MAGICNUMBER_STRING:
             reference = new Integer(in.readInt());
             retVal = in.readUTF();
             refMap.put(reference, retVal);
             return retVal;
             default:
             if (log.isErrorEnabled()) log.error("Unknown Magic Number " + magicNumber);
             throw new Exception("Unknown magic number " + magicNumber);
             }
             }
            
             private MethodCall unmarshallMethodCall(ObjectInputStream in, Map refMap) throws Exception
             {
             short methodId = in.readShort();
             byte numArgs = in.readByte();
             Object[] args = null;
            
             if (numArgs > 0)
             {
             args = new Object[numArgs];
            
             for (int i = 0; i < numArgs; i++)
             {
             args[i] = unmarshallObject(in, refMap);
             }
             }
             return MethodCallFactory.create(MethodDeclarations.lookupMethod(methodId), args);
             }
            
             private GlobalTransaction unmarshallGlobalTransaction(ObjectInputStream in, Map refMap) throws Exception
             {
             GlobalTransaction gtx = new GlobalTransaction();
             long id = in.readLong();
             Object address = unmarshallObject(in, refMap);
             gtx.setId(id);
             gtx.setAddress((Address) address);
             return gtx;
             }
            
             private Fqn unmarshallFqn(ObjectInputStream in, Map refMap) throws Exception
             {
            
             boolean isRoot = in.readBoolean();
             Fqn fqn;
             if (!isRoot)
             {
             int numElements = in.readShort();
             List elements = new ArrayList(numElements);
             for (int i = 0; i < numElements; i++)
             {
             elements.add(unmarshallObject(in, refMap));
             }
             fqn = new Fqn(elements);
             }
             else
             {
             fqn = Fqn.ROOT;
             }
             return fqn;
             }
            
             private IpAddress unmarshallIpAddress(ObjectInputStream in) throws Exception
             {
             IpAddress ipAddress = new IpAddress();
             ipAddress.readExternal(in);
             return ipAddress;
             }
            
             private List unmarshallArrayList(ObjectInputStream in, Map refMap) throws Exception
             {
             int listSize = in.readInt();
             List list = new ArrayList(listSize);
             for (int i = 0; i < listSize; i++)
             {
             list.add(unmarshallObject(in, refMap));
             }
             return list;
             }
            
             private List unmarshallLinkedList(ObjectInputStream in, Map refMap) throws Exception
             {
             int listSize = in.readInt();
             List list = new LinkedList();
             for (int i = 0; i < listSize; i++)
             {
             list.add(unmarshallObject(in, refMap));
             }
             return list;
             }
            
             private Map unmarshallHashMap(ObjectInputStream in, Map refMap) throws Exception
             {
             int listSize = in.readInt();
             Map map = new HashMap();
             for (int i = 0; i < listSize; i++)
             {
             map.put(unmarshallObject(in, refMap), unmarshallObject(in, refMap));
             }
             return map;
             }
            
             private Map unmarshallTreeMap(ObjectInputStream in, Map refMap) throws Exception
             {
             int listSize = in.readInt();
             Map map = new TreeMap();
             for (int i = 0; i < listSize; i++)
             {
             map.put(unmarshallObject(in, refMap), unmarshallObject(in, refMap));
             }
             return map;
             }
            
             private Set unmarshallHashSet(ObjectInputStream in, Map refMap) throws Exception
             {
             int listSize = in.readInt();
             Set map = new HashSet();
             for (int i = 0; i < listSize; i++)
             {
             map.add(unmarshallObject(in, refMap));
             }
             return map;
             }
            
             private Set unmarshallTreeSet(ObjectInputStream in, Map refMap) throws Exception
             {
             int listSize = in.readInt();
             Set map = new TreeSet();
             for (int i = 0; i < listSize; i++)
             {
             map.add(unmarshallObject(in, refMap));
             }
             return map;
             }
            
            
             class InactiveRegionException extends CacheException
             {
             public InactiveRegionException()
             {
             super();
             }
            
             public InactiveRegionException(String msg)
             {
             super(msg);
             }
            
             public InactiveRegionException(String msg, Throwable cause)
             {
             super(msg, cause);
             }
             }
            }
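
To make the scheme in the listing above easier to follow in isolation, here is a minimal sketch of tag-byte marshalling with a reference map. It is not the JBoss Cache code: the class name TaggedMarshallerSketch and the tag values are hypothetical, it uses DataOutputStream instead of ObjectOutputStream, it handles only null, String, Integer and List, and it assumes the writer emits a back-reference whenever an object is already in its reference map.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified illustration; not the actual marshaller.
class TaggedMarshallerSketch {
    // Hypothetical tag values, standing in for the MAGICNUMBER_* constants.
    static final byte TAG_NULL = 0;
    static final byte TAG_REF = 1;
    static final byte TAG_STRING = 2;
    static final byte TAG_INTEGER = 3;
    static final byte TAG_LIST = 4;

    // Writer side: the map goes from object to reference id.
    static void write(Object o, DataOutputStream out, Map<Object, Integer> refs) throws IOException {
        if (o == null) {
            out.writeByte(TAG_NULL);
        } else if (refs.containsKey(o)) {
            // Already written once: emit only the reference id.
            out.writeByte(TAG_REF);
            out.writeInt(refs.get(o).intValue());
        } else if (o instanceof String) {
            int id = refs.size(); // next free id, as in createReference()
            refs.put(o, Integer.valueOf(id));
            out.writeByte(TAG_STRING);
            out.writeInt(id);
            out.writeUTF((String) o);
        } else if (o instanceof Integer) {
            out.writeByte(TAG_INTEGER);
            out.writeInt(((Integer) o).intValue());
        } else if (o instanceof List) {
            List<?> list = (List<?>) o;
            out.writeByte(TAG_LIST);
            out.writeInt(list.size());
            for (Object element : list) {
                write(element, out, refs);
            }
        } else {
            throw new IOException("Don't know how to marshall object of type " + o.getClass());
        }
    }

    // Reader side: the map goes from reference id to object.
    static Object read(DataInputStream in, Map<Integer, Object> refs) throws IOException {
        byte tag = in.readByte();
        switch (tag) {
            case TAG_NULL:
                return null;
            case TAG_REF: {
                Integer id = Integer.valueOf(in.readInt());
                if (!refs.containsKey(id)) {
                    throw new IOException("Unable to locate object reference " + id);
                }
                return refs.get(id);
            }
            case TAG_STRING: {
                Integer id = Integer.valueOf(in.readInt());
                String s = in.readUTF();
                refs.put(id, s);
                return s;
            }
            case TAG_INTEGER:
                return Integer.valueOf(in.readInt());
            case TAG_LIST: {
                int size = in.readInt();
                List<Object> list = new ArrayList<Object>(size);
                for (int i = 0; i < size; i++) {
                    list.add(read(in, refs));
                }
                return list;
            }
            default:
                throw new IOException("Unknown tag " + tag);
        }
    }

    // Marshals to a byte array and reads the result back with fresh maps.
    static Object roundTrip(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        write(o, out, new HashMap<Object, Integer>());
        out.flush();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        return read(in, new HashMap<Integer, Object>());
    }
}
```

The point of the sketch is that reader and writer must consume exactly the same sequence of tags and reference ids. If the two sides disagree about a single id or tag, every value read after that point is shifted or resolved to the wrong object. Whether that is what happens in the failing replication above is not something the sketch can show.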