1 2 Previous Next 19 Replies Latest reply on Aug 21, 2013 11:15 AM by pferraro Go to original post
      • 15. Re: Listening to membership changes in AS7
        hwangarias

        Hello Paul and everyone,

         

        I'm trying to implement an rpc call in the cluster with the following code:

         

            public void sendMessage() {
                RpcManager rpcManager = oipCacheManager.getCache().getAdvancedCache().getRpcManager();
        
                // rpcManager.broadcastRpcCommand(new TestCommand(), true);
        
                JGroupsTransport transport = (JGroupsTransport) oipCacheManager.getTransport();
                Channel channel = transport.getChannel();
                log.debug("CHannel {}, isconnected:{}, clusterName:{}", channel, channel.isConnected(), channel.getClusterName());
                channel.setUpHandler(new MuxUpHandler());
                RpcDispatcher dispatcher = new org.jgroups.blocks.mux.MuxRpcDispatcher((short) 666, channel, null, null, this);
                dispatcher.start();
                log.debug("dispatcher {}", dispatcher);
        
                try {
                    RequestOptions opts = new RequestOptions();
        
                    log.debug("1- Sending RPC to kaka to all");
                    MethodCall call1 = new MethodCall(MethodCall.findMethod(HAService.class, "kaka", null));
                    RspList response = dispatcher.callRemoteMethods(null, call1, opts);
                    log.debug(" response -> {}", response);
        
                    log.debug("2- Sending RPC to kaka to all");
                    MethodCall call2 = new MethodCall(MethodCall.findMethod(HAService.class, "kaka2", null));
                    RspList response2 = dispatcher.callRemoteMethods(null, call2, opts);
                    log.debug(" responses -> {}", response2);
        
                    log.debug("3- Sending RPC to kaka to all");
                    MethodCall call3 = new MethodCall(MethodCall.findMethod(HAService.class, "kaka2", null));
                    RspList response3 = dispatcher.callRemoteMethods(null, call3, opts);
                    log.debug(" responses -> {}", response3);
        
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        
            public static String kaka() {
                System.out.println("oooooooooooooooooooooooooooooo KAKAKAKA");
                return "kaka";
            }
        
            public String kaka2() {
                System.out.println("oooooooooooooooooooooooooooooo KAKAKAKA2");
                return "kaka2";
            }
        

         

         

        I have set the jgroups logs to TRACE, and I get for each request:

        17:37:25,171 DEBUG [oip.framework.core.ejb.ha.HAService] (EJB default - 1) CHannel org.jboss.as.clustering.jgroups.MuxChannel@18d25e1, isconnected:true
        17:37:25,171 DEBUG [oip.framework.core.ejb.ha.HAService] (EJB default - 1) dispatcher org.jgroups.blocks.mux.MuxRpcDispatcher@1363ecf
        17:37:25,171 DEBUG [oip.framework.core.ejb.ha.HAService] (EJB default - 1) 1- Sending RPC to kaka to all
        17:37:25,172 DEBUG [oip.framework.core.ejb.ha.HAService] (EJB default - 1)  response -> [sender=node2/oip, received=false, suspected=false]
        
        17:37:25,172 DEBUG [oip.framework.core.ejb.ha.HAService] (EJB default - 1) 2- Sending RPC to kaka to all
        17:37:25,174 DEBUG [oip.framework.core.ejb.ha.HAService] (EJB default - 1)  responses -> [sender=node2/oip, received=false, suspected=false]
        
        17:37:25,174 DEBUG [oip.framework.core.ejb.ha.HAService] (EJB default - 1) 3- Sending RPC to kaka to all
        17:37:25,175 DEBUG [oip.framework.core.ejb.ha.HAService] (EJB default - 1)  responses -> [sender=node2/oip, received=false, suspected=false]
        

        Of course, in the other node I don't receive the call

         

        Any ideas of what's happening? Give me some light please...

         

        Thank you!

        Juan Arias

        • 16. Re: Listening to membership changes in AS7
          hwangarias

          Well, I have managed to make it work for simple cases in my JBoss 7 cluster with this (I want to do is to cast a generic rpc invocation on a jndi service):

          @Singleton
          @ApplicationScoped
          @Startup
          // @Startup Moved to resources/META-INF/ejb-jar.xml
          @Listener
          public class HAService {
              ...
              @PostConstruct
              public void postConstruct() {
                  // Get the jgroups channel from infinispan cacheManager
                  JGroupsTransport transport = (JGroupsTransport) oipCacheManager.getTransport();
                  channel = transport.getChannel();
          
                  // Mount a mux rpc dispatcher using such channel on this object
                  dispatcher = new MuxRpcDispatcher((short) 1, channel, null, null, this);
                  dispatcher.setMethodLookup(new MethodLookup() {
                      @Override
                      public Method findMethod(short id) {
                          try {
                              System.out.println("------>>> findmethod " + ServiceCall.class);
                              return HAService.this.getClass().getMethod("jndiDelegate", ServiceCall.class);
                          } catch (Exception e) {
                              e.printStackTrace();
                              return null;
                          }
                      }
                  });
          
                  // Start the dispatcher
                  dispatcher.start();
              }
              @PreDestroy
              public void preDestroy() {
                  // Stop the dispatcher
                  dispatcher.stop();
              }
          
              public List callRemoteMethods(ServiceCall serviceCall) {
                  try {
                      RequestOptions opts = new RequestOptions(ResponseMode.GET_ALL, 5000);
          
                      log.debug("Sending rpc call to all nodes");
                      MethodCall methodCall = new MethodCall((short) 1);
                      methodCall.setArgs(serviceCall);
                      System.out.println("------ before callremote " + ServiceCall.class);
                      RspList response = dispatcher.callRemoteMethods(null, methodCall, opts);
                      System.out.println("------ after callremote " + ServiceCall.class);
                      log.debug(" responses -> {}", response);
          
                      return response.getResults();
                  } catch (Exception ex) {
                      ex.printStackTrace();
                      return new ArrayList(0);
                  }
          
              }
          
              public Object jndiDelegate(ServiceCall serviceCall) {
                  try {
                      Object service = JndiUtils.lookup(serviceCall.getJndiName());
                      Class[] parameterTypes = new Class[serviceCall.getArgs().length];
                      for (int i = 0; i < serviceCall.getArgs().length; i++) {
                          parameterTypes[i] = serviceCall.getArgs().getClass();
                      }
          
                      Method serviceMethod = service.getClass().getMethod(serviceCall.getMethod(), parameterTypes);
                      return serviceMethod.invoke(service, (Object[]) serviceCall.getArgs());
                  } catch (Exception e) {
                      e.printStackTrace();
                      return null;
                  }
              }
          }
          
          

           

          If the remote method has primitive arguments, no proble, everything works ok

           

          But... when I try to serialize classes (this is the case as ServiceCall is a simple serializable wrapper) or invoke without the methodlookup I always receive:

           

          09:06:25,161 DEBUG [oip.framework.core.ejb.ha.HAService] (EJB default - 5) Sending rpc call to all nodes

          09:06:25,161 INFO  [stdout] (EJB default - 5) ------ before callremote class oip.framework.core.ejb.ha.ServiceCall

          09:06:25,165 INFO  [stdout] (EJB default - 5) ------ after callremote class oip.framework.core.ejb.ha.ServiceCall

          09:06:25,166 DEBUG [oip.framework.core.ejb.ha.HAService] (EJB default - 5)  responses -> [sender=node2/oip, exception=java.lang.IllegalArgumentException: java.lang.ClassNotFoundException: oip.framework.core.ejb.ha.ServiceCall from [Module "org.jgroups:main" from local module loader @c97b99 (finder: local module finder @c985b7 (roots: O:\oip\jboss\modules,O:\oip\jboss\modules\system\layers\base))], received=true, suspected=false]

           

          It seems the remote handler has some kind of issue with jboss classloaders and have no visibility to my module???

           

          Greetings

          Juan Arias

          • 17. Re: Listening to membership changes in AS7
            belaban

            Note that on JBoss 7 (*if* you have JGroups 3.4.0.Alpha2 or later), you can simply reuse the channel exposed by Infinispan/JBossAS: [1]. You could then create the RpcDispatcher on top of the ForkChannel.

             

            Same as the MuxRpcDispatcher, but you can also add protocols if you want to, dynamically.

             

            [1] http://belaban.blogspot.ch/2013/08/how-to-hijack-jgroups-channel-inside.html

            • 18. Re: Listening to membership changes in AS7
              hwangarias

              Thank you bela,

              yes it's interesting and I see the advantages

               

              Anyway, I have finished my testing and resolved my own issue... so in order someone else might find this interesting, these are my conclusions:

               

              To use JGroups in Jboss you can:

               

              It does not matter which way you follow, you will advertise that, in jboss, when you use the RpcDispatcher using such channel the classloader used to deserialize the content is the jgroups one, so you'll get ClassNotFoundException's, or even using MethodCall you won't be able to locate the method you want to call (in case of rpc)

               

              The final solution is install a custom Marshaller in the dispatcher to allow specify the classloader we want to use in target node, as the guys of teiid do:

              http://grepcode.com/file/repository.jboss.org/nexus/content/repositories/releases/org.jboss.teiid/teiid-runtime/8.2.0.Final/org/teiid/replication/jgroups/JGroupsObjectReplicator.java#JGroupsObjectReplicator.ContextAwareMarshaller

               

              With all this, you can implement your own RPC calls among JBoss 7 cluster nodes

               

              Greetings and thanks to Paul an Bela

              Juan Arias

              • 19. Re: Listening to membership changes in AS7
                pferraro

                Unfortunately, the RpcDispatcher in JGroups doesn't play well with AS7/Wildfly's modular class loader, hence the CNFE.

                To get this to work you'll have to override the marshaller used by the RequestCorrelator.  Something like:

                 

                {code}

                        dispatcher = new RpcDispatcher(...) {

                            @Override

                            protected RequestCorrelator createRequestCorrelator(Protocol transport, RequestHandler handler, Address localAddr) {

                                RequestCorrelator correlator = super.createRequestCorrelator(transport, handler, localAddr);

                                correlator.setMarshaller(...);

                                return correlator;

                            }

                        };

                {code}

                 

                The marshaller's serialization and deserialization methods will need to resolve classes using AS7's ModuleLoader.

                 

                In Wildfly I've introduced a new CommandDispatcher to greatly simplify the process of performing cluster wide operations.  It is similar to the RpcDispatcher in JGroups, but uses a command pattern rather than reflection, and plays nice with your deployment's classloader.

                https://github.com/wildfly/wildfly/tree/master/clustering/api/src/main/java/org/wildfly/clustering/dispatcher

                 

                Here's some sample usage:

                 

                {code}

                @Resource(lookup = "java:jboss/clustering/dispatcher/mycluster")

                private CommandDispatcherFactory factory;

                private CommandDispatcher<C> dispatcher;

                 

                @PostConstruct

                public void init() {

                    ServiceName name = ...;

                    C context = ...;

                    this.dispatcher = this.factory.createCommandDispatcher(name, context);

                }

                 

                public void execute() {

                   Command<R, C> command = ...;

                   Map<Node, CommandResponse<R>> responses = this.dispatcher.executeOnCluster(command);

                   // process responses

                }

                 

                @PreDestroy

                public void destroy() {

                    this.dispatcher.close();

                }

                {code}

                 

                To use, your application will need to add a single dependency on the wildfly-clustering-api module.

                The command dispatcher will not interfere with other users of the underlying channel (e.g. Infinispan, other dispatchers, etc.).

                1 2 Previous Next