43 Replies Latest reply on Sep 16, 2009 9:04 AM by mmusaji

    workflow design about wait states

    mmusaji

      jBPM 4.0: my workflow contains a fork with multiple custom nodes, each of which changes a different object. My problem is that, in real life, I won't know how long this processing takes.

      I'm experimenting (without much luck) with a wait state before the join that would use an event-listener to ensure all the objects have been updated. My unit test just keeps running through to process completion, and I'm struggling to test this. Is this the right way of going about this problem?

        • 1. Re: workflow design about wait states
          kukeltje

           

          Is this the right way of going about this problem?


          No, the best way is to show us code, preferably a full unit test (see the first post in this forum), including a simulation of how the event-listener is triggered, etc. (Why an event-listener and not just a signal to the specific state?)



          • 2. Re: workflow design about wait states
            mmusaji

            Hi

            Okay so here's my workflow.

            <?xml version="1.0" encoding="UTF-8"?>
            
            <process name="process" xmlns="http://jbpm.org/4.0/jpdl">
             <start>
             <transition to="parse request"/>
             </start>
            
             <custom class="org.application.workflow.ParseRequest" name="parse request" two="#{myObj}">
             <transition to="evaluate parse result"/>
             </custom>
            
             <decision name="evaluate parse result" expr="#{content}" >
             <transition name="valid" to="find providers" />
             <transition name="invalid" to="error"/>
             </decision>
            
             <custom two="#{myObj}" class="org.application.workflow.FindProviders" name="find providers">
             <transition to="fork"/>
             </custom>
            
             <fork name="fork">
             <transition name="validate one" to="validate one request"/>
             <transition name="validate two" to="validate two request"/>
             <transition name="validate three" to="validate three request"/>
             </fork>
            
             <!-- EACH OF THESE CUSTOM NODES WILL TAKE AN UNKNOWN AMOUNT OF TIME -->
             <custom continue="async" name="validate one request" class="org.application.workflow.ValidateoneFraudRequest" two="#{oneDetails}">
             <transition to="join"/>
             </custom>
            
             <custom continue="async" name="validate two request" class="org.application.workflow.ValidateExpFraudRequest" two="#{twoDetails}">
             <transition to="join"/>
             </custom>
            
             <custom continue="async" name="validate three request" class="org.application.workflow.ValidateSysFraudRequest" two="#{threeDetails}">
             <transition to="join"/>
             </custom>
            
             <!-- I WANT TO WAIT HERE UNTIL THEY ARE COMPLETE -->
             <state name="wait">
             <on event="fork">
             <event-listener class="org.application.workflow.WaitForValidation"/>
             </on>
             <transition to="join"/>
             </state>
            
             <join name="join" continue="exclusive">
             <transition name="to evaluate validation results" to="evaluate validation results"/>
             </join>
            
             <decision name="evaluate validation results" expr="#{content}" >
             <transition name="valid" to="construct message" />
             <transition name="invalid" to="error"/>
             </decision>
            
             <custom name="construct message" class="org.application.workflow.ConstructRequest" two="#{myObj}">
             <transition to="send"/>
             </custom>
            
             <custom name="send" class="org.application.workflow.SendRequest" two="#{myObj}">
             <transition to="get responses"/>
             </custom>
            
             <custom name="get responses" class="org.application.workflow.GetResponse" two="#{myObj}">
             <transition to="process responses"/>
             </custom>
            
             <custom name="process responses" class="org.application.workflow.ProcessResponse" two="#{myObj}">
             <transition to="getMyObject"/>
             </custom>
            
             <custom name="getMyObject" class="org.application.workflow.GetMyObject" two="#{myObj}">
             <transition to="end"/>
             </custom>
            
             <task name="end">
             <transition name="to complete" to="end process"/>
             </task>
            
             <end name="end process" state="complete"/>
             <end-error name="error"/>
            
             </process>
            


            This is my WebMethod. It starts the process when a request is received and will, I hope, give a better idea of what I'm trying to achieve.

            @WebMethod
             public String request() {
             try {
             InitialContext ctx = new InitialContext();
             this.processEngine = (ProcessEngine)ctx.lookup("java:/ProcessEngine");
            
             ExecutionService execService = (ExecutionService)
             this.processEngine.get(ExecutionService.class);
            
             ProcessInstance processInstance = execService.startProcessInstanceByKey("process", variables);
            
             System.out.println("ProcessInstanceID: " + processInstance.getId());
             // We need a signal because we have no idea how long the return from the providers will take in real life.
             Thread.sleep(3000);
            
             // If I check this too early, I don't get the right details back.
             oneDetails = (OneDetails)execService.getVariable(processInstance.getId(), "oneDetails");
             result = "OneDetails: " + oneDetails.getDetails();
            
             } catch (Exception e) {
             e.printStackTrace();
             }
             return result;
             }
             }
            


            Please explain further about signalling the specific state... which state? The one that is waiting for the processes in the fork to complete?

            My Unit test which is attempting to signal the wait state is below. This isn't working at all at the moment.
            public void testSimple() throws Exception {
            
             ProcessInstance processInstance = executionService.startProcessInstanceByKey("process", variables);
             String processInstanceId = processInstance.getId();
            
             Execution waitExecution = processInstance.findActiveExecutionIn("wait");
             executionService.signalExecutionById(waitExecution.getId());
            
             OneDetails oneDetails = (OneDetails) executionService.getVariable(processInstanceId, "oneDetails");
             }
            


            • 3. Re: workflow design about wait states
              mmusaji

              Your comment about signalling the state made me look at the "StateChoiceTest" in the examples. Is this what you meant? Because I think this might work.

              • 4. Re: workflow design about wait states
                kukeltje

                Yep, and you can, e.g., pass the specific id to your remote service and have it signal that specific id when finished, or store it in your domain model as a reference, etc.
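
                The idea of handing the execution id to a remote service and signalling that id on completion could be sketched roughly as follows. This is only an illustration in plain Java: `PendingSignals`, the request id used as a correlation key, and the `Consumer<String>` standing in for the engine are all assumptions, not part of jBPM. In a real setup the consumer might simply wrap `executionService.signalExecutionById(id)`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Sketch: remember which execution is parked in a wait state so that an
 * external callback can later signal exactly that execution.
 */
final class PendingSignals {

    // Hypothetical correlation key (e.g. a request id sent to the remote
    // service) mapped to the id of the execution waiting in the state.
    private final Map<String, String> executionIdByRequest = new ConcurrentHashMap<>();

    // Stand-in for the engine call. With jBPM 4 this could be
    // id -> executionService.signalExecutionById(id) (assumption).
    private final Consumer<String> signaller;

    PendingSignals(Consumer<String> signaller) {
        this.signaller = signaller;
    }

    /** Called once the process is waiting and the request has been sent out. */
    void register(String requestId, String executionId) {
        executionIdByRequest.put(requestId, executionId);
    }

    /**
     * Called by the remote service when it has finished.
     * Returns false if the request id is unknown or was already handled.
     */
    boolean onFinished(String requestId) {
        String executionId = executionIdByRequest.remove(requestId);
        if (executionId == null) {
            return false;
        }
        signaller.accept(executionId);
        return true;
    }
}
```

                Removing the entry before signalling means a duplicate callback for the same request is ignored rather than signalling the execution twice.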

                • 5. Re: workflow design about wait states
                  mmusaji

                  Thanks again Ronald. I think I know what you mean. I'll investigate further.

                  • 6. Re: workflow design about wait states
                    mmusaji

                    I understand what you said earlier, Ronald, but I'm having trouble implementing this in a JUnit test.

                    I've added a state to my workflow which will wait until a signal is received. It's after the join in the above example. I won't paste the whole workflow again; I hope that's OK.

                    My Unit test is

                    public void testSimple() throws Exception {
                    
                     ProcessInstance processInstance = executionService.startProcessInstanceByKey("process", variables);
                     String processInstanceId = processInstance.getId();
                    
                     String executionId = processInstance.findActiveExecutionIn("wait").getId();
                    
                     processInstance = executionService.signalExecutionById(executionId, "continue");
                    }


                    The problem is that I always get a NullPointerException, as no execution is found in "wait". I presume this is similar to a problem I had before: the async processes from the fork carry on, my unit test carries on in a separate thread, and the wait state hasn't been reached yet.

                    Any suggestions?
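
                    If the cause really is the test thread racing ahead of the async branches, one option is to poll for the wait state with a timeout instead of looking it up once. The helper below is a minimal sketch in plain Java; `Await` and its parameters are hypothetical names, and nothing in it is jBPM-specific.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Sketch of a polling helper for tests that must wait for asynchronous work. */
final class Await {

    private Await() {}

    /**
     * Polls the condition until it is true or the timeout elapses.
     * Returns true if the condition became true in time, false on timeout
     * or if the waiting thread was interrupted.
     */
    static boolean until(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out before the condition held
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
        return true;
    }
}
```

                    In the test above, the condition might be something like `() -> executionService.findProcessInstanceById(processInstanceId).findActiveExecutionIn("wait") != null`, assuming the instance has to be re-read on each poll because the object returned by `startProcessInstanceByKey` does not update itself. The test would then fail on a timeout with a clear message rather than on a NullPointerException.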


                    • 7. Re: workflow design about wait states
                      kukeltje

                      Then there is no execution in wait... Where in your definition do you go to wait? I do not see that...

                      • 8. Re: workflow design about wait states
                        mmusaji

                        The previous definition did not have the wait state after the join; I was referring to the version with it added. Here it is.

                        <?xml version="1.0" encoding="UTF-8"?>
                        
                        <process name="process" xmlns="http://jbpm.org/4.0/jpdl">
                         <start>
                         <transition to="parse request"/>
                         </start>
                        
                         <custom class="org.application.workflow.ParseRequest" name="parse request" two="#{myObj}">
                         <transition to="evaluate parse result"/>
                         </custom>
                        
                         <decision name="evaluate parse result" expr="#{content}" >
                         <transition name="valid" to="find providers" />
                         <transition name="invalid" to="error"/>
                         </decision>
                        
                         <custom two="#{myObj}" class="org.application.workflow.FindProviders" name="find providers">
                         <transition to="fork"/>
                         </custom>
                        
                         <fork name="fork">
                         <transition name="validate one" to="validate one request"/>
                         <transition name="validate two" to="validate two request"/>
                         <transition name="validate three" to="validate three request"/>
                         </fork>
                        
                         <!-- EACH OF THESE CUSTOM NODES WILL TAKE AN UNKNOWN AMOUNT OF TIME -->
                         <custom continue="async" name="validate one request" class="org.application.workflow.ValidateoneFraudRequest" two="#{oneDetails}">
                         <transition to="join"/>
                         </custom>
                        
                         <custom continue="async" name="validate two request" class="org.application.workflow.ValidateExpFraudRequest" two="#{twoDetails}">
                         <transition to="join"/>
                         </custom>
                        
                         <custom continue="async" name="validate three request" class="org.application.workflow.ValidateSysFraudRequest" two="#{threeDetails}">
                         <transition to="join"/>
                         </custom>
                        
                         <join name="join" continue="exclusive">
                         <transition name="wait" to="wait"/>
                         </join>
                        
                         <state name="wait">
                         <transition to="evaluate validation results"/>
                         </state>
                        
                         <decision name="evaluate validation results" expr="#{content}" >
                         <transition name="valid" to="construct message" />
                         <transition name="invalid" to="error"/>
                         </decision>
                        
                         <custom name="construct message" class="org.application.workflow.ConstructRequest" two="#{myObj}">
                         <transition to="send"/>
                         </custom>
                        
                         <custom name="send" class="org.application.workflow.SendRequest" two="#{myObj}">
                         <transition to="get responses"/>
                         </custom>
                        
                         <custom name="get responses" class="org.application.workflow.GetResponse" two="#{myObj}">
                         <transition to="process responses"/>
                         </custom>
                        
                         <custom name="process responses" class="org.application.workflow.ProcessResponse" two="#{myObj}">
                         <transition to="getMyObject"/>
                         </custom>
                        
                         <custom name="getMyObject" class="org.application.workflow.GetMyObject" two="#{myObj}">
                         <transition to="end"/>
                         </custom>
                        
                         <task name="end">
                         <transition name="to complete" to="end process"/>
                         </task>
                        
                         <end name="end process" state="complete"/>
                         <end-error name="error"/>
                        
                         </process>
                        


                        • 9. Re: workflow design about wait states
                          kukeltje

                          In what nodes do you have executions? Please make, e.g., a full unit test where you demonstrate that the custom nodes are entered, left, etc. And what do your classes on the custom nodes do? If they are almost identical, post one of them. I get the impression you are doing something wrong in there.

                          • 10. Re: workflow design about wait states
                            kukeltje

                            And if possible, make a unit test as described in the sticky post in this forum: all in one, and minimized as far as possible (e.g. two legs in a fork, and an end directly after the wait).

                            • 11. Re: workflow design about wait states
                              mmusaji

                              My classes in the custom nodes do very little. I don't understand how they would have an impact on the wait state, which comes after they execute.

                              public class ValidateOneRequest implements ActivityBehaviour {
                              
                               private static final long serialVersionUID = 1L;
                               ExperianDetails experianDetails;
                              
                               public void execute(ActivityExecution execution) throws Exception {
                               Thread.sleep(2000);//simulate a delay in processing.
                               OneDetails oneDetails= (OneDetails)execution.getVariable("oneDetails");
                               oneDetails = update(oneDetails); //update the object in some way (change a name)
                               execution.setVariable("oneDetails", oneDetails);
                               }
                              }
                              


                              I'm working on a unit test, appreciate your feedback so far. I tried to do this before but because of the fork the unit test would always fail as the unit test thread would continue and fork would not be complete.

                              • 12. Re: workflow design about wait states
                                mmusaji

                                Why doesn't this forum have an edit option!!!?

                                public class ValidateOneRequest implements ActivityBehaviour {
                                
                                 private static final long serialVersionUID = 1L;
                                 OneDetails oneDetails;
                                
                                 public void execute(ActivityExecution execution) throws Exception {
                                 Thread.sleep(2000);//simulate a delay in processing.
                                 OneDetails oneDetails= (OneDetails)execution.getVariable("oneDetails");
                                 oneDetails = update(oneDetails); //update the object in some way (change a name)
                                 execution.setVariable("oneDetails", oneDetails);
                                 }
                                }
                                


                                • 13. Re: workflow design about wait states
                                  kukeltje

                                   

                                  "mmusaji" wrote:
                                  I'm working on a unit test, appreciate your feedback so far. I tried to do this before but because of the fork the unit test would always fail as the unit test thread would continue and fork would not be complete.


                                  So? Your unit test fails because the behaviour of your app is not what you expect... Great, that is what unit tests are for :-)

                                  You use async but do you have a job executor in your unit test? If not, the jobs are never executed and your custom nodes are never executed

                                  If you look at the examples in the source you can find these things out. As always, try to minimize the test cases; e.g. if you remove async on the custom nodes it might work. Then you are closer to the cause. That is debugging/troubleshooting etc.... ;-)

                                  • 14. Re: workflow design about wait states
                                    mmusaji

                                     

                                    "kukeltje" wrote:


                                    You use async but do you have a job executor in your unit test? If not, the jobs are never executed and your custom nodes are never executed



                                    They are executed. I can see that in the output as well as in the objects when I retrieve them. Using the job executor before didn't work correctly, as the jobs were being executed synchronously rather than asynchronously.

                                    Here's a bit of the output to show the jobs executing:
                                    12:33:44,091 FIN | [BaseJbpmTestCase] === starting testSimple =============================
                                    Parsing process Request
                                    12:33:45,419 FIN | [ExecuteActivity] executing activity(evaluate parse result)
                                    process ID: PROCESS_ID 001
                                    12:33:45,451 FIN | [ExecuteActivity] executing activity(find providers)
                                    Finding Providers
                                    12:33:45,451 FIN | [ExecuteActivity] executing activity(fork)
                                    12:33:45,466 FIN | [DefaultIdGenerator] generated execution id process.112.validate one
                                    12:33:45,466 FIN | [ExecutionImpl] created execution[process.112.validate one]
                                    12:33:45,466 FIN | [JobExecutorMessageSession] sending message ExecuteActivityMessage
                                    12:33:45,466 INF | [JobExecutorThread] starting...
                                    12:33:45,466 INF | [JobExecutorThread] starting...
                                    12:33:45,466 INF | [DispatcherThread] starting...
                                    12:33:45,466 INF | [JobExecutorThread] starting...
                                    12:33:45,466 FIN | [DefaultIdGenerator] generated execution id process.112.validate two
                                    12:33:45,466 FIN | [ExecutionImpl] created execution[process.112.validate two]
                                    12:33:45,466 FIN | [JobExecutorMessageSession] sending message ExecuteActivityMessage
                                    12:33:45,482 FIN | [DefaultIdGenerator] generated execution id process.112.validate three
                                    12:33:45,482 FIN | [ExecutionImpl] created execution[process.112.validate three]
                                    12:33:45,482 FIN | [JobExecutorMessageSession] sending message ExecuteActivityMessage
                                    12:33:45,482 FIN | [AcquireJobsCmd] start querying first acquirable job...
                                    12:33:45,482 FIN | [AcquireJobsCmd] locking jobs []
                                    12:33:45,482 FIN | [GetNextDueDateCmd] getting next due date...
                                    12:33:45,482 FIN | [GetNextDueDateCmd] next due date is null
                                    12:33:45,482 FIN | [DispatcherThread] DispatcherThread will wait for max 600ms on org.jbpm.pvm.internal.jobexecutor.JobExecutor@171194d
                                    12:33:45,482 FIN | [DispatcherThread] DispatcherThread woke up
                                    12:33:45,482 FIN | [AcquireJobsCmd] start querying first acquirable job...
                                    12:33:45,482 FIN | [AcquireJobsCmd] locking jobs [109]
                                    12:33:45,482 FIN | [DispatcherThread] pushing jobs on the queue [109]
                                    12:33:45,482 FIN | [DispatcherThread] added jobs [109] to the queue
                                    12:33:45,482 FIN | [JobExecutorThread] took job(s) [109] from queue
                                    12:33:45,482 FIN | [AcquireJobsCmd] start querying first acquirable job...
                                    12:33:45,498 FIN | [ExecuteJobCmd] executing job ExecuteActivityMessage[109]...
                                    12:33:45,498 FIN | [AcquireJobsCmd] locking jobs [110]
                                    12:33:45,498 FIN | [DispatcherThread] pushing jobs on the queue [110]
                                    12:33:45,498 FIN | [DispatcherThread] added jobs [110] to the queue
                                    12:33:45,498 FIN | [AcquireJobsCmd] start querying first acquirable job...
                                    12:33:45,498 FIN | [AcquireJobsCmd] locking jobs [111]
                                    12:33:45,498 FIN | [JobExecutorThread] took job(s) [110] from queue
                                    12:33:45,498 FIN | [DispatcherThread] pushing jobs on the queue [111]
                                    12:33:45,498 FIN | [DispatcherThread] added jobs [111] to the queue
                                    12:33:45,498 FIN | [AcquireJobsCmd] start querying first acquirable job...
                                    12:33:45,498 FIN | [JobExecutorThread] took job(s) [111] from queue
                                    12:33:45,498 FIN | [AcquireJobsCmd] locking jobs []
                                    12:33:45,513 FIN | [ExecuteJobCmd] executing job ExecuteActivityMessage[111]...
                                    12:33:45,513 FIN | [ExecuteJobCmd] executing job ExecuteActivityMessage[110]...
                                    12:33:45,513 FIN | [ExecuteActivity] execution[process.112.validate one] executes activity(validate one request)
                                    12:33:45,560 FIN | [GetNextDueDateCmd] getting next due date...
                                    12:33:45,576 FIN | [GetNextDueDateCmd] next due date is null
                                    12:33:45,576 FIN | [DispatcherThread] DispatcherThread will wait for max 600ms on org.jbpm.pvm.internal.jobexecutor.JobExecutor@171194d
                                    12:33:45,576 FIN | [ExecuteActivity] execution[process.112.validate two] executes activity(validate two request)
                                    12:33:45,576 FIN | [ExecuteActivity] execution[process.112.validate three] executes activity(validate three request)
                                    12:33:46,169 FIN | [DispatcherThread] DispatcherThread woke up
                                    12:33:46,169 FIN | [AcquireJobsCmd] start querying first acquirable job...
                                    12:33:46,169 FIN | [AcquireJobsCmd] locking jobs []
                                    12:33:46,169 FIN | [GetNextDueDateCmd] getting next due date...
                                    12:33:46,169 FIN | [GetNextDueDateCmd] next due date is null
                                    12:33:46,169 FIN | [DispatcherThread] DispatcherThread will wait for max 600ms on org.jbpm.pvm.internal.jobexecutor.JobExecutor@171194d
                                    


                                    I also have output from the classes themselves, so I know for sure they are being executed.

                                    I can't remove "async"; they need to run asynchronously. I think this is why the wait state cannot be found.
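
                                    The suspected race can be modelled outside jBPM. The sketch below is a plain-Java analogy, not the engine's implementation: `ForkJoinModel` is a hypothetical name, the worker pool plays the role of the job executor, and the latch plays the role of the join. The call that forks returns immediately, so a caller that looks for "what comes after the join" right away will not find it until every branch has counted down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

/** Plain-Java analogy of a fork whose branches continue asynchronously. */
final class ForkJoinModel {

    private ForkJoinModel() {}

    /**
     * Hands each branch to the executor and returns at once.
     * The returned latch opens only after every branch has finished,
     * which is the moment a join would let the process move on.
     */
    static CountDownLatch fork(Executor workers, Runnable... branches) {
        CountDownLatch joined = new CountDownLatch(branches.length);
        for (Runnable branch : branches) {
            workers.execute(() -> {
                try {
                    branch.run();
                } finally {
                    joined.countDown(); // count down even if the branch throws
                }
            });
        }
        return joined; // the caller's thread carries on while branches still run
    }
}
```

                                    A caller would use `joined.await(timeout, unit)` before inspecting the result. In a jBPM test the equivalent is waiting until the jobs have been executed before asking for the execution in "wait".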
