12 Replies Latest reply on Jan 21, 2017 10:54 PM by seto

    Question about distributed execution.

    seto

      There are two approaches to distributed execution in Infinispan.

      One is the ClusterExecutor. The other is DefaultExecutorService.

      It seems that only the latter supports CDI.

      But the latter seems to lack the ability to filter target nodes for submitEverywhere.

      Or am I missing something?

        • 1. Re: Question about distributed execution.
          sebastian.laskawiec

          That is correct. CDI doesn't support ClusterExecutor (I created ISPN-7357 to implement this).

          However, if you want to use ClusterExecutor, just use:

          @Inject
          EmbeddedCacheManager ecm;
          ...
          ecm.executor().submit(myRunnable);

          • 2. Re: Question about distributed execution.
            seto

            What about the ability to filter target nodes, as ClusterExecutor does?

            • 3. Re: Question about distributed execution.
              seto


              Not exactly. Maybe you misunderstood me. What I mean is that the runnable below does not work with CDI.

              private static class TestRunnable implements Runnable, Serializable {
                  // Expected to be injected by CDI, but it stays null when this
                  // runnable is created with `new` and submitted through ClusterExecutor.
                  @Inject
                  @IdCache
                  Cache<String, Object> idCache;

                  @Override
                  public void run() {
                      System.out.println("runnable");
                      System.out.println(idCache);
                      output(idCache);
                      System.out.println(((TestData) idCache.get("ccc")).getState());
                  }
              }

              Here's the approach you mentioned:

              @Inject
              EmbeddedCacheManager ecm;

              Neither of the calls below works with CDI; idCache is null in both cases.

              idCache.getCacheManager().executor().submit(new TestRunnable());
              ecm.executor().submit(new TestRunnable());

              Instead, I have to use a SerializableFunction that receives the EmbeddedCacheManager and get idCache from it, like this:

              ecm.executor().submitConsumer(localManager -> {
                  output(localManager.getCache("id-cache"));
                  System.out.println(((TestData) localManager.getCache("id-cache").get("ccc")).getState());
                  return null;
              }, null);
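              The workaround works because the function itself is serialized, sent to each node, and only then handed the EmbeddedCacheManager of the node it runs on, so nothing has to be injected into the task object. Below is a minimal plain-Java sketch of that pattern with no Infinispan dependency: a Map of maps is a hypothetical stand-in for the receiving node's cache manager, the serialization round trip stands in for the network hop, and NodeSideLookup, ship, and the "id-cache" contents are all illustrative names, not Infinispan API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.*;
import java.util.function.Function;

class NodeSideLookup {
    // Hypothetical serializable function type (plays the role of a SerializableFunction).
    interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}

    // Simulates the network hop: serialize the task, then deserialize a copy of it.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T ship(T task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(task);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("task could not be shipped", e);
        }
    }

    // The task captures no state; it looks the cache up from whatever
    // "manager" (cache name -> cache contents) the receiving side passes in.
    static Object readState(Map<String, Map<String, Object>> localManager) {
        SerializableFunction<Map<String, Map<String, Object>>, Object> task =
                manager -> manager.get("id-cache").get("ccc");
        SerializableFunction<Map<String, Map<String, Object>>, Object> received = ship(task);
        return received.apply(localManager);
    }
}
```

              Because the lookup happens inside the function, whatever caches exist on the receiving side are used, which mirrors why the submitConsumer version sees a non-null cache.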
              
              • 4. Re: Question about distributed execution.
                sebastian.laskawiec

                May I ask you to create a small reproducer project and put it on GitHub? I would like to look into this issue.

                • 5. Re: Question about distributed execution.
                  seto

                  GitHub - SetoKaiba/DistributedExecutionCDI

                  Hi. Here it is.

                  Just run Test or org.jboss.weld.environment.se.StartMain.

                  There are comments noting in which cases CDI works and in which it doesn't.

                  • 6. Re: Question about distributed execution.
                    seto

                    Question about distributed execution thread.

                    Could you please have a look at the threading issue with DefaultExecutorService?

                    On the node that calls submitEverywhere, the tasks run one by one in the same thread.

                    Only the other nodes run them concurrently in multiple threads.

                    ClusterExecutor does not have this issue.

                    The reproducer is in the GitHub project as well.
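                    The difference described here is between tasks that share one thread and tasks that run at the same time. The following plain java.util.concurrent sketch (not Infinispan code) tells the two apart: each task counts down a shared latch and then waits for the others, which only succeeds when all tasks are running at once. ConcurrencyProbe and the one-second timeout are hypothetical choices for illustration.

```java
import java.util.*;
import java.util.concurrent.*;

class ConcurrencyProbe {
    // Returns true only if all `tasks` were running on `executor` at the same time.
    static boolean ranConcurrently(ExecutorService executor, int tasks) {
        CountDownLatch started = new CountDownLatch(tasks);
        List<Callable<Boolean>> work = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            work.add(() -> {
                started.countDown();
                // On a single thread, earlier tasks wait for tasks that have not
                // started yet, so their await() times out and returns false.
                return started.await(1, TimeUnit.SECONDS);
            });
        }
        boolean all = true;
        try {
            for (Future<Boolean> result : executor.invokeAll(work)) {
                all &= result.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        }
        return all;
    }
}
```

                    With a fixed pool of three threads, three tasks all report true; with a single-thread executor, the first two time out, so the probe reports false.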

                    • 7. Re: Question about distributed execution.
                      william.burns

                      Hello Seto,

                      To answer your question about filtering for DistributedExecutor versus ClusterExecutor, it is important to understand the purpose of each.

                      The DistributedExecutor is for executing code across the cluster that is tied to specific data. Note how you can supply a given set of keys when you call it. This automatically filters which nodes run the command, as only the nodes that primarily own that data will run it. This is also why a DistributedExecutor is created for each cache independently.
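                      The key-based filtering above can be modeled as grouping the requested keys by their primary owner: only nodes that own at least one key receive the task. This is a simplified plain-Java sketch that assumes a hypothetical owner function (hashCode modulo the node count); Infinispan's real consistent hash with segments is more involved, so treat it only as a model of the idea. KeyRouting and the node names are hypothetical.

```java
import java.util.*;

class KeyRouting {
    // Hypothetical primary-owner function: NOT Infinispan's consistent hash.
    static String primaryOwner(Object key, List<String> nodes) {
        return nodes.get(Math.floorMod(key.hashCode(), nodes.size()));
    }

    // Groups the requested keys by primary owner; only these nodes would run the task.
    static Map<String, Set<Object>> targetsFor(Collection<?> keys, List<String> nodes) {
        Map<String, Set<Object>> byOwner = new TreeMap<>();
        for (Object key : keys) {
            byOwner.computeIfAbsent(primaryOwner(key, nodes), n -> new HashSet<>()).add(key);
        }
        return byOwner;
    }
}
```

                      With nodes A, B, C and integer keys 0, 3, 1, node A gets {0, 3}, node B gets {1}, and node C runs nothing.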

                       

                      The ClusterExecutor is a brand-new API designed solely for executing code on specific nodes. That is why it has an API specifically for filtering at the node level. It doesn't provide the ability to do anything with the data itself. Note how the ClusterExecutor is available directly from the CacheManager: if the CacheManager is clustered, it automatically lets you run commands across the cluster irrespective of caches.
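                      Node-level filtering amounts to a predicate over cluster members; ClusterExecutor's filterTargets accepts a predicate over member addresses. The plain-Java sketch below is not Infinispan code: NodeInfo, its site and load fields, and NodeFilter are hypothetical, and the predicate combines a site check with a load threshold, the kind of self-chosen criterion (such as load balancing) discussed in this thread.

```java
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class NodeFilter {
    // Hypothetical node metadata; a real cluster would identify members by Address.
    static final class NodeInfo {
        final String address;
        final String site;
        final double load; // assumed 0.0..1.0 load figure reported by the node

        NodeInfo(String address, String site, double load) {
            this.address = address;
            this.site = site;
            this.load = load;
        }
    }

    // Keeps only the members accepted by the predicate, in membership order.
    static List<String> filterTargets(List<NodeInfo> members, Predicate<NodeInfo> accept) {
        return members.stream()
                .filter(accept)
                .map(n -> n.address)
                .collect(Collectors.toList());
    }
}
```

                      For members node-1 (east, 0.9), node-2 (east, 0.2) and node-3 (west, 0.1), the predicate "site is east and load below 0.5" selects only node-2.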

                       

                      Thus, if you want to run something not tied to data, you should use the ClusterExecutor. If you do want to run something with data in mind, you should look at the CacheStream API. Unfortunately, DistributedExecutor is in an awkward spot: most things it does can be performed better with ClusterExecutor or CacheStream (distributed streams), but we aren't sure yet whether we can remove it.

                      • 8. Re: Question about distributed execution.
                        seto

                        I do know the difference between DistributedExecutorService and ClusterExecutor.

                        I thought ClusterExecutor was what I wanted, but it does not support CDI yet.

                        I also looked at CacheStream before. It's more like MapReduce, used to process similar data concurrently.

                        Maybe it's not what I'm looking for.

                        I'm not sure whether I understand it correctly.

                        Also, I'm using Infinispan as the data layer of my server. As you may have noticed, I'm using a cache called idCache.

                        It stores the objects, and an object may reference other objects by id through a Serializable reference class.

                        So I may not know up front which keys I will use.

                        Instead, I want to choose the nodes myself, by load balancing or other criteria I can judge myself.

                        That's why I'm asking for the filtering feature for DistributedExecutorService.

                        • 9. Re: Question about distributed execution.
                          seto

                          DistributedExecutorService does provide methods to run on a specific node, but there's no way to run on a chosen set of nodes:

                          <T> CompletableFuture<T> submit(Address target, Callable<T> task)

                          Submits the given Callable task for execution on the specified target Infinispan node.

                          <T> CompletableFuture<T> submit(Address target, DistributedTask<T> task)

                          Submits the given DistributedTask for execution on the specified target Infinispan node.

                          • 10. Re: Question about distributed execution.
                            seto

                            Also, please don't remove DistributedExecutorService. Choosing nodes by keys is still a feature users want.

                            Anyway, I would still like DistributedExecutorService to be able to run on specific nodes.

                            • 11. Re: Question about distributed execution.
                              william.burns

                              Seto Kaiba wrote:

                              Also, please don't remove DistributedExecutorService. Choosing nodes by keys is still a feature users want.

                              Anyway, I would still like DistributedExecutorService to be able to run on specific nodes.

                              This is the main benefit of ClusterExecutor. Is there a reason you can't use it for this purpose? I am just trying to understand how users want to use it. Is it because DistributedExecutorService implements the ExecutorService interface?

                              Also, there is nothing stopping you from calling DistributedExecutorService.submit(Address, Callable) for each node.
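                              Calling submit(Address, Callable) once per node can be sketched as a fan-out that collects one future per target and waits for all of them. To keep the example self-contained, TargetedExecutor is a hypothetical interface that only mirrors the shape of submit(Address target, Callable<T> task) using a String address, and LocalTargetedExecutor is an in-process fake; neither is Infinispan API.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical stand-in mirroring submit(Address target, Callable<T> task).
interface TargetedExecutor {
    <T> CompletableFuture<T> submit(String target, Callable<T> task);
}

// In-process fake: runs each task on a local pool and ignores the target address.
class LocalTargetedExecutor implements TargetedExecutor {
    private final ExecutorService pool;

    LocalTargetedExecutor(ExecutorService pool) {
        this.pool = pool;
    }

    @Override
    public <T> CompletableFuture<T> submit(String target, Callable<T> task) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return task.call();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, pool);
    }
}

class FanOut {
    // Submits the same task to each chosen node and gathers the results by node.
    static <T> Map<String, T> submitToAll(TargetedExecutor executor, List<String> targets, Callable<T> task) {
        Map<String, CompletableFuture<T>> futures = new LinkedHashMap<>();
        for (String target : targets) {
            futures.put(target, executor.submit(target, task));
        }
        // Wait for every node; join() rethrows a failure as CompletionException.
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0])).join();
        Map<String, T> results = new LinkedHashMap<>();
        futures.forEach((target, f) -> results.put(target, f.join()));
        return results;
    }
}
```

                              The target list could come from any selection logic, for example a load-based filter chosen by the application.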

                               

                              Also, CacheStream is more than just map/reduce. For example, you can do distributed execution with at-least-once semantics on a specific set of keys with the following:

                              cache.entrySet().parallelStream().filterKeys(keys).forEach((c, entry) ->
                                 // Do your distributed code here; c is the cache instance on the node processing this entry
                                 c.put(entry.getKey() + "-extra", entry.getValue() + "-value")
                              );
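                              With at-least-once semantics the action may run more than once for the same entry (for instance, if data moves between nodes while the stream runs), so it should be idempotent, as the put above is. The plain-Java sketch below has no Infinispan dependency and simulates a redelivery by applying the action twice; AtLeastOnce and its Map-based "cache" are hypothetical.

```java
import java.util.*;
import java.util.function.BiConsumer;

class AtLeastOnce {
    // Applies `action` to every entry whose key is in `keys`, then applies it
    // to the same entries a second time to mimic a redelivery.
    static void forEachAtLeastOnce(Map<String, String> cache, Set<String> keys,
                                   BiConsumer<Map<String, String>, Map.Entry<String, String>> action) {
        // Snapshot the matching entries so puts made by `action` don't disturb iteration.
        List<Map.Entry<String, String>> matching = new ArrayList<>();
        for (Map.Entry<String, String> e : cache.entrySet()) {
            if (keys.contains(e.getKey())) {
                matching.add(new AbstractMap.SimpleImmutableEntry<>(e));
            }
        }
        for (int attempt = 0; attempt < 2; attempt++) {
            for (Map.Entry<String, String> e : matching) {
                action.accept(cache, e);
            }
        }
    }
}
```

                              A put of a derived entry ends in the same state however many times it runs, whereas something like incrementing a counter would be applied once per delivery.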
                              
                              • 12. Re: Question about distributed execution.
                                seto

                                GitHub - SetoKaiba/infinispan: Infinispan is an open source data grid platform and highly scalable NoSQL cloud data stor…

                                Hi. Here's a fix I committed that may work.

                                I imported the projects into IDEA, but it can't seem to resolve the dependencies.

                                So I can't test it. I made RunnableAdapter<T> public and check the type during injection.

                                Maybe you can test whether it works.