16 Replies Latest reply on Jan 21, 2017 10:51 PM by seto

    Question about distributed execution thread.

    seto

      1. Use DefaultExecutorService for distributed execution.

      DefaultExecutorService des = new DefaultExecutorService(idCache);
      for (int i = 0; i < 5; i++) {
        des.submitEverywhere(new TestCallable());
      }
      

      On the node that calls submitEverywhere, the callables run one after another, in order, on the same thread. I don't think that is the expected behavior.

      callable Thread[pool-6-thread-1,5,main]

      Cache 'id-cache'@DESKTOP-7SRGBKP-9787

      id-cache

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      callable Thread[pool-6-thread-1,5,main]

      Cache 'id-cache'@DESKTOP-7SRGBKP-9787

      id-cache

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      callable Thread[pool-6-thread-1,5,main]

      Cache 'id-cache'@DESKTOP-7SRGBKP-9787

      id-cache

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      callable Thread[pool-6-thread-1,5,main]

      Cache 'id-cache'@DESKTOP-7SRGBKP-9787

      id-cache

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      callable Thread[pool-6-thread-1,5,main]

      Cache 'id-cache'@DESKTOP-7SRGBKP-9787

      id-cache

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      On the node that receives the callables, they run concurrently on multiple threads. This is expected.

      callable Thread[remote-thread--p2-t5,5,main]

      Cache 'id-cache'@DESKTOP-7SRGBKP-31706

      id-cache

      callable Thread[remote-thread--p2-t2,5,main]

      Cache 'id-cache'@DESKTOP-7SRGBKP-31706

      id-cache

      callable Thread[remote-thread--p2-t3,5,main]

      Cache 'id-cache'@DESKTOP-7SRGBKP-31706

      id-cache

      callable Thread[remote-thread--p2-t1,5,main]

      Cache 'id-cache'@DESKTOP-7SRGBKP-31706

      id-cache

      callable Thread[remote-thread--p2-t4,5,main]

      Cache 'id-cache'@DESKTOP-7SRGBKP-31706

      id-cache

      ccc,eee

      bbb,bbb

      eee,[eee]

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      EMPTY

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      ccc,eee

      bbb,bbb

      eee,[eee]

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      EMPTY

      2. Use ClusterExecutor for distributed execution.

      for (int i = 0; i < 5; i++) {
         idCache.getCacheManager().executor().submitConsumer(localManager -> {
        System.out.println("runnable " + Thread.currentThread());
         output(localManager.getCache("id-cache"));
         System.out.println(((TestData) localManager.getCache("id-cache").get("ccc")).getState());
         return null;
         }, null);
      }
      

      Both nodes behave as expected.

      On the node that submits the tasks, they run concurrently on multiple threads, which seems to be expected. However, remote-thread--p2-t1 appears twice.

      runnable Thread[remote-thread--p2-t1,5,main]

      id-cache

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      runnable Thread[remote-thread--p2-t1,5,main]

      id-cache

      runnable Thread[remote-thread--p2-t2,5,main]

      id-cache

      runnable Thread[remote-thread--p2-t3,5,main]

      id-cache

      runnable Thread[remote-thread--p2-t4,5,main]

      id-cache

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      On the node that receives the tasks, they run concurrently on multiple threads. This is expected.

      runnable Thread[remote-thread--p2-t1,5,main]

      id-cache

      runnable Thread[remote-thread--p2-t8,5,main]

      runnable Thread[remote-thread--p2-t9,5,main]

      id-cache

      runnable Thread[remote-thread--p2-t6,5,main]

      id-cache

      runnable Thread[remote-thread--p2-t7,5,main]

      id-cache

      id-cache

      ccc,eee

      bbb,bbb

      eee,[eee]

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      EMPTY

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

      ccc,eee

      bbb,bbb

      eee,[eee]

      EMPTY

        • 1. Re: Question about distributed execution thread.
          seto

          GitHub - SetoKaiba/DistributedExecutionCDI

          There's a reproducer project here. It also contains the reproducer for the thread "Question about distributed execution".

          • 2. Re: Question about distributed execution thread.
            william.burns

            Hello Seto,

             

            By default, the DefaultExecutorService uses a single-threaded pool when it is created with the single-argument constructor. If you check the API, you will see that it lets you provide your own thread pool when creating the executor. If you want, you can pass in the remote-command ExecutorService retrieved from the cache, so that the same named thread pool is used on each node.

             

            DefaultExecutorService des = new DefaultExecutorService(testCache, testCache.getAdvancedCache().getComponentRegistry().getComponent(ExecutorService.class, KnownComponentNames.REMOTE_COMMAND_EXECUTOR));
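
The difference between the two nodes can be reproduced with plain JDK executors. The sketch below is not Infinispan code: `ThreadSpreadDemo` and `distinctThreads` are hypothetical names, and it only assumes, as the reply above describes, that the single-argument constructor behaves like a single-threaded executor. It counts how many distinct threads end up running five tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadSpreadDemo {

    // Submits `tasks` jobs, waits for them, and returns how many distinct threads ran them.
    static int distinctThreads(ExecutorService executor, int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        List<Future<?>> futures = new ArrayList<>();
        try {
            for (int i = 0; i < tasks; i++) {
                futures.add(executor.submit(() -> {
                    names.add(Thread.currentThread().getName());
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait until the task has finished
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return names.size();
    }

    public static void main(String[] args) {
        // Stand-in for the default, single-threaded local executor
        System.out.println("single thread: "
                + distinctThreads(Executors.newSingleThreadExecutor(), 5));
        // Stand-in for a shared pool with several worker threads
        System.out.println("pool of five:  "
                + distinctThreads(Executors.newFixedThreadPool(5), 5));
    }
}
```

With a single worker, every task reports the same thread name, which matches the repeated `pool-6-thread-1` lines in the original output.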
            
            • 3. Re: Question about distributed execution thread.
              seto

              Thank you for clarifying. I did notice that there's a parameter for specifying the ExecutorService.

              But I thought the default one would behave like the other nodes and use the remote executor, because the class implements DistributedExecutorService. The DefaultExecutorService does get a thread from a pool rather than using the main thread, but it always gets the same thread.

              That's why I thought it was a bug.

              • 4. Re: Question about distributed execution thread.
                seto

                Also, is there a way to specify the thread count for the remote command executor?

                • 5. Re: Question about distributed execution thread.
                  william.burns

                  You can specify the thread executor parameters in XML or through Java, depending on how you currently configure it.

                   

                  If you are using XML, you have to create a named thread factory and then associate it with the container. An example can be found at https://github.com/infinispan/infinispan/blob/master/core/src/test/resources/configs/named-cache-test.xml#L30

                   

                  If you are using Java configuration, you can use the configuration builders for the transport; see TransportConfigurationBuilder.java in the infinispan/infinispan repository on GitHub.

                  • 6. Re: Question about distributed execution thread.
                    seto

                    Thank you. Do you mean creating a thread pool factory rather than a thread factory?

                    ThreadPoolConfigurationBuilder has both threadFactory(ThreadFactory threadFactory) and threadPoolFactory(ThreadPoolExecutorFactory threadPoolFactory).

                    I think you mean threadPoolFactory.
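
The distinction asked about here can be illustrated with the JDK alone. This is a sketch under assumptions, not Infinispan's `ThreadPoolExecutorFactory`: `PoolSizingDemo`, `namedFactory`, `boundedPool`, and the numbers used are hypothetical. It shows that a thread factory only decides how each thread is created (for example its name), while the thread count is a property of the pool itself.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSizingDemo {

    // A thread factory controls how a thread is created, here only its name.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> new Thread(runnable, prefix + "-t" + counter.getAndIncrement());
    }

    // The pool controls how many threads may exist and how much work may queue up.
    static ThreadPoolExecutor boundedPool(int coreThreads, int maxThreads, int queueLength,
                                          ThreadFactory factory) {
        return new ThreadPoolExecutor(coreThreads, maxThreads, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueLength), factory);
    }

    public static void main(String[] args) {
        // Illustrative values only: 2 core threads, 8 max threads, queue of 100
        ThreadPoolExecutor pool = boundedPool(2, 8, 100, namedFactory("remote-thread"));
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        System.out.println("max threads: " + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

In this model, changing the thread count means changing the arguments passed to `boundedPool`, not the thread factory.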

                    • 7. Re: Question about distributed execution thread.
                      sebastian.laskawiec

                      I think Will answered all your doubts, Seto!

                       

                      Just a side note: yes, not injecting the Cache into Runnables in CDI is a bug. I filed a JIRA for this: ISPN-7363. Perhaps you'd like to contribute to the Infinispan code base and fix it? It seems pretty easy.

                      • 8. Re: Question about distributed execution thread.
                        seto

                        Hi. It may not be easy to fix for Runnable.

                        https://github.com/infinispan/infinispan/blob/c1ba79865a756d02a622979fb0480939a46cdbdf/core/src/main/java/org/infinispan/distexec/DefaultExecutorService.java#L1103-L1124

                        https://github.com/infinispan/infinispan/blob/a535b9623504bc5ec88584130065c89f1f9f357e/cdi/embedded/src/main/java/org/infinispan/cdi/embedded/DelegatingDistributedTaskLifecycle.java#L42-L73

                        The RunnableAdapter is private.

                        I don't have an Infinispan development environment set up, so I checked the code directly by searching on GitHub.

                        I found that the CDI context is injected in the code at the second link. But a Runnable is wrapped in RunnableAdapter, a private class that implements Callable.
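
The problem described here can be sketched without Infinispan or CDI. Everything below is hypothetical (`AdapterVisibilityDemo`, `CacheAwareTask`, the simplified `RunnableAdapter`, and `hookSeesUserTask`); it only models the idea that a lifecycle hook receiving the wrapping Callable cannot see the user's Runnable hidden inside a private adapter.

```java
import java.util.concurrent.Callable;

public class AdapterVisibilityDemo {

    // Stand-in for a user task that a container would like to inject a cache into.
    static class CacheAwareTask implements Runnable {
        Object cache; // hypothetical injection target

        @Override
        public void run() {
            // user logic would go here
        }
    }

    // Hypothetical adapter in the spirit of a private RunnableAdapter.
    private static final class RunnableAdapter implements Callable<Void> {
        private final Runnable task; // not reachable from outside the adapter

        RunnableAdapter(Runnable task) {
            this.task = task;
        }

        @Override
        public Void call() {
            task.run();
            return null;
        }
    }

    static Callable<Void> adapt(Runnable task) {
        return new RunnableAdapter(task);
    }

    // A hook that only receives the Callable cannot tell that a CacheAwareTask is inside.
    static boolean hookSeesUserTask(Callable<?> submitted) {
        return submitted instanceof CacheAwareTask;
    }

    public static void main(String[] args) {
        Callable<Void> wrapped = adapt(new CacheAwareTask());
        System.out.println("hook sees user task: " + hookSeesUserTask(wrapped));
    }
}
```

Under this model, a fix would need the adapter to expose its wrapped task or the hook to unwrap it, which is why a private adapter makes the change less trivial.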

                        • 9. Re: Question about distributed execution thread.
                          seto

                          William Burns wrote:

                           

                          Hello Seto,

                           

                          By default, the DefaultExecutorService uses a single-threaded pool when it is created with the single-argument constructor. If you check the API, you will see that it lets you provide your own thread pool when creating the executor. If you want, you can pass in the remote-command ExecutorService retrieved from the cache, so that the same named thread pool is used on each node.

                           

                          DefaultExecutorService des = new DefaultExecutorService(testCache, testCache.getAdvancedCache().getComponentRegistry().getComponent(ExecutorService.class, KnownComponentNames.REMOTE_COMMAND_EXECUTOR));

                          Hi. One small problem: the remote command executor is shut down when there is only a single node. I will start with a single node, and a second node will join a moment later.

                          How can I make this work both with a single node and with multiple nodes?

                          Exception in thread "main" java.lang.IllegalArgumentException: Can not use an instance of ExecutorService which is shutdown

                          UPDATED: The remote command executor is a LazyInitializingBlockingTaskAwareExecutorService, so I have to run an empty Runnable to initialize it.

                          Is this the only way, and is it the best way?

                          ExecutorService executorService = idCache.getAdvancedCache().getComponentRegistry().getComponent(ExecutorService.class, KnownComponentNames.REMOTE_COMMAND_EXECUTOR);
                          executorService.execute(() -> {
                          });
                          des = new DefaultExecutorService(idCache, executorService);
                          
                          • 10. Re: Question about distributed execution thread.
                            william.burns

                            I just saw your update.  But before that I wrote a simple class that creates a dist cache and then immediately grabs the executor.

                             

                            public static void main(String[] args) {
                               // Clustered cache manager using the default transport
                               GlobalConfiguration globalConfig = new GlobalConfigurationBuilder().transport()
                                     .defaultTransport()
                                     .build();
                               // Distributed, synchronous cache configuration
                               Configuration config = new ConfigurationBuilder()
                                     .clustering()
                                        .cacheMode(CacheMode.DIST_SYNC)
                                     .build();

                               EmbeddedCacheManager manager = new DefaultCacheManager(globalConfig, config);

                               Cache<Object, Object> cache = manager.getCache("default");

                               // Reuse the cache's remote command executor for local execution
                               DefaultExecutorService des = new DefaultExecutorService(cache,
                                     cache.getAdvancedCache().getComponentRegistry().getComponent(ExecutorService.class, KnownComponentNames.REMOTE_COMMAND_EXECUTOR));

                               des.submit(() -> System.out.println("Worked"));
                            }
                            
                            
                            
                            

                             

                            I wasn't able to reproduce the exception you are getting, but this may be because I am using master.  Are you using an older version?

                             

                            It seems like a bug to me that LazyInitializingBlockingTaskAwareExecutorService says it is shut down when it has yet to be initialized. You can log an issue for that, but in the meantime you at least have a workaround.
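
The behaviour discussed here can be modelled in a few lines. The sketch below is a hypothetical, heavily simplified stand-in (`LazyExecutorDemo.LazyExecutor`), not the real LazyInitializingBlockingTaskAwareExecutorService. It assumes, based on the description above, that "not yet initialized" is reported the same way as "shut down", and shows why submitting an empty task changes the answer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LazyExecutorDemo {

    // Hypothetical model of an executor whose real pool is created on first use.
    static class LazyExecutor {
        private ExecutorService delegate; // null until the first task arrives

        synchronized void execute(Runnable task) {
            if (delegate == null) {
                delegate = Executors.newFixedThreadPool(2); // lazy initialization
            }
            delegate.execute(task);
        }

        // Assumed behaviour: an uninitialized executor also reports "shut down".
        synchronized boolean isShutdown() {
            return delegate == null || delegate.isShutdown();
        }

        synchronized void shutdown() {
            if (delegate != null) {
                delegate.shutdown();
            }
        }
    }

    public static void main(String[] args) {
        LazyExecutor executor = new LazyExecutor();
        System.out.println("before first task: isShutdown=" + executor.isShutdown());
        executor.execute(() -> { }); // the empty-task workaround forces initialization
        System.out.println("after first task:  isShutdown=" + executor.isShutdown());
        executor.shutdown();
    }
}
```

A caller that rejects executors for which `isShutdown()` returns true would refuse this executor until something has run on it, which is the situation the empty-Runnable workaround addresses.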

                            • 11. Re: Question about distributed execution thread.
                              seto

                              Exception in thread "main" java.lang.IllegalArgumentException: Can not use an instance of ExecutorService which is shutdown

                              I tried your code. It still reports this.

                              Maybe it's a bug in 8.2.5.Final?

                              I took a look at the master branch.

                              infinispan/DefaultExecutorService.java at master · infinispan/infinispan · GitHub

                              The if check is still there.

                              infinispan/LazyInitializingBlockingTaskAwareExecutorService.java at master · infinispan/infinispan · GitHub

                              LazyInitializingBlockingTaskAwareExecutorService has not been modified either.

                               

                              Maybe something else does the init trick, just like I do by submitting an empty task?

                              • 12. Re: Question about distributed execution thread.
                                seto

                                I tried 9.0.0.Beta1. I can't even bootstrap with your code.

                                2017-01-19 14:20:01,471 [main] WARN org.infinispan.factories.GlobalComponentRegistry - ISPN000189: While stopping a cache or cache manager, one of its components failed to stop
                                org.infinispan.commons.CacheException: Unable to invoke method public void org.infinispan.topology.LocalTopologyManagerImpl.stop() on object of type LocalTopologyManagerImpl
                                  at org.infinispan.commons.util.ReflectionUtil.invokeAccessibly(ReflectionUtil.java:172) ~[infinispan-embedded-9.0.0.Beta1.jar:9.0.0.Beta1]
                                  at org.infinispan.factories.AbstractComponentRegistry$PrioritizedMethod.invoke(AbstractComponentRegistry.java:867) ~[infinispan-embedded-9.0.0.Beta1.jar:9.0.0.Beta1]
                                  at org.infinispan.factories.AbstractComponentRegistry.internalStop(AbstractComponentRegistry.java:669) [infinispan-embedded-9.0.0.Beta1.jar:9.0.0.Beta1]
                                  at org.infinispan.factories.AbstractComponentRegistry.stop(AbstractComponentRegistry.java:569) [infinispan-embedded-9.0.0.Beta1.jar:9.0.0.Beta1]
                                  at org.infinispan.factories.GlobalComponentRegistry.start(GlobalComponentRegistry.java:256) [infinispan-embedded-9.0.0.Beta1.jar:9.0.0.Beta1]
                                  at org.infinispan.manager.DefaultCacheManager.start(DefaultCacheManager.java:667) [infinispan-embedded-9.0.0.Beta1.jar:9.0.0.Beta1]
                                  at org.infinispan.manager.DefaultCacheManager.<init>(DefaultCacheManager.java:235) [infinispan-embedded-9.0.0.Beta1.jar:9.0.0.Beta1]
                                  at org.infinispan.manager.DefaultCacheManager.<init>(DefaultCacheManager.java:212) [infinispan-embedded-9.0.0.Beta1.jar:9.0.0.Beta1]
                                  at co.kaiba.blueeyes.impl.kernel.Kernel.main(Kernel.java:62) [main/:?]
                                  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
                                  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
                                  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
                                  at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
                                  at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147) [idea_rt.jar:?]
                                Caused by: java.lang.NullPointerException
                                  at java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936) ~[?:1.8.0_112]
                                  at org.infinispan.topology.PersistentUUIDManagerImpl.removePersistentAddressMapping(PersistentUUIDManagerImpl.java:42) ~[infinispan-embedded-9.0.0.Beta1.jar:9.0.0.Beta1]
                                  at org.infinispan.topology.LocalTopologyManagerImpl.stop(LocalTopologyManagerImpl.java:119) ~[infinispan-embedded-9.0.0.Beta1.jar:9.0.0.Beta1]
                                  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
                                  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
                                  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
                                  at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
                                  at org.infinispan.commons.util.ReflectionUtil.invokeAccessibly(ReflectionUtil.java:168) ~[infinispan-embedded-9.0.0.Beta1.jar:9.0.0.Beta1]
                                  ... 13 more

                                Exception in thread "main" org.infinispan.manager.EmbeddedCacheManagerStartupException: org.infinispan.commons.CacheConfigurationException: ISPN000085: Error while trying to create a channel using the specified configuration file: default-configs/default-jgroups-udp.xml
                                  at org.infinispan.factories.GlobalComponentRegistry.start(GlobalComponentRegistry.java:252)
                                  at org.infinispan.manager.DefaultCacheManager.start(DefaultCacheManager.java:667)
                                  at org.infinispan.manager.DefaultCacheManager.<init>(DefaultCacheManager.java:235)
                                  at org.infinispan.manager.DefaultCacheManager.<init>(DefaultCacheManager.java:212)
                                  at co.kaiba.blueeyes.impl.kernel.Kernel.main(Kernel.java:62)
                                  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                                  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                                  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                                  at java.lang.reflect.Method.invoke(Method.java:498)
                                  at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
                                Caused by: org.infinispan.commons.CacheConfigurationException: ISPN000085: Error while trying to create a channel using the specified configuration file: default-configs/default-jgroups-udp.xml
                                  at org.infinispan.remoting.transport.jgroups.JGroupsTransport.buildChannel(JGroupsTransport.java:449)
                                  at org.infinispan.remoting.transport.jgroups.JGroupsTransport.initChannel(JGroupsTransport.java:326)
                                  at org.infinispan.remoting.transport.jgroups.JGroupsTransport.initChannelAndRPCDispatcher(JGroupsTransport.java:367)
                                  at org.infinispan.remoting.transport.jgroups.JGroupsTransport.start(JGroupsTransport.java:201)
                                  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                                  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                                  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                                  at java.lang.reflect.Method.invoke(Method.java:498)
                                  at org.infinispan.commons.util.ReflectionUtil.invokeAccessibly(ReflectionUtil.java:168)
                                  at org.infinispan.factories.AbstractComponentRegistry$PrioritizedMethod.invoke(AbstractComponentRegistry.java:867)
                                  at org.infinispan.factories.AbstractComponentRegistry.invokeStartMethods(AbstractComponentRegistry.java:633)
                                  at org.infinispan.factories.AbstractComponentRegistry.internalStart(AbstractComponentRegistry.java:622)
                                  at org.infinispan.factories.AbstractComponentRegistry.start(AbstractComponentRegistry.java:547)
                                  at org.infinispan.factories.GlobalComponentRegistry.start(GlobalComponentRegistry.java:239)
                                  ... 9 more
                                Caused by: java.lang.Exception: found IPv4 multicast address /228.6.7.8 in an IPv6 stack
                                  at org.jgroups.stack.Configurator.setupProtocolStack(Configurator.java:103)
                                  at org.jgroups.stack.Configurator.setupProtocolStack(Configurator.java:49)
                                  at org.jgroups.stack.ProtocolStack.setup(ProtocolStack.java:474)
                                  at org.jgroups.JChannel.init(JChannel.java:965)
                                  at org.jgroups.JChannel.<init>(JChannel.java:148)
                                  at org.jgroups.JChannel.<init>(JChannel.java:122)
                                  at org.infinispan.remoting.transport.jgroups.JGroupsTransport.buildChannel(JGroupsTransport.java:447)
                                  ... 22 more

                                • 13. Re: Question about distributed execution thread.
                                  seto

                                  I wanted to set a breakpoint in initIfNeeded to see what initializes it.

                                  But no luck: I can't run 9.0.0.Beta1.

                                  Did you test your code with a single node only?

                                  • 14. Re: Question about distributed execution thread.
                                    seto

                                    I have to use -Djava.net.preferIPv4Stack=true to run with 9.0.0.Beta1.

                                    But I don't need it with 8.2.5.Final.

                                    Is that a bug?
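
For reference, the address named in the exception (228.6.7.8) can be checked with the JDK alone. This is only a diagnostic sketch with hypothetical names (`MulticastAddressCheck`, `isIpv4Multicast`); it does not involve JGroups or explain why the two versions behave differently. It confirms that the address is an IPv4 multicast address and prints whether the `java.net.preferIPv4Stack` system property is set for the running JVM.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;

public class MulticastAddressCheck {

    // Returns true if the literal parses as an IPv4 address in the multicast range.
    static boolean isIpv4Multicast(String literal) {
        try {
            InetAddress address = InetAddress.getByName(literal);
            return address instanceof Inet4Address && address.isMulticastAddress();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("not a valid address: " + literal, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("228.6.7.8 is IPv4 multicast: " + isIpv4Multicast("228.6.7.8"));
        // True only when the JVM was started with -Djava.net.preferIPv4Stack=true
        System.out.println("java.net.preferIPv4Stack="
                + Boolean.getBoolean("java.net.preferIPv4Stack"));
    }
}
```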

                                     

                                    infinispan/CustomRequestCorrelator.java at master · infinispan/infinispan · GitHub

                                    This class calls the remote executor, which is why it works in 9.0.0.Beta1.
