4 Replies Latest reply on May 25, 2013 1:36 PM by defunkt

    Problem with RPCManager members synchronization

    defunkt Newbie



      I'm using the distributed execution framework to distribute tasks across a cluster. Sometimes there occurs a situtation where I send a task to a worker which was 'just' registered to my list of workers by a view changed event. In such an occurance, I usually get an error that the node I tried sending the task to is not yet available.


      Looking at the code of DefaultExecutorService, I see that its getMembers() method, which is used to find out which members are available to send tasks to, uses RPCManager.getMembers().

      However, it seems that RPCManager.getTransport().getMembers() will display the updated view some time before RPCManager.getMembers() will. This is the result I get from a test I wrote to check this issue:


      2013-05-24 23:05:10,621 INFO  [JGroupsTransport] (main) ISPN000078: Starting JGroups Channel

      2013-05-24 23:05:26,588 INFO  [JGroupsTransport] (main) ISPN000094: Received new cluster view: [node1-52269|0] [node1-52269]

      2013-05-24 23:05:26,594 INFO  [JGroupsTransport] (main) ISPN000079: Cache local address is node1-52269, physical addresses are [X]

      LISTENER: after ctor

      2013-05-24 23:08:05,744 INFO  [JGroupsTransport] (Incoming-1,node1-52269) ISPN000094: Received new cluster view: [node1-52269|1] [node1-52269, node1-6039]

      LISTENER: submitting task to address node1-6039

      LISTENER: current members from event are: [node1-52269, node1-6039]

      Members directly from RPC: [node1-52269]

      Members from RPC.transport: [node1-52269, node1-6039]

      2013-05-24 23:08:05,837 WARN  [CacheManagerNotifierImpl] (notification-thread-0,node1) ISPN000134: Unable to invoke method public void test.exec.ViewChangedTest$MembersListener.handleViewChanged(org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent) on Object instance test.exec.ViewChangedTest$MembersListener@19c4c8e - removing this target object from list of listeners!

      java.lang.IllegalArgumentException: Target node node1-6039 is not a cluster member, members are [node1-52269]

                at org.infinispan.distexec.DefaultExecutorService.submit(DefaultExecutorService.java:453)

                at org.infinispan.distexec.DefaultExecutorService.submit(DefaultExecutorService.java:442)

                at test.exec.ViewChangedTest$MembersListener.handleViewChanged(ViewChangedTest.java:62)

                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

                at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)

                at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)

                at java.lang.reflect.Method.invoke(Unknown Source)

                at org.infinispan.notifications.AbstractListenerImpl$ListenerInvocation$1.run(AbstractListenerImpl.java:212)

                at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)

                at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

                at java.lang.Thread.run(Unknown Source)


      Is this intended behaviour?


      I ran this test on Infinispan 5.2.1final with only core jars needed. attached are the test code (viewChangedTest sets up the listener and Member simply sets up a new member in the cluster) and config files.



      Gilad Ber

        • 1. Re: Problem with RPCManager members synchronization
          Pedro Ruivo Apprentice

          Hi Gilad,


          yes, that is the expected behavior.


          Infinispan supports heterogeneous caches (I don't know if this is the correct name). This means that each machine in the cluster can have different caches. Example:


          you can have a cluster with 3 machines: N1, N2 and N3.

          in N1, you can have the cache A

          in N2 and N3 you can have the cache B


          In both caches (A and B), the Transport.getMembers() will return exactly the same list:


          rpcManager.getTransport().getMembers(); //return a list with N1, N2 and N3.


          On other hand, the RpcManager.getMembers() returns different things in different caches:


          //in cache A
          rpcManager.getMembers(); //returns a list with N1 only!
          //in cache B
          rpcManager.getMembers(); //returns a list with N2 and N3.


          Assuming you have a new node join, say N4, JGroups will trigger a ViewEvent with the members list (after this point, Transport.getMembers() will return N1, N2, N3 and N4). At N4, when it has the new view it contacts the coordinator to join to cache A (or B, or both, or a new cache, but let's assume A for this example). After this step, N4 joins and may start the state transfer (only after this point, the RpcManager.getMembers() on cache A will return N1 and N4).


          I hope I was clear enough


          In your case, if you want to send some work to the join node, you have to listen to DataRehash events (they are cache specific). You should anotate a method in your Listener with @DataRehashed and register it in the cache:


          cache.addListener(/*your listener*/);




          • 2. Re: Problem with RPCManager members synchronization
            defunkt Newbie

            Hi Pedro,


            Good explanation, now I understand why RPCManager.getMembers is used there. I was thinking though, (and I will check this later) would a data rehash event occur even when there is still no data in the cache?


            Thanks for your help,

            Gilad Ber

            • 3. Re: Problem with RPCManager members synchronization
              Pedro Ruivo Apprentice

              Hi Gilad,


              You're right. The data rehash event is only triggered when we have state transfer configured. Use instead the topology changed event (@TopologyChanged). I've checked and it is triggered independent if you have state transfer configured or not.


              Sorry for my mistake




              1 of 1 people found this helpful
              • 4. Re: Problem with RPCManager members synchronization
                defunkt Newbie

                Great,  that's exactly what I was looking for! :-)