1 2 3 Previous Next 60 Replies Latest reply on Dec 11, 2005 9:35 AM by chprvkmr

    Simultaneous execution of  Fork children

    aparna.krishna

      How can i achieve simultaneous exxecution of children of a Fork node. I have tried creating custom nodes, but the execution is always serial(in this example node 1 is executed, then node2).

      I have also tried to create threads that run (node.leave) for each tranisiton simultaneously but that is raising different set of problems. In this case i am seeing an extra token being run for '\'.

      I am unable to resolve this based on the examples and documentation. Any help is appreciated a lot.
      Thanks.

      For example

      Start-> Fork->one, two-> join->end



      <process-definition name='foreachfork-example'>
      <start-state name='start'>

      </start-state>



















      </process-definition >


        • 1. Re: Simultaneous execution of  Fork children
          aparna.krishna

          The XML is not displayed correctly.

          [CODE]
          <process-definition name='foreachfork-example'>
          <start-state name='start'>

          </start-state>



















          </process-definition >
          [/CODE]

          • 2. Re: Simultaneous execution of  Fork children
            kukeltje

            use lowercase code

            • 3. Re: Simultaneous execution of  Fork children
              aparna.krishna

              Code is displayed correctly now. The task-nodes one and two have to run simultaneously, both the actions perform some long running tasks. How can i achieve this? Have posted before too on this forum, but there is no reply.
              I am assuming simultaneous execution is something a number of users need. Currently the execution path for fork is it completes traversal of one path (in this example if there are more nodes under 1) and then proceeds with traversal of node 2.


              Tried creating custom nodes as well, but did not help. Feels i have exhausted all options.

              Appreciate any help.

              "<process-definition name='foreachfork-example'>" +
               " <start-state name='start'>" +
               " <transition to='foreachfork' />" +
               " </start-state>" +
              
               " <fork name='foreachfork'>" +
               " <transition name='one' to='one'/>" +
               " <transition name='two' to='two'/>" +
               " </node>" +
               " <task-node name='one'>" +
               " <transition to='joinforeach'>" +
               " <action class='org.jbpm.tutorial.action.DoActionHandler1' config-type='bean'>" +
               " </action>" +
               " </transition>" +
               " </task-node>" +
               " <task-node name='two'>" +
               " <transition to='joinforeach'>" +
               " <action class='org.jbpm.tutorial.action.DoActionHandler2' config-type='bean'>" +
               " </action>" +
               " </transition>" +
               " </task-node>" +
               " <join name='joinforeach'>" +
               " <transition to='end'/>" +
               " </join>" +
               " <end-state name='end'/>" +
               "</process-definition>"
              [\code]


              • 4. Re: Simultaneous execution of  Fork children
                hacim_bengali

                Just an idea. This might not fit to your situation.

                I have a slightly different example which uses threads in which the Token objects which exists after the fork are signalled. It looks like this:

                StringBuffer def = new StringBuffer();
                
                def
                
                 .append("<process-definition>")
                
                 .append(" <start-state>")
                
                 .append(" <transition to='fork' />")
                
                 .append(" </start-state>")
                
                 .append(" <fork name='fork'>")
                
                 .append(" <transition name='f1' to='node1'/>")
                
                 .append(" <transition name='f2' to='node2'/>")
                
                 .append(" <transition name='f3' to='node3'/>")
                
                 .append(" </fork>")
                
                 .append(" <state name='node1'>")
                
                 .append(" <event type='node-leave'>")
                
                 .append(
                
                 " <action class='org.jbpm.tutorial.action.MyActionHandler'/>")
                
                 .append(" </event>")
                
                 .append(" <transition to='join'/>")
                
                 .append(" </state>")
                
                 .append(" <state name='node2'>")
                
                 .append(" <event type='node-leave'>")
                
                 .append(
                
                 " <action class='org.jbpm.tutorial.action.MyActionHandler'/>")
                
                 .append(" </event>")
                
                 .append(" <transition to='join'/>")
                
                 .append(" </state>")
                
                 .append(" <state name='node3'>")
                
                 .append(" <event type='node-leave'>")
                
                 .append(
                
                 " <action class='org.jbpm.tutorial.action.MyActionHandler'/>")
                
                 .append(" </event>")
                
                 .append(" <transition to='join'/>")
                
                 .append(" </state>")
                
                 .append(" <join name='join'>")
                
                 .append(" <transition to='end'/>")
                
                 .append(" </join>")
                
                 .append(" <end-state name='end'/>")
                
                 .append("</process-definition>");
                
                
                
                ProcessDefinition processDefinition = ProcessDefinition
                
                 .parseXmlString(def.toString());
                
                
                
                ProcessInstance processInstance =
                
                 new ProcessInstance(processDefinition);
                
                
                
                processInstance.signal();
                
                
                
                // we expect several new tokens at this point
                
                Token rootToken = processInstance.getRootToken();
                
                System.out.println(rootToken.getActiveChildren());
                
                
                
                
                
                List threadList = new LinkedList();
                
                while (rootToken.getActiveChildren().size()>0) {
                
                 Iterator i = rootToken.getActiveChildren().values().iterator();
                
                 while (i.hasNext()) {
                
                 Token token = (Token) i.next();
                
                 Thread signalThread = new SignalThread(token);
                
                 threadList.add(signalThread);
                
                 signalThread.start();
                
                 }
                
                 while (threadList.size()>0) {
                
                 ((Thread) threadList.get(0)).join();
                
                 threadList.remove(0);
                
                 }
                
                }
                
                
                
                assertTrue(MyActionHandler.isExecuted);
                
                assertEquals(3,MyActionHandler.counter);
                


                with

                class SignalThread extends Thread {
                
                 private Token token;
                
                 public SignalThread(Token token) {
                
                 this.token = token;
                
                 }
                
                
                
                 public void run()
                
                 {
                
                 token.signal();
                
                 System.out.println("SignalThread "+this.getName()+" finished");
                
                 }
                
                }
                


                The Thread handling is not right yet but the example is working.

                • 5. Re: Simultaneous execution of  Fork children
                  jim.mcmaster

                  I have been looking at the source trying to figure out how to do this myself. I am not at all sure this will work, because of thread-safety issues related to Thread.children. Access is not synchronized, so the code in Join.java which reactivates the parent token is likely to have problems.

                  • 6. Re: Simultaneous execution of  Fork children
                    kukeltje

                    use a fork which has two or more nodes. Have these nodes do async actions. These will be carried-out almost simutaneously, since they return quite quick to the node. The long running actions will run in parallel. Async actions is in 3.1 or do it yourself in 3.0

                    Will this solve your issue enough?

                    • 7. Re: Simultaneous execution of  Fork children
                      aparna.krishna

                      How can we get JBPM 3.1?

                      I have tried a work around by using an altered form of solution that hacim_bengali proposed. In my example i have multiple joins and forks. A node is completed if all the child nodes are completed . It works the way i need it (there is no issue with join/fork), you can give it a try.

                       StringBuffer def = new StringBuffer();
                       def
                       .append("<process-definition>")
                       .append(" <start-state>")
                       .append(" <transition to='one' />")
                       .append(" </start-state>")
                      
                       .append(" <state name='one'>")
                       .append(" <transition to='two'/>")
                       .append(" </state>")
                      
                       .append(" <state name='two'>")
                       .append(" <transition to='fork'/>")
                       .append(" </state>")
                      
                       .append(" <fork name='fork'>")
                       .append(" <transition name='f1' to='node1'/>")
                       .append(" <transition name='f3' to='node3'/>")
                       .append(" <transition name='f2' to='node2'/>")
                       .append(" </fork>")
                      
                       .append(" <state name='node1'>")
                       .append(" <event type='node-leave'>")
                       .append(
                      
                       " <action class='org.jbpm.tutorial.action.SleepActionHandler' config-type='field'>" +
                       " <interval>30</interval>" +
                       " </action>" )
                       .append(" </event>")
                       .append(" <transition to='join'/>")
                       .append(" </state>")
                      
                       .append(" <state name='node3'>")
                       .append(" <event type='node-leave'>")
                       .append(
                      
                       " <action class='org.jbpm.tutorial.action.SleepActionHandler' config-type='field'>" +
                       " <interval>20</interval>" +
                       " </action>" )
                       .append(" </event>")
                       .append(" <transition to='node4'/>")
                       .append(" </state>")
                      
                       .append(" <state name='node2'>")
                       .append(" <event type='node-leave'>")
                       .append(
                      
                       " <action class='org.jbpm.tutorial.action.SleepActionHandler' config-type='field'>" +
                       " <interval>10</interval>" +
                       " </action>" )
                       .append(" </event>")
                       .append(" <transition to='join'/>")
                       .append(" </state>")
                      
                       .append(" <fork name='node4'>")
                       .append(" <transition name='f5' to='node5'/>")
                       .append(" <transition name='f6' to='node6'/>")
                       .append(" </fork>")
                      
                       .append(" <state name='node5'>")
                       .append(" <event type='node-leave'>")
                       .append(
                      
                       " <action class='org.jbpm.tutorial.action.SleepActionHandler' config-type='field'>" +
                       " <interval>10</interval>" +
                       " </action>" )
                       .append(" </event>")
                       .append(" <transition to='jointwo'/>")
                       .append(" </state>")
                      
                       .append(" <state name='node6'>")
                       .append(" <event type='node-leave'>")
                       .append(
                      
                       " <action class='org.jbpm.tutorial.action.SleepActionHandler' config-type='field'>" +
                       " <interval>20</interval>" +
                       " </action>" )
                       .append(" </event>")
                       .append(" <transition to='node7'/>")
                       .append(" </state>")
                      
                       .append(" <fork name='node7'>")
                       .append(" <transition name='f8' to='node8'/>")
                       .append(" <transition name='f9' to='node9'/>")
                       .append(" </fork>")
                      
                       .append(" <state name='node8'>")
                      
                       .append(" <event type='node-leave'>")
                      
                       .append(
                      
                       " <action class='org.jbpm.tutorial.action.SleepActionHandler' config-type='field'>" +
                       " <interval>20</interval>" +
                       " </action>" )
                      
                       .append(" </event>")
                      
                       .append(" <transition to='jointwo'/>")
                      
                       .append(" </state>")
                      
                       .append(" <state name='node9'>")
                      
                       .append(" <event type='node-leave'>")
                      
                       .append(
                      
                       " <action class='org.jbpm.tutorial.action.SleepActionHandler' config-type='field'>" +
                       " <interval>20</interval>" +
                       " </action>" )
                       .append(" </event>")
                      
                       .append(" <transition to='jointwo'/>")
                      
                       .append(" </state>")
                       .append(" <join name='jointwo'>")
                      
                       .append(" <transition to='join'/>")
                      
                       .append(" </join>")
                       .append(" <join name='join'>")
                       .append(" <transition to='end'/>")
                       .append(" </join>")
                       .append(" <end-state name='end'/>")
                       .append("</process-definition>");
                      
                      
                       ProcessDefinition processDefinition = ProcessDefinition
                       .parseXmlString(def.toString());
                      
                       ProcessInstance processInstance =
                       new ProcessInstance(processDefinition);
                      
                      // we expect several new tokens at this point
                       Token rootToken = processInstance.getRootToken();
                      
                       while (rootToken.getActiveChildren().size()== 0) {
                       processInstance.signal();
                       }
                      
                      
                       List threadList = new LinkedList();
                       while (rootToken.getActiveChildren().size()>0) {
                      
                       Iterator i = rootToken.getActiveChildren().values().iterator();
                      
                       while (i.hasNext()) {
                      
                       Token token = (Token) i.next();
                      
                       Thread signalThread = new SignalThread(token,threadList);
                      
                       threadList.add(signalThread);
                      
                       signalThread.start();
                      
                       }
                      
                      
                       while (threadList.size()>0) {
                       try {
                      
                       ((Thread) threadList.get(0)).join();
                      
                       threadList.remove(0);
                      
                       } catch (Exception e){
                       System.out.println("Exception in thread join" + e.toString());
                       }
                       }
                      
                       }
                       }
                      
                      




                      And the SignalThread code is something like this

                      class SignalThread extends Thread {
                       private Token token;
                       private List threadList;
                      
                       public SignalThread(Token token, List threadList) {
                       this.token = token;
                       this.threadList = threadList;
                       }
                      
                       public void run()
                       {
                       token.signal();
                      
                       Iterator j = token.getActiveChildren().values().iterator();
                      
                       while(j.hasNext()) {
                       Token ctoken = (Token) j.next();
                      
                       Thread signalThread = new SignalThread(ctoken, threadList);
                      
                       threadList.add(signalThread);
                       signalThread.start();
                      
                       }
                      
                       System.out.println("SignalThread "+this.getName()+" finished");
                       }
                      }
                      


                      • 8. Re: Simultaneous execution of  Fork children
                        kukeltje

                        the 3.1 branche is in cvs.

                        It might work what you do, but no guarantees from our side. There still might be some issues regarding the join and threadsfety

                        • 9. Re: Simultaneous execution of  Fork children
                          koen.aers

                          There is already an alpha release available at http://sourceforge.net/project/showfiles.php?group_id=70542&package_id=145174&release_id=367608. CVS head might be a bit messy around this time.

                          Regards,
                          Koen

                          • 10. Re: Simultaneous execution of  Fork children
                            aparna.krishna

                            How do we use the async in the node/action in 3.1 (i have 3.1 alpha). When i tried using it, my action-handler never gets called. The workflow just progresses without any work getting done. My goal is to get two/more nodes to execute simultaneously and execute some parallel actions. The state of the workflow will depend on the result of the action(if an action fails then follow a different path than if it suceeds). How can i achieve this in the current infrastructure?

                            I have tried using the async attribute in node like this.

                             .append(" <node name='node1' async='true'>" )
                             .append(
                            
                             " <action class='org.jbpm.tutorial.action.SleepActionHandler' config-type='field' >" +
                             " <interval>15</interval>" +
                             " </action>" )
                            
                             .append(" <transition to='join'/>")
                             .append(" </node>")
                            
                            


                            I have also tried defining the action in the various events (node-enter/node-leave) as well.

                            I have also tried defining the tag in the action as well
                             .append(" <node name='node1' )
                             .append(
                            
                             " <action class='org.jbpm.tutorial.action.SleepActionHandler' config-type='field' async='true' >" +
                             " <interval>15</interval>" +
                             " </action>" )
                            
                             .append(" <transition to='join'/>")
                             .append(" </node>")
                            
                            


                            The sample XML is like this, i have a more complicated workflow with multiple joins and nodes..

                             StringBuffer def = new StringBuffer();
                             def
                             .append("<process-definition>")
                             .append(" <start-state>")
                             .append(" <transition to='fork' />")
                             .append(" </start-state>")
                            
                            
                             .append(" <fork name='fork'>")
                             .append(" <transition name='f1' to='node1'/>")
                             .append(" <transition name='f2' to='node2'/>")
                             .append(" </fork>")
                            
                             .append(" <node name='node1' >" )
                             .append(
                            
                             " <action class='org.jbpm.tutorial.action.SleepActionHandler' config-type='field' async='true'>" +
                             " <interval>15</interval>" +
                             " </action>" )
                            
                             .append(" <transition to='join'/>")
                             .append(" </node>")
                            
                             .append(" <node name='node2' >" )
                            
                             .append(
                            
                             " <action class='org.jbpm.tutorial.action.SleepActionHandler' config-type='field' async='true'>" +
                             " <interval>10</interval>" +
                             " </action>" )
                            
                             .append(" <transition to='join'/>")
                             .append(" </node>")
                            
                             .append(" <join name='join'>" )
                             .append(" <transition to='end'/>")
                             .append(" </join>")
                            
                            
                             .append(" <end-state name='end'/>")
                             .append("</process-definition>");
                            
                            



                            The calling code is something simple like
                             ProcessDefinition processDefinition = ProcessDefinition
                             .parseXmlString(def.toString());
                            
                            
                             ProcessInstance processInstance =
                             new ProcessInstance(processDefinition);
                            
                             while (! processInstance.hasEnded() ) {
                             processInstance.signal();
                             }
                            


                            Thanks in advance,
                            Aparna

                            • 11. Re: Simultaneous execution of  Fork children
                              koen.aers

                              Have a look at AsyncExecutionDbTest in the jbpm testsuite. You'll find it in the org.jbpm.msg.command package. As you can see there, you'll have to make sure to have a CommandExecutor running to consume the messages and execute the actions.

                              Regards,
                              Koen

                              • 12. Re: Simultaneous execution of  Fork children
                                averas

                                If I have understood things correctly a forknode doesn't really fork exeuction and execute the outgoing paths in parallell,
                                but actually executes the paths in a serial manner. Hence; if child path X takes 5 minutes to hit the join node and child path Y takes
                                10 minutes, the join node will continue after an accumulated execution time of 15 minutes, right?

                                Is there really any difference in this behaviour instead of having all the nodes after each other in the definition?
                                Except for the non-shared Context Variables when using multiple tokens.

                                Am I correct if jBPM 3.1 will make use of complete parallell execution of child-tokens when using Fork-nodes?

                                -ra

                                • 13. Re: Simultaneous execution of  Fork children
                                  koen.aers

                                   

                                  If I have understood things correctly a forknode doesn't really fork exeuction and execute the outgoing paths in parallell

                                  You understood correctly. The purpose of a fork in jBPM is not to spawn concurrent threads of execution on a processor but rather to create multiple concurrent paths of execution in the overall system, which is not at all the same thing. The implementation of the fork node does this by creating child tokens of the parent token that enters the fork node.

                                  Is there really any difference in this behaviour instead of having all the nodes after each other in the definition?

                                  So the answer here is yes. A sequence does not offer the possibility of multiple parallel paths of execution in the overall system.

                                  Am I correct if jBPM 3.1 will make use of complete parallell execution of child-tokens when using Fork-nodes?

                                  So you are not quite correct here. The progress of different child tokens in a system is already completely independent from one another. Except of course if there are no wait states involved, then the progress is completely deterministic and in sequence.
                                  The bottomline of this is that you have to introduce wait states or nodes with the 'async' attribute set to true if you want the execution of the sibling tokens to be undeterministic.

                                  Regards,
                                  Koen

                                  • 14. Re: Simultaneous execution of  Fork children
                                    chprvkmr

                                    Hi,

                                    General impression that everyone get on fork is multiple elements of it would run parallely. But lack of such support is a big drawback.. In jbpm 3.0.2 whether using fork-join combination is equivalent to have multiple states put together one after another, executing serially one after another!. How to accomplish this parallel execution pattern in jbpm 3.0.2

                                    1 2 3 Previous Next