0 Replies Latest reply on Oct 13, 2007 6:26 PM by Jaco Beukes

    Can't get fine-grained replication working

    Jaco Beukes Newbie

      Thank you for all the help - you guys are doing a great job.

      I have spent the past several weeks preparing what was supposed to have been a very simple JBossCache demo for work. I suspect that I am very close to having what is required, but there is still one outstanding issue - I cannot get fine-grained replication to work.

      I am using JBoss 4.2.1 GA and JBossCache 2.0.0 GA. I have a single instance of JBoss running on each of two separate nodes in the same cluster with an EJB running in each of the two instances. I also have two separate standalone apps (clients used to monitor the EJBs for demonstration purposes) running on each of the two nodes.

      I need any changes that are made to an object on either of the two nodes to automatically propagate to the other (i.e. fine-grained replication).

      Upon initialization, the following happens on each of the nodes:

      The PojoCache wrapper is registered as an MBean using the following code:


      // Locate the JBoss MBean server and build the JMX name for the cache wrapper.
      MBeanServer server = MBeanServerLocator.locateJBoss();
      ObjectName on = new ObjectName("jboss.cache:service=PojoCache");

      // Create the cache from META-INF/service.xml and wrap it in its JMX wrapper.
      PojoCache cache = PojoCacheFactory.createCache("META-INF/service.xml", true);
      PojoCacheJmxWrapperMBean wrapper = new PojoCacheJmxWrapper(cache);

      // Publish the wrapper so other components can look the cache up by name.
      server.registerMBean(wrapper, on);


      An object is attached to the cache on one of the instances using the following code:


      PojoCache cache = cacheWrapper.getPojoCache();
      cache.attach("/person", person);


      The object is retrieved on the other node and its values are displayed using the following code:


      // Look up whatever is attached under "/person" and print its string form.
      person = (Person) cacheWrapper.getPojoCache().find("/person");
      String description = person.toString();
      System.out.println(description);


      This all works fine. If I make a change to the object on either of the two nodes and REATTACH it to the cache, the changes
      are successfully propagated to the other node.

      The problem is that I have to reattach the object to the cache in order for it to be replicated and therefore lose the benefits of fine-grained replication. Changes to the object are not automatically intercepted and propagated to the other node.

      I suspect that the problem is related to instrumentation of objects. I am using pojocache-aop.xml and have annotated my Pojo with the "@org.jboss.cache.pojo.annotation.Replicable" annotation. A comment in the pojocache-aop.xml file states that "If a POJO has PojoCachable annotation, it will be asepectized". I have searched the entire JBossCache distribution jar file, but simply cannot find a class named PojoCachable. I have not made any changes to the pojocache-aop.xml file as I understand that any Pojo annotated with "@org.jboss.cache.pojo.annotation.Replicable", will automatically become replicable.

      I have attached the code for the EJB as well as the contents of service.xml below.

      Thank you for your time and help.

      The EJB code:



      package za.co.test.cachedemo.model.session;

      import javax.ejb.CreateException;
      import javax.ejb.SessionBean;
      import javax.ejb.SessionContext;
      import javax.management.InstanceNotFoundException;
      import javax.management.MBeanRegistrationException;
      import javax.management.MBeanServer;
      import javax.management.MBeanServerInvocationHandler;
      import javax.management.MalformedObjectNameException;
      import javax.management.ObjectName;

      import org.jboss.cache.pojo.PojoCache;
      import org.jboss.cache.pojo.PojoCacheFactory;
      import org.jboss.cache.pojo.jmx.PojoCacheJmxWrapper;
      import org.jboss.cache.pojo.jmx.PojoCacheJmxWrapperMBean;
      import org.jboss.mx.util.MBeanServerLocator;

      import za.co.test.cachedemo.model.dto.Person;

      /**
       * Session bean that demonstrates JBossCache's PojoCache.
       *
       * <p>The bean creates a PojoCache, publishes it through JMX under
       * {@code jboss.cache:service=PojoCache}, and offers methods to attach, read,
       * modify and tear down a single {@link Person} stored at {@code /person}.
       * Every business method looks the cache up again through the MBean server,
       * so the bean itself holds no reference to the cache.
       */
      public class JBossCacheDemoBean implements SessionBean {

          /** JMX object name under which the PojoCache wrapper MBean is registered. */
          private static final String CACHE_OBJECT_NAME = "jboss.cache:service=PojoCache";

          /** Configuration resource handed to the cache factory. */
          private static final String CACHE_CONFIG = "META-INF/service.xml";

          /** Cache location at which the demo {@link Person} is attached. */
          private static final String PERSON_FQN = "/person";

          SessionContext sessionContext;

          /**
           * Creates the PojoCache and registers its JMX wrapper.
           *
           * <p>If a wrapper is already registered under the cache object name the call
           * is a no-op, so that a second cache instance is not created and left running.
           * If registration fails after the cache was created, the cache is stopped and
           * destroyed again. Failures are reported, not propagated.
           *
           * @throws java.rmi.RemoteException declared for the EJB remote contract
           */
          public void initCache() throws java.rmi.RemoteException {
              PojoCache cache = null;
              try {
                  MBeanServer server = MBeanServerLocator.locateJBoss();
                  ObjectName on = new ObjectName(CACHE_OBJECT_NAME);

                  // Creating another cache here would only fail at registerMBean and
                  // leave the freshly created cache behind.
                  if (server.isRegistered(on)) {
                      System.out.println("PojoCache MBean " + on + " is already registered; skipping initialisation");
                      return;
                  }

                  cache = PojoCacheFactory.createCache(CACHE_CONFIG, true);
                  PojoCacheJmxWrapperMBean wrapper = new PojoCacheJmxWrapper(cache);

                  server.registerMBean(wrapper, on);
              }
              catch (Exception e) {
                  reportError(e);
                  if (cache != null) {
                      // Registration failed: nobody can reach this cache, so release it.
                      releaseQuietly(cache);
                  }
              }
          }

          /**
           * Attaches the given person to the cache at {@code /person}.
           * Failures are reported, not propagated.
           *
           * @param person the object to attach
           * @throws java.rmi.RemoteException declared for the EJB remote contract
           */
          public void attachObjectToCache(Person person) throws java.rmi.RemoteException {
              try {
                  MBeanServer server = MBeanServerLocator.locateJBoss();
                  ObjectName on = new ObjectName(CACHE_OBJECT_NAME);

                  PojoCache cache = lookupCache(server, on);
                  cache.attach(PERSON_FQN, person);
              }
              catch (Exception e) {
                  reportError(e);
              }
          }

          /**
           * Returns the person currently found at {@code /person}.
           *
           * @return the cached person, or {@code null} if the lookup failed or
           *         {@code find} returned nothing
           * @throws java.rmi.RemoteException declared for the EJB remote contract
           */
          public Person displayCacheContent() throws java.rmi.RemoteException {
              Person person = null;
              try {
                  MBeanServer server = MBeanServerLocator.locateJBoss();
                  ObjectName on = new ObjectName(CACHE_OBJECT_NAME);

                  PojoCache cache = lookupCache(server, on);
                  person = (Person) cache.find(PERSON_FQN);
              }
              catch (Exception e) {
                  reportError(e);
              }
              return person;
          }

          /**
           * Copies age, name and surname from {@code person_} onto the person found
           * in the cache at {@code /person}, without re-attaching it.
           *
           * <p>NOTE(review): these setter calls are presumably only intercepted and
           * replicated if {@code Person} has actually been instrumented by JBoss AOP
           * (compile-time weaving or load-time weaving); the annotation alone is not
           * expected to be enough - confirm against the deployment.
           *
           * @param person_ carrier of the new field values; must not be {@code null}
           * @throws IllegalArgumentException if {@code person_} is {@code null}
           * @throws IllegalStateException if nothing is attached at {@code /person}
           * @throws java.rmi.RemoteException declared for the EJB remote contract
           */
          public void modifyCacheContent(Person person_) throws java.rmi.RemoteException {
              if (person_ == null) {
                  throw new IllegalArgumentException("person_ must not be null");
              }

              try {
                  ObjectName on = new ObjectName(CACHE_OBJECT_NAME);
                  MBeanServer server = MBeanServerLocator.locateJBoss();

                  PojoCache cache = lookupCache(server, on);

                  Person person = (Person) cache.find(PERSON_FQN);
                  if (person == null) {
                      // Fail with a clear message instead of a bare NullPointerException.
                      throw new IllegalStateException(
                          "No object attached at " + PERSON_FQN + "; call attachObjectToCache first");
                  }
                  person.setAge(person_.getAge());
                  person.setName(person_.getName());
                  person.setSurname(person_.getSurname());
              }
              catch (MalformedObjectNameException e) {
                  reportError(e);
              }
          }

          /**
           * Stops and destroys the cache and unregisters its JMX wrapper.
           *
           * <p>The wrapper is unregistered even if stopping or destroying the cache
           * fails, so that a later {@link #initCache()} can register a fresh one.
           *
           * @throws java.rmi.RemoteException declared for the EJB remote contract
           */
          public void destroyCache() throws java.rmi.RemoteException {
              try {
                  ObjectName on = new ObjectName(CACHE_OBJECT_NAME);
                  MBeanServer server = MBeanServerLocator.locateJBoss();

                  PojoCache cache = lookupCache(server, on);
                  try {
                      cache.stop();
                      cache.destroy();
                  }
                  finally {
                      server.unregisterMBean(on);
                  }
              }
              catch (MalformedObjectNameException e) {
                  reportError(e);
              }
              catch (InstanceNotFoundException e) {
                  reportError(e);
              }
              catch (MBeanRegistrationException e) {
                  reportError(e);
              }
          }

          public void ejbCreate() throws CreateException {
              // Nothing to set up; the cache is created on demand by initCache().
          }

          public void setSessionContext(SessionContext sessionContext) {
              this.sessionContext = sessionContext;
          }

          public void ejbRemove() {
              // No per-bean resources to release.
          }

          public void ejbActivate() {
              // No state to restore.
          }

          public void ejbPassivate() {
              // No state to save.
          }

          /** Obtains the PojoCache through a proxy for the wrapper MBean registered under {@code on}. */
          private static PojoCache lookupCache(MBeanServer server, ObjectName on) {
              PojoCacheJmxWrapperMBean cacheWrapper =
                  (PojoCacheJmxWrapperMBean) MBeanServerInvocationHandler.newProxyInstance(
                      server, on, PojoCacheJmxWrapperMBean.class, false);
              return cacheWrapper.getPojoCache();
          }

          /** Stops and destroys a cache, reporting (not propagating) any failure. */
          private static void releaseQuietly(PojoCache cache) {
              try {
                  cache.stop();
                  cache.destroy();
              }
              catch (Exception e) {
                  reportError(e);
              }
          }

          /** Prints the error message and the full stack trace, so the cause is not lost. */
          private static void reportError(Exception e) {
              System.out.println("ERROR:" + e.getMessage());
              e.printStackTrace();
          }

      }



      Contents of service.xml:





      <depends>jboss:service=Naming</depends>
      <depends>jboss:service=TransactionManager</depends>

      <!--
      Configure the TransactionManager
      -->
      <attribute name="TransactionManagerLookupClass">org.jboss.cache.transaction.GenericTransactionManagerLookup</attribute>


      <!--
      Isolation level : SERIALIZABLE
      REPEATABLE_READ (default)
      READ_COMMITTED
      READ_UNCOMMITTED
      NONE
      -->
      <attribute name="IsolationLevel">REPEATABLE_READ</attribute>

      <!--
      Valid modes are LOCAL
      REPL_ASYNC
      REPL_SYNC
      INVALIDATION_ASYNC
      INVALIDATION_SYNC
      -->
      <attribute name="CacheMode">REPL_ASYNC</attribute>

      <!--
      Just used for async repl: use a replication queue
      -->
      <attribute name="UseReplQueue">false</attribute>

      <!--
      Replication interval for replication queue (in ms)
      -->
      <attribute name="ReplQueueInterval">0</attribute>

      <!--
      Max number of elements which trigger replication
      -->
      <attribute name="ReplQueueMaxElements">0</attribute>

      <!-- Name of cluster. Needs to be the same for all TreeCache nodes in a
      cluster in order to find each other.
      -->
      <attribute name="ClusterName">JBossCache-Cluster</attribute>

      <!--Uncomment next three statements to enable JGroups multiplexer.
      This configuration is dependent on the JGroups multiplexer being
      registered in an MBean server such as JBossAS. -->
      <!--
      jgroups.mux:name=Multiplexer
      jgroups.mux:name=Multiplexer
      fc-fast-minimalthreads
      -->

      <!-- JGroups protocol stack properties.
      ClusterConfig isn't used if the multiplexer is enabled and successfully initialized.
      -->


      <UDP mcast_addr="228.10.10.10"
      mcast_port="45588"
      tos="8"
      ucast_recv_buf_size="20000000"
      ucast_send_buf_size="640000"
      mcast_recv_buf_size="25000000"
      mcast_send_buf_size="640000"
      loopback="false"
      discard_incompatible_packets="true"
      max_bundle_size="64000"
      max_bundle_timeout="30"
      use_incoming_packet_handler="true"
      ip_ttl="2"
      enable_bundling="false"
      enable_diagnostics="true"

      use_concurrent_stack="true"

      thread_naming_pattern="pl"

      thread_pool.enabled="true"
      thread_pool.min_threads="1"
      thread_pool.max_threads="25"
      thread_pool.keep_alive_time="30000"
      thread_pool.queue_enabled="true"
      thread_pool.queue_max_size="10"
      thread_pool.rejection_policy="Run"

      oob_thread_pool.enabled="true"
      oob_thread_pool.min_threads="1"
      oob_thread_pool.max_threads="4"
      oob_thread_pool.keep_alive_time="10000"
      oob_thread_pool.queue_enabled="true"
      oob_thread_pool.queue_max_size="10"
      oob_thread_pool.rejection_policy="Run"/>

      <PING timeout="2000" num_initial_members="3"/>
      <MERGE2 max_interval="30000" min_interval="10000"/>
      <FD_SOCK/>
      <FD timeout="10000" max_tries="5" shun="true"/>
      <VERIFY_SUSPECT timeout="1500"/>
      <pbcast.NAKACK max_xmit_size="60000"
      use_mcast_xmit="false" gc_lag="0"
      retransmit_timeout="300,600,1200,2400,4800"
      discard_delivered_msgs="true"/>

      <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
      max_bytes="400000"/>
      <pbcast.GMS print_local_addr="true" join_timeout="5000"
      join_retry_timeout="2000" shun="false"
      view_bundling="true" view_ack_collection_timeout="5000"/>
      <FC max_credits="20000000" min_threshold="0.10"/>
      <FRAG2 frag_size="60000"/>
      <pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/>
      <!-- <pbcast.STATE_TRANSFER/> -->
      <pbcast.FLUSH timeout="0"/>



      <!--
      Whether or not to fetch state on joining a cluster
      NOTE this used to be called FetchStateOnStartup and has been renamed to be more descriptive.
      -->
      <attribute name="FetchInMemoryState">true</attribute>

      <!--
      The max amount of time (in milliseconds) we wait until the
      state (ie. the contents of the cache) are retrieved from
      existing members in a clustered environment
      -->
      <attribute name="StateRetrievalTimeout">20000</attribute>

      <!--
      Number of milliseconds to wait until all responses for a
      synchronous call have been received.
      -->
      <attribute name="SyncReplTimeout">20000</attribute>

      <!-- Max number of milliseconds to wait for a lock acquisition -->
      <attribute name="LockAcquisitionTimeout">15000</attribute>

      <!--
      Indicate whether to use region based marshalling or not. Set this to true if you are running under a scoped
      class loader, e.g., inside an application server. Default is "false".
      -->
      <attribute name="UseRegionBasedMarshalling">false</attribute>




      <!-- Uncomment to get a graphical view of the TreeCache MBean above -->
      <!-- -->
      <!-- jboss.cache:service=TreeCache-->
      <!-- jboss.cache:service=TreeCache-->
      <!-- -->




      Last few lines of pojocache-aop.xml:



      <!-- If a POJO has PojoCachable annotation, it will be asepectized. -->
      <!--
      Supports inheritance and polymorphism. It can either be a concrete class
      or an interface. All sub-classes or interface implementors will be instrumented.
      -->