5 Replies Latest reply on Apr 18, 2008 4:11 AM by jorgemoralespou_2

    jBPM concurrency

    jorgemoralespou_2

      I'm trying to execute multiple process instances concurrently, and I'm seeing what looks like locking behaviour. If I execute 10000 process instances sequentially, each takes 0.4 ms on average. If I execute them concurrently (10 threads, 1000 process instances each), the average rises to between 1.5 and 25.0 ms, depending on how loaded the system is.

      How can I avoid this synchronization/concurrency locking? All my processes are thread safe, and I'm not using persistence at all.

        • 1. Re: jBPM concurrency
          jorgemoralespou_2

          I have tried PVM (in its current state), and the results are as follows:
          100000 executions - 1 thread

          Run         Total exec time (ms)  Total executions  Min. exec. time (ms)  Max. exec. time (ms)  Avg. exec. time (ms)
          jBPM Run 1  31138                 100000            0                     123                   0.3114
          jBPM Run 2  30715                 100000            0                     109                   0.3072
          jBPM Run 3  28696                 100000            0                     164                   0.2870
          PVM Run 1   5220                  100000            0                     29                    0.0522
          PVM Run 2   5469                  100000            0                     35                    0.0547
          PVM Run 3   5671                  100000            0                     98                    0.0567

          100000 executions - 10 threads * 10000 exec/thread

          Run         Total exec time (ms)  Total executions  Min. exec. time (ms)  Max. exec. time (ms)  Avg. exec. time (ms)
          jBPM Run 1  278547                100000            0                     2099                  2.7855
          jBPM Run 2  172222                100000            0                     776                   1.7222
          jBPM Run 3  241989                100000            0                     951                   2.4199
          PVM Run 1   41865                 100000            0                     1380                  0.4187
          PVM Run 2   39873                 100000            0                     1246                  0.3987
          PVM Run 3   48278                 100000            0                     617                   0.4828

          100000 executions - 100 threads * 1000 exec/thread

          Run         Total exec time (ms)  Total executions  Min. exec. time (ms)  Max. exec. time (ms)  Avg. exec. time (ms)
          jBPM Run 1  1182044               100000            0                     12471                 11.8204
          jBPM Run 2  1575392               100000            0                     15139                 15.7539
          jBPM Run 3  2148150               100000            0                     26393                 21.4815
          PVM Run 1   267664                100000            0                     2939                  2.6766
          PVM Run 2   174230                100000            0                     109                   1.7423
          PVM Run 3   72709                 100000            0                     590                   0.7271


          These tests run without process persistence. Each uses a process with 20 empty nodes and an action handler (jBPM) or behaviour (PVM) configured on each node.

          As anyone can see, jBPM doesn't scale with concurrent usage, but PVM seems to scale fairly well. I hope it stays that way.

          I only wish PVM were in a more usable state.
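
One hedged way to read the tables above: each thread times its own executions, so "Total exec time" adds up time spent in threads that ran in parallel. Assuming every thread was busy for the whole run (an assumption, since the post does not report wall-clock time), dividing the average per-execution time by the thread count gives a rough wall-clock cost per execution. The class and method names below are hypothetical; the input figures are Run 1 of each table.

```java
public class ThroughputEstimate {

    // Rough wall-clock time per execution: the mean latency measured inside each
    // thread, divided by the number of threads assumed to be running in parallel.
    static double wallClockMsPerExecution(double avgLatencyMs, int threads) {
        return avgLatencyMs / threads;
    }

    public static void main(String[] args) {
        int[] threads = {1, 10, 100};
        // Avg. exec. time (ms), Run 1 of each table in the post
        double[] jbpmAvgMs = {0.3114, 2.7855, 11.8204};
        double[] pvmAvgMs = {0.0522, 0.4187, 2.6766};

        for (int i = 0; i < threads.length; i++) {
            System.out.printf("%3d threads: jBPM ~%.4f ms/exec, PVM ~%.4f ms/exec (estimated wall clock)%n",
                    threads[i],
                    wallClockMsPerExecution(jbpmAvgMs[i], threads[i]),
                    wallClockMsPerExecution(pvmAvgMs[i], threads[i]));
        }
    }
}
```

This is only an estimate: it ignores thread start-up skew and says nothing about why latency grows, which is what a profiler or contention monitor would have to show.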

          • 2. Re: jBPM concurrency
            jorgemoralespou_2

            I have been thoroughly reading the forums and the code. Although orchestration is supposedly a feature (there are other people using jBPM to orchestrate services), it doesn't seem to fit the bill.

            Has anyone used jBPM for service orchestration and solved the scalability problems? Is it possible to modify jBPM, or use a subset of its functionality, to avoid locking? Is it possible to introduce some of PVM's changes, or is jBPM 4 going to be based on jPDL over PVM? In that case, are things like concurrency being taken into consideration?

            Don't break it, now that it seems to be working in PVM. ;-)

            • 3. Re: jBPM concurrency
              aguizar

              Jorge,

              Threading helps you get the most out of your system by allowing one job to do work while another job is blocked on I/O, or by leveraging multiple CPUs. Because your tests do little I/O, the bottleneck will be the CPU. Increasing the number of threads in such a situation will actually decrease performance due to the overhead associated with context switching.
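
As a minimal sketch of the point about CPU-bound work, the following sizes a thread pool to the number of available processors instead of starting one thread per batch. It assumes Java 8 or later; `SizedPoolSketch`, `cpuBoundTask` and `runAll` are hypothetical names, and the loop in `cpuBoundTask` is only a stand-in for one process execution.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SizedPoolSketch {

    // Hypothetical stand-in for one CPU-bound unit of work.
    static long cpuBoundTask(int seed) {
        long acc = seed;
        for (int i = 0; i < 100_000; i++) {
            acc = acc * 31 + i;
        }
        return acc;
    }

    // Runs `tasks` units of work on a pool with `threads` workers
    // and returns how many of them completed.
    static int runAll(int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final int seed = i;
                Callable<Long> task = () -> cpuBoundTask(seed);
                futures.add(pool.submit(task));
            }
            int done = 0;
            for (Future<Long> f : futures) {
                f.get(); // blocks until this task has finished
                done++;
            }
            return done;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // One worker per core: more threads than this adds context switching
        // without adding CPU capacity for work that never blocks on I/O.
        int cores = Runtime.getRuntime().availableProcessors();
        long start = System.nanoTime();
        int done = runAll(cores, 1000);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(done + " tasks on " + cores + " threads in " + elapsedMs + " ms");
    }
}
```

Comparing the elapsed time for `runAll(cores, n)` against `runAll(100, n)` on the same machine is one way to see how much of a slowdown comes from oversubscription alone.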

              Your tests do not necessarily prove that jBPM does not scale. They might just mean that your CPU is exhausted due to an overuse of threads. To prove jBPM is the bottleneck, you'd have to show that the CPU is noticeably below 100% usage and that there is contention between threads.

              Of course, it'd be interesting to dig further into this. Did you use a profiling tool that generated finer-grained data? Are your tests available publicly?
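
Short of a full profiler, the JVM's own management beans can give a first indication of lock contention. The sketch below is an illustration under stated assumptions, not a jBPM-specific tool: `ContentionProbe` and `measureBlockedCount` are hypothetical names, and the `synchronized` block is a stand-in for whatever shared lock the code under test might take. It uses `ThreadMXBean.getThreadInfo` and `ThreadInfo.getBlockedCount` from `java.lang.management`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ContentionProbe {

    private static final Object SHARED_LOCK = new Object();

    // Starts `threads` workers that all compete for one lock and returns the
    // total number of times any worker was blocked waiting to enter a monitor.
    static long measureBlockedCount(int threads, int iterations) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        Thread[] workers = new Thread[threads];
        long[] blocked = new long[threads];

        for (int i = 0; i < threads; i++) {
            final int idx = i;
            workers[i] = new Thread(() -> {
                for (int j = 0; j < iterations; j++) {
                    synchronized (SHARED_LOCK) {
                        Thread.yield(); // stand-in for work done while holding a lock
                    }
                }
                // Each worker reads its own counter just before it exits
                ThreadInfo info = mx.getThreadInfo(Thread.currentThread().getId());
                blocked[idx] = info.getBlockedCount();
            }, "probe-" + i);
        }
        for (Thread t : workers) {
            t.start();
        }
        long total = 0;
        for (int i = 0; i < threads; i++) {
            try {
                workers[i].join(); // join makes the worker's write to blocked[i] visible
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            total += blocked[i];
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("Blocked count, 10 threads: " + measureBlockedCount(10, 10_000));
        // System load average over the last minute; negative if the platform does not report it
        System.out.println("Load average: "
                + ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage());
    }
}
```

A high blocked count together with CPU usage well below 100% would point towards contention; a low count with a saturated CPU would point towards plain oversubscription.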

              • 4. Re: jBPM concurrency
                jorgemoralespou_2

                Hi Alex,
                You could well be right if PVM and jBPM showed the same threading and system behaviour. But that is not the case. We have reproduced this situation in our production environment with a simple test (which I will post later this morning, when I have time): on an 8 dual-core Itanium machine with 8 GB of RAM, the CPU was just laughing at me.

                • 5. Re: jBPM concurrency
                  jorgemoralespou_2

                  The code for jBPM:

                  package test.jpdl.test;
                  
                  import java.io.IOException;
                  
                  import org.jbpm.graph.def.ProcessDefinition;
                  import org.jbpm.graph.exe.ProcessInstance;
                  
                  public class TestSomJpdl {
                   ProcessDefinition processDefinition = ProcessDefinition.parseXmlString(
                   "<process-definition>" +
                   " <start-state>" +
                   " <transition to='s' />" +
                   " </start-state>" +
                   " <node name='s'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s1'/>" +
                   " </node>" +
                   " <node name='s1'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s2'/>" +
                   " </node>" +
                   " <node name='s2'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s3'/>" +
                   " </node>" +
                   " <node name='s3'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s4'/>" +
                   " </node>" +
                   " <node name='s4'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s5'/>" +
                   " </node>" +
                   " <node name='s5'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s6'/>" +
                   " </node>" +
                   " <node name='s6'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s7'/>" +
                   " </node>" +
                   " <node name='s7'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s8'/>" +
                   " </node>" +
                   " <node name='s8'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s9'/>" +
                   " </node>" +
                   " <node name='s9'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s10'/>" +
                   " </node>" +
                   " <node name='s10'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s11'/>" +
                   " </node>" +
                   " <node name='s11'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s12'/>" +
                   " </node>" +
                   " <node name='s12'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s13'/>" +
                   " </node>" +
                   " <node name='s13'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s14'/>" +
                   " </node>" +
                   " <node name='s14'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s15'/>" +
                   " </node>" +
                   " <node name='s15'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s16'/>" +
                   " </node>" +
                   " <node name='s16'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s17'/>" +
                   " </node>" +
                   " <node name='s17'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s18'/>" +
                   " </node>" +
                   " <node name='s18'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s19'/>" +
                   " </node>" +
                   " <node name='s19'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='s20'/>" +
                   " </node>" +
                   " <node name='s20'>" +
                   " <action class='test.jpdl.test.MyActionHandler' config-type='field'>" +
                   " <mensaje>#{mensaje}</mensaje>"+
                   " </action>"+
                   " <transition to='end'/>" +
                   " </node>" +
                   " <end-state name='end' />" +
                   "</process-definition>"
                   );
                  
                   float globalMin = 0;
                   float globalMax = 0;
                   float globalSum = 0;
                   float globalNum = 0;
                  
                  
                  
                   private void performanceMultiThread(int nThreads, int nExecutions){
                  
                   for (int i=0;i<nThreads;i++){
                   new ProcessRunner(i, nExecutions);
                   }
                  
                   try {
                   System.in.read();
                   } catch (IOException e) {
                   e.printStackTrace();
                   }
                   System.out.println("Process stats:");
                   System.out.println("Total exec time: " + globalSum);
                   System.out.println("Total executions: " + globalNum);
                   System.out.println("Min. exec. time: " + globalMin);
                   System.out.println("Max. exec. time: " + globalMax);
                   System.out.println("Avg. exec. time: " + (globalSum/globalNum));
                  
                   }
                  
                   class ProcessRunner implements Runnable{
                   Thread t;
                  
                   int n;
                  
                   int execNumber;
                  
                   public ProcessRunner(int execNumber, int n) {
                   this.n = n;
                   this.execNumber = execNumber;
                    t = new Thread(this, "Hilo hijo"); // Create a new thread
                    t.start(); // Start the thread
                   }
                  
                   public void run() {
                   performance(n);
                   }
                  
                   public void performance(int n){
                   float min = 0;
                   float max = 0;
                   float sum = 0;
                   float num = 0;
                   long timestamp = 0;
                   float time = 0;
                  
                  
                   // Initial
                   timestamp = System.currentTimeMillis();
                   ProcessInstance processInstance =
                   new ProcessInstance(processDefinition);
                   processInstance.getContextInstance().setTransientVariable("mensaje", "Prueba");
                   processInstance.signal();
                   System.out.println("Initial execution time: " + (System.currentTimeMillis() - timestamp));
                  
                   // Burst
                   for(int i=0; i<n; i++, num++){
                   timestamp = System.currentTimeMillis();
                   processInstance =
                   new ProcessInstance(processDefinition);
                   processInstance.getContextInstance().setTransientVariable("mensaje", "Prueba");
                   processInstance.signal();
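                    time = System.currentTimeMillis() - timestamp;
                   
                    // Per-thread statistics (num is still 0 on the first iteration)
                    sum += time;
                    min = (num == 0 || time < min) ? time : min;
                    max = (time > max) ? time : max;
                   
                    // The global fields are shared by all threads, so update them under a lock.
                    // The lock is taken after the measurement, so it is not part of the timed interval.
                    synchronized (TestSomJpdl.this) {
                     globalNum += 1;
                     globalSum += time;
                     globalMin = (globalNum == 1 || time < globalMin) ? time : globalMin;
                     globalMax = (time > globalMax) ? time : globalMax;
                    }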
                   }
                   if (processInstance.hasEnded()){
                   System.out.println("[+"+execNumber+"]:Process stats:");
                   System.out.println("[+"+execNumber+"]:Total exec time: " + sum);
                   System.out.println("[+"+execNumber+"]:Total executions: " + num);
                   System.out.println("[+"+execNumber+"]:Min. exec. time: " + min);
                   System.out.println("[+"+execNumber+"]:Max. exec. time: " + max);
                   System.out.println("[+"+execNumber+"]:Avg. exec. time: " + (sum/num));
                   }else{
                   System.out.println("ProcessInstance has not ended");
                   }
                   }
                   }
                  
                   public static void main(String[] args) {
                   TestSomJpdl test = new TestSomJpdl();
                  // test.performanceMultiThread(1, 10000);
                  // test.performanceMultiThread(10, 1000);
                  // test.performanceMultiThread(100, 100);
                  
                  // test.performanceMultiThread(1, 100000);
                  // test.performanceMultiThread(10, 10000);
                   test.performanceMultiThread(100, 1000);
                   }
                  
                  }
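
The listing above waits for a key press (`System.in.read()`) before printing the global statistics and measures with `System.currentTimeMillis()`, whose resolution is coarse compared with sub-millisecond executions. A hedged sketch of an alternative harness follows, assuming Java 8 or later: it joins the worker threads, times with `System.nanoTime()`, and aggregates with thread-safe counters. `BenchHarnessSketch`, `Stats` and `run` are hypothetical names, and the `Runnable` passed in is a placeholder for creating and signalling one process instance.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class BenchHarnessSketch {

    // Aggregated results; every field is safe to update from many threads.
    static final class Stats {
        final LongAdder count = new LongAdder();
        final LongAdder totalNanos = new LongAdder();
        final AtomicLong minNanos = new AtomicLong(Long.MAX_VALUE);
        final AtomicLong maxNanos = new AtomicLong(Long.MIN_VALUE);

        void record(long nanos) {
            count.increment();
            totalNanos.add(nanos);
            minNanos.accumulateAndGet(nanos, Math::min);
            maxNanos.accumulateAndGet(nanos, Math::max);
        }

        double avgMillis() {
            return totalNanos.sum() / 1_000_000.0 / count.sum();
        }
    }

    // Runs `work` nExecutions times in each of nThreads threads and
    // returns once every thread has finished.
    static Stats run(int nThreads, int nExecutions, Runnable work) {
        Stats stats = new Stats();
        Thread[] threads = new Thread[nThreads];
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < nExecutions; j++) {
                    long start = System.nanoTime();
                    work.run();
                    stats.record(System.nanoTime() - start);
                }
            }, "bench-" + i);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join(); // wait for completion instead of waiting for a key press
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return stats;
    }

    public static void main(String[] args) {
        // Placeholder workload; in the test above this would build and signal a ProcessInstance.
        Stats s = run(10, 1000, () -> Math.sqrt(12345.678));
        System.out.println("Total executions: " + s.count.sum());
        System.out.println("Avg. exec. time (ms): " + s.avgMillis());
        System.out.println("Min/max (ns): " + s.minNanos.get() + " / " + s.maxNanos.get());
    }
}
```

Because the counters are updated outside the timed interval, the bookkeeping does not inflate the per-execution figures, although it can still influence thread scheduling slightly.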
                  


                  The code for PVM:
                  package com.hp.som.jpdl.test;
                  
                  import java.io.IOException;
                  import java.util.HashMap;
                  import java.util.Map;
                  
                  import org.jbpm.pvm.Activity;
                  import org.jbpm.pvm.Execution;
                  import org.jbpm.pvm.ProcessDefinition;
                  import org.jbpm.pvm.ProcessFactory;
                  
                  
                  public class TestSomPVM {
                  
                   ProcessDefinition processDefinition = ProcessFactory.build("automatic")
                   .node("1").initial().behaviour(new Display("1"))
                   .transition().to("2")
                   .node("2").behaviour(new Display("2"))
                   .transition().to("3")
                   .node("3").behaviour(new Display("3"))
                   .transition().to("4")
                   .node("4").behaviour(new Display("4"))
                   .transition().to("5")
                   .node("5").behaviour(new Display("5"))
                   .transition().to("6")
                   .node("6").behaviour(new Display("6"))
                   .transition().to("7")
                   .node("7").behaviour(new Display("7"))
                   .transition().to("8")
                   .node("8").behaviour(new Display("8"))
                   .transition().to("9")
                   .node("9").behaviour(new Display("9"))
                   .transition().to("10")
                   .node("10").behaviour(new Display("10"))
                   .transition().to("11")
                   .node("11").behaviour(new Display("11"))
                   .transition().to("12")
                   .node("12").behaviour(new Display("12"))
                   .transition().to("13")
                   .node("13").behaviour(new Display("13"))
                   .transition().to("14")
                   .node("14").behaviour(new Display("14"))
                   .transition().to("15")
                   .node("15").behaviour(new Display("15"))
                   .transition().to("16")
                   .node("16").behaviour(new Display("16"))
                   .transition().to("17")
                   .node("17").behaviour(new Display("17"))
                   .transition().to("18")
                   .node("18").behaviour(new Display("18"))
                   .transition().to("19")
                   .node("19").behaviour(new Display("19"))
                   .transition().to("20")
                   .node("20").behaviour(new Display("20"))
                   .done();
                  
                   private void performanceMultiThread(int nThreads, int nExecutions){
                  
                   for (int i=0;i<nThreads;i++){
                   new ProcessRunner(i, nExecutions);
                   }
                   try {
                   System.in.read();
                   } catch (IOException e) {
                   e.printStackTrace();
                   }
                   System.out.println("Process stats:");
                   System.out.println("Total exec time: " + globalSum);
                   System.out.println("Total executions: " + globalNum);
                   System.out.println("Min. exec. time: " + globalMin);
                   System.out.println("Max. exec. time: " + globalMax);
                   System.out.println("Avg. exec. time: " + (globalSum/globalNum));
                  
                   }
                  
                  
                  
                   public class Display implements Activity {
                   private static final long serialVersionUID = 1L;
                   String message;
                  
                   public Display(String message) {
                   this.message = message;
                   }
                  
                   public void execute(Execution execution) {
                   }
                   }
                  
                   float globalMin = 0;
                   float globalMax = 0;
                   float globalSum = 0;
                   float globalNum = 0;
                  
                   class ProcessRunner implements Runnable{
                   Thread t;
                  
                   int n;
                  
                   int execNumber;
                  
                   public ProcessRunner(int execNumber, int n) {
                   this.n = n;
                   this.execNumber = execNumber;
                    t = new Thread(this, "Hilo hijo"); // Create a new thread
                    t.start(); // Start the thread
                   }
                  
                   public void run() {
                   performance(n);
                   }
                  
                   public void performance(int n){
                   float min = 0;
                   float max = 0;
                   float sum = 0;
                   float num = 0;
                   long timestamp = 0;
                   float time = 0;
                  
                   Map<String, Object> variables = new HashMap<String, Object>();
                   variables.put("mensaje", "Prueba");
                   variables.put("contador", new Integer(0));
                  
                   // Initial
                   timestamp = System.currentTimeMillis();
                   Execution execution = processDefinition.startExecution(variables);
                   if (execution.isEnded()){
                   System.out.println("Initial execution time: " + (System.currentTimeMillis() - timestamp));
                   }else{
                   System.out.println("Execution is not ended.");
                   }
                  
                   // Burst
                   for(int i=0; i<n; i++, num++){
                   timestamp = System.currentTimeMillis();
                   execution = processDefinition.startExecution(variables);
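                    time = System.currentTimeMillis() - timestamp;
                   
                    // Per-thread statistics (num is still 0 on the first iteration)
                    sum += time;
                    min = (num == 0 || time < min) ? time : min;
                    max = (time > max) ? time : max;
                   
                    // The global fields are shared by all threads, so update them under a lock.
                    // The lock is taken after the measurement, so it is not part of the timed interval.
                    synchronized (TestSomPVM.this) {
                     globalNum += 1;
                     globalSum += time;
                     globalMin = (globalNum == 1 || time < globalMin) ? time : globalMin;
                     globalMax = (time > globalMax) ? time : globalMax;
                    }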
                   }
                   if (execution.isEnded()){
                   System.out.println("[+"+execNumber+"]:Process stats:");
                   System.out.println("[+"+execNumber+"]:Total exec time: " + sum);
                   System.out.println("[+"+execNumber+"]:Total executions: " + num);
                   System.out.println("[+"+execNumber+"]:Min. exec. time: " + min);
                   System.out.println("[+"+execNumber+"]:Max. exec. time: " + max);
                   System.out.println("[+"+execNumber+"]:Avg. exec. time: " + (sum/num));
                   }else{
                   System.out.println("Execution is not ended.");
                   }
                   }
                   }
                  
                  
                   public static void main(String[] args) {
                   TestSomPVM test = new TestSomPVM();
                  // test.performanceMultiThread(1, 10000);
                  // test.performanceMultiThread(10, 1000);
                  // test.performanceMultiThread(100, 100);
                  
                  // test.performanceMultiThread(1, 100000);
                  // test.performanceMultiThread(10, 10000);
                   test.performanceMultiThread(100, 1000);
                   }
                  }
                  


                  The action handler for the jBPM version:
                  package test.jpdl.test;
                  
                  import org.jbpm.graph.def.ActionHandler;
                  import org.jbpm.graph.exe.ExecutionContext;
                  
                  public class MyActionHandler implements ActionHandler {
                  
                   private String mensaje = null;
                  
                   public void setMensaje(String mensaje) {
                   this.mensaje = mensaje;
                   }
                  
                   public void execute(ExecutionContext executionContext) throws Exception {
                   executionContext.leaveNode();
                   }
                  
                  }
                  


                  And needed imports.

                  As you can see, this is a fairly simple example, like those posted on Tom's blog, where "awesome performance" was claimed.