43 Replies. Latest reply on Sep 16, 2009 9:04 AM by mmusaji
      • 30. Re: workflow design about wait states
        kukeltje

        If you made that last change, can you repost your unit test (assuming the process definition did not change)? Then I'll give it a try here.

        • 31. Re: workflow design about wait states
          mmusaji

          For completeness I'll post my unit test and process definition again:

          My unit test with changes suggested.

          package org.workflow.test.forum;

          import java.util.List;
          import java.util.Map;

          import org.jbpm.api.Execution;
          import org.jbpm.api.ProcessInstance;
          import org.jbpm.api.activity.ActivityBehaviour;
          import org.jbpm.api.activity.ActivityExecution;
          import org.jbpm.api.activity.ExternalActivityBehaviour;
          import org.jbpm.api.job.Job;
          import org.jbpm.test.JbpmTestCase;

          public class ProcessTest extends JbpmTestCase {
              String deploymentDbid;

              protected void setUp() throws Exception {
                  super.setUp();
                  deploymentDbid = repositoryService.createDeployment()
                      .addResourceFromClasspath("org/workflow/test/forum/process.jpdl.xml")
                      .deploy();
              }

              protected void tearDown() throws Exception {
                  repositoryService.deleteDeploymentCascade(deploymentDbid);
                  super.tearDown();
              }

              public void testProcess() {
                  ProcessInstance processInstance = executionService.startProcessInstanceByKey("process");
                  Execution executionInOne = processInstance.findActiveExecutionIn("custom one");
                  assertNotNull(executionInOne);
                  processInstance = executionService.signalExecutionById(executionInOne.getId());

                  String processInstanceId = processInstance.getId();

                  List<Job> jobs = managementService.createJobQuery()
                      .processInstanceId(processInstanceId)
                      .list();

                  assertEquals("Job size doesn't equal 2", 2, jobs.size());

                  Job job = jobs.get(0);

                  managementService.executeJob(job.getId());

                  job = jobs.get(1);

                  managementService.executeJob(job.getId());

                  processInstance = executionService.findProcessInstanceById(processInstanceId);

                  Execution executionInFour = processInstance.findActiveExecutionIn("custom four");
                  assertNotNull("ExecutionInFour Is Null", executionInFour);
                  processInstance = executionService.signalExecutionById(executionInFour.getId());


                  if (executionService.findProcessInstanceById(processInstanceId) != null) {
                      assertFalse("ProcessInstance.isEnded() is not false", processInstance.isEnded());
                  } else {
                      assertNull("processInstanceID not null", executionService.findProcessInstanceById(processInstanceId));
                  }

              }

              public static class CustomOne implements ExternalActivityBehaviour {
                  private static final long serialVersionUID = 1L;

                  public void execute(ActivityExecution execution) throws Exception {
                      System.out.println("Executing");

                      System.out.println(execution.getActivityName());

                      execution.waitForSignal();
                  }

                  public void signal(ActivityExecution execution,
                                     String signalName,
                                     Map<String, ?> parameters) {
                      execution.take(signalName);
                  }
              }

              public static class CustomTwo implements ActivityBehaviour {
                  private static final long serialVersionUID = 1L;

                  public void execute(ActivityExecution execution) throws Exception {
                      System.out.println("Executing");

                      System.out.println(execution.getActivityName());

                      execution.takeDefaultTransition();
                  }

                  public void signal(ActivityExecution execution,
                                     String signalName,
                                     Map<String, ?> parameters) {
                      execution.take(signalName);
                  }
              }

              public static class CustomThree implements ActivityBehaviour {
                  private static final long serialVersionUID = 1L;

                  public void execute(ActivityExecution execution) throws Exception {
                      System.out.println("Executing");

                      System.out.println(execution.getActivityName());

                      execution.takeDefaultTransition();
                  }

                  public void signal(ActivityExecution execution,
                                     String signalName,
                                     Map<String, ?> parameters) {
                      execution.take(signalName);
                  }
              }

              public static class CustomFour implements ExternalActivityBehaviour {
                  private static final long serialVersionUID = 1L;

                  public void execute(ActivityExecution execution) throws Exception {
                      System.out.println("Executing");

                      System.out.println(execution.getActivityName());

                      execution.waitForSignal();
                  }

                  public void signal(ActivityExecution execution,
                                     String signalName,
                                     Map<String, ?> parameters) {
                      execution.take(signalName);
                  }
              }

          }
          
          


          Workflow
          <?xml version="1.0" encoding="UTF-8"?>
          
          <process name="process" xmlns="http://jbpm.org/4.0/jpdl">
            <start>
              <transition to="custom one"/>
            </start>

            <custom class="org.workflow.test.forum.ProcessTest$CustomOne" name="custom one">
              <transition to="fork"/>
            </custom>

            <fork name="fork">
              <transition name="custom two" to="custom two"/>
              <transition name="custom three" to="custom three"/>
            </fork>

            <custom continue="async" name="custom two" class="org.workflow.test.forum.ProcessTest$CustomTwo">
              <transition to="join"/>
            </custom>

            <custom continue="async" name="custom three" class="org.workflow.test.forum.ProcessTest$CustomThree">
              <transition to="join"/>
            </custom>

            <join name="join" continue="exclusive">
              <transition name="custom four" to="custom four"/>
            </join>

            <custom name="custom four" class="org.workflow.test.forum.ProcessTest$CustomFour">
              <transition to="end"/>
            </custom>

            <task name="end">
              <transition name="to complete" to="end process"/>
            </task>

            <end name="end process" state="complete"/>

          </process>
          


          Sometimes this runs to completion and sometimes it fails. It's very similar to the issue I had when the unit test thread was completing ahead of the workflow. I suspect putting a small sleep in there will fix this, but this is what I (we) are trying to fix. The join should ensure it only continues once all branches from the fork have joined.

          • 32. Re: workflow design about wait states
            jbarrez

            The issue here is that you have multiple threads, the JUnit thread and the JobExecutor thread(s).

            The process is started in the JUnit thread until the custom activities in the fork are encountered. Jobs are created and put in the database.

            These jobs will be picked up by the JobExecutor threads at some point in the future. However, the JUnit thread is still running and reaches the end of the method call. The JUnit framework will now kill its ThreadRunner, which also kills everything it spawned (the JobExecutor threads).

            The join activity can never ensure here that all threads have finished, for the simple reason that the join is potentially never even reached (if the JUnit thread finished quickly).

            If you want to control your unit test, you must disable the job executor in your config and fire the jobs yourself.
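To illustrate why firing the jobs yourself makes the test deterministic, here is a toy model in plain Java. It does not use the jBPM API; the class `ManualJobFiring` and its methods are hypothetical names made up for this sketch. The queue stands in for the job table, and `executeNextJob()` plays the role that `managementService.executeJob(job.getId())` plays in the posted test: nothing runs until the test thread asks for it, so the join is always reached before the test method returns.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model (not jBPM): a fork/join whose branch jobs are fired by the caller. */
class ManualJobFiring {
    /** Stands in for the job table; nothing runs until the caller asks for it. */
    private final Queue<Runnable> pendingJobs = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();
    private int branchesToJoin;

    /** Fork: create one job per branch instead of running the branch immediately. */
    void fork(String... branchNames) {
        branchesToJoin = branchNames.length;
        for (String name : branchNames) {
            pendingJobs.add(() -> {
                log.add("executed " + name);
                arriveAtJoin();
            });
        }
    }

    /** Join: continue only once the last branch has arrived. */
    private void arriveAtJoin() {
        branchesToJoin--;
        if (branchesToJoin == 0) {
            log.add("join passed");
        }
    }

    int pendingJobCount() {
        return pendingJobs.size();
    }

    /** Runs one queued job in the calling thread; returns false if none is left. */
    boolean executeNextJob() {
        Runnable job = pendingJobs.poll();
        if (job == null) {
            return false;
        }
        job.run();
        return true;
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        ManualJobFiring process = new ManualJobFiring();
        process.fork("custom two", "custom three");
        // The test thread drains the queue itself, so no background thread can be cut off.
        while (process.executeNextJob()) {
            // keep firing until no job is left
        }
        System.out.println(process.log());
    }
}
```

Because every job runs in the calling thread, there is no race between the test finishing and a background executor picking up work.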

            • 33. Re: workflow design about wait states
              mmusaji

              Thanks for that. That explains things perfectly :)

              So I can assume that when I deploy this in JBoss (i.e. not running from a JUnit test) I won't have this problem, as the join will then ensure all activities are present before carrying on? As long as I have the waitForSignal in my classes as expected?

              I think with this approach things are very much clearer. Apologies for letting this run for 4 pages :) But thanks for your help and explanations to understand this. Now I just have to find some time for that blog ;)

              • 34. Re: workflow design about wait states
                kukeltje

                Stupid me, I already mentioned firing the jobs manually (see the test), just forgot to disable the jobexecutor....AAAAAAAAAAHHHHHHHHHHHHHH

                In your real process you do not need the waitForSignal unless you want the activities to explicitly wait for a signal. The reason is that the JobExecutor is then started in a different way and is not shut down the way it is when the unit test ends.

                So basically your test is correct, just disable the jobexecutor in the config.

                • 35. Re: workflow design about wait states
                  mmusaji

                  I'd just like to confirm a few things.

                  In my application my classes don't have to call waitForSignal and the join will still wait for all forks to come together before carrying on?

                  And would I be correct in saying that it is therefore not possible to test async nodes in a fork realistically? i.e. There is no way to start two nodes at the same time in a unit test?

                  • 36. Re: workflow design about wait states
                    kukeltje

                    1: Correct

                    2: No, not necessarily. If you fire the jobs manually and do nothing in addition, you are right. But you could always do one of two things if you really think it is important (keep in mind that a fork is 'parallel on the BUSINESS level', not by definition multi-threaded, although sometimes the branches are processed that way):
                    - Start new threads in which you manually fire the jobs
                    - Switch back to the jobExecutor in this case (preferred) with multiple threads (5 is the default afaik)

                    But as you can read from Joram's post (and what I forgot to explain to you), in addition to this you have to make sure the unit test thread does not end before both jobs are finished and the join is reached. This can be done with one long Thread.sleep(..), or with a loop that tests whether the execution is in a specific state or has ended and, if not, sleeps a short time and tests again.
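The "loop which tests, sleeps a short time and tests again" could look roughly like the helper below. This is a minimal sketch in plain Java, not part of jBPM: `PollUntil`, `waitUntil`, the timeout and the poll interval are all assumed names and values. In the posted test the condition would presumably re-fetch the process instance with `executionService.findProcessInstanceById(...)` and check whether `findActiveExecutionIn("custom four")` is non-null.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical test helper: poll a condition until it holds or a timeout expires. */
class PollUntil {

    /**
     * Returns true as soon as the condition holds, false if the timeout
     * expires first or the waiting thread is interrupted.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // The flag stands in for "the join has been reached".
        AtomicBoolean joinReached = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(200); // simulated background job
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            joinReached.set(true);
        });
        worker.start();

        boolean reached = waitUntil(joinReached::get, 5_000, 50);
        System.out.println("join reached: " + reached);
    }
}
```

Compared with one long `Thread.sleep(..)`, the loop returns as soon as the condition holds and fails with a clear `false` when it never does.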

                    • 37. Re: workflow design about wait states
                      kukeltje

                      Personal remark:

                      Executing the nodes in a fork one after another did not pose any problem to me so I just do it this way, without the sleeps (basically just like your test is now)

                      • 38. Re: workflow design about wait states
                        mmusaji

                        Perhaps I'm missing something... This is an example of how my application will start off the workflow. My comments in the code explain what I need to happen. I'm using the same process definition previously posted.

                        I need the workflow to start and then this method to wait until a signal is received from the workflow before reading the variable back out of the executionService.

                        @WebMethod
                        public String request() {
                            try {
                                InitialContext ctx = new InitialContext();
                                this.processEngine = (ProcessEngine) ctx.lookup("java:/ProcessEngine");

                                ExecutionService execService = (ExecutionService)
                                    this.processEngine.get(ExecutionService.class);

                                // kicks off workflow (1)
                                ProcessInstance processInstance = execService.startProcessInstanceByKey("process", variables);

                                // between the line above and this line we need a wait of some sort,
                                // but one that depends on how long the workflow takes to complete...

                                // should not be read until the workflow is complete
                                myObject = (MyObject) execService.getVariable(processInstance.getId(), "myObjectDetails");


                                result = "My Object Details: " + myObject.getDetails();

                            } catch (Exception e) {
                                e.printStackTrace();
                            }

                            return result;
                        }
                        




                        • 39. Re: workflow design about wait states
                          kukeltje

                          So you want a front-end thread to block on (mostly) parallel executed jobs in the backend? Are you sure? How long will these jobs be running? Is the process fully ended after these jobs, like in the definition you posted?

                          So basically http://www.eaipatterns.com/BroadcastAggregate.html is what you want?

                          I'd look into using some kind of ajax polling to check if they are finished.

                          And I assume there are multiple return variables since having multiple parallel jobs working on the same variable is kind of dangerous.

                          • 40. Re: workflow design about wait states
                            kukeltje

                            Shoot, forgot my last 'paste' of a link:

                            See http://www.infoq.com/articles/boris-lublinsky-jboss-jbpm for a good explanation/example/... (although it is jBPM 3) but the idea is the same.

                            • 41. Re: workflow design about wait states
                              mmusaji

                               

                              "kukeltje" wrote:
                              So you want a front-end thread to block on (mostly) parallel executed jobs in the backend? Are you sure? How long will these jobs be running? Is the process fully ended after these jobs, like in the definition you posted?

                              So basically http://www.eaipatterns.com/BroadcastAggregate.html is what you want?


                              The jobs will be running for different periods of time, but anywhere between 2 and 8 seconds. But there could be problems with comms etc., or we could retry, so a job could take longer in those circumstances. And yes... unfortunately we are sure! lol.

                              But yes, that pattern looks spot on. Is there an "Aggregator" of sorts in jBPM? Or would that be the join node, in a way?

                              "kukeltje" wrote:

                              And I assume there are multiple return variables since having multiple parallel jobs working on the same variable is kind of dangerous.


                              Yes, multiple return variables (each fork changing its "own" object), which are then read back at the front-end once they have been updated.

                              • 42. Re: workflow design about wait states
                                kukeltje

                                The aggregator is the join, but the 'concept' is non-blocking at the technical level.

                                The thing is that when you start a process, it continues to the first node that is a wait state and then returns control to the user. In normal cases this is the jBPM fork. In your case you'd want that to be the join, or even the point of leaving the join (e.g. the end node).

                                In general it is an interesting question, although we should not try to duplicate things that are already in JBoss ESB, since afaik a blocking scatterer/gatherer is available in the ESB. Implementing a custom join (look at the source of JoinActivity.java: http://fisheye.jboss.org/browse/~raw,r=5490/JbpmSvn/jbpm4/trunk/modules/jpdl/src/main/java/org/jbpm/jpdl/internal/activity/JoinActivity.java) with e.g. a Thread.sleep(..) and without the 'waitForSignal()' would make it blocking, but could have other implications.

                                So in the end, I think implementing a Thread.sleep(..) is your best option for now. But do not forget to implement e.g. a timer like in the link I gave you.

                                For this specific use case, jBPM does not provide what you need out of the box, and in the end you have to do things similar to what you would do with old-school multi-threading or with JMS (http://www.ibm.com/developerworks/websphere/library/techarticles/0802_nasser/0802_nasser.html).
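As a rough idea of what the "old-school multi-threading" route can look like, the sketch below blocks a caller until all parallel branches have finished or a timeout expires. It is plain `java.util.concurrent`, not jBPM: `BlockingScatterGather`, `runAndWait` and the 10-second timeout are assumptions made for illustration. Each branch writes only its own result key, matching the "each fork changing its own object" point made earlier in the thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical blocking scatter/gather built on a CountDownLatch. */
class BlockingScatterGather {

    /**
     * Runs every branch on its own pool thread and blocks until all have finished
     * or the timeout expires. Returns null on timeout or interruption.
     * Branches are expected to return non-null results.
     */
    static Map<String, String> runAndWait(Map<String, Supplier<String>> branches, long timeoutMillis) {
        Map<String, String> results = new ConcurrentHashMap<>();
        CountDownLatch allDone = new CountDownLatch(branches.size());
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, branches.size()));
        try {
            branches.forEach((name, work) -> pool.submit(() -> {
                try {
                    results.put(name, work.get()); // each branch owns exactly one key
                } finally {
                    allDone.countDown(); // count down even if the branch failed
                }
            }));
            // The "join": wait for every branch, but never longer than the timeout.
            return allDone.await(timeoutMillis, TimeUnit.MILLISECONDS) ? results : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pool.shutdownNow(); // interrupts branches that are still running
        }
    }

    public static void main(String[] args) {
        Map<String, Supplier<String>> branches = new LinkedHashMap<>();
        branches.put("custom two", () -> "result of two");
        branches.put("custom three", () -> "result of three");

        Map<String, String> results = runAndWait(branches, 10_000);
        System.out.println(results == null ? "timed out" : results.toString());
    }
}
```

The timeout matters because the caller here is a request thread: with jobs that normally take 2 to 8 seconds but may retry, an unbounded wait could hold that thread indefinitely.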

                                • 43. Re: workflow design about wait states
                                  mmusaji

                                  Thanks. Guess I'll look into some other ways of doing this if possible. Might have to rethink things, I feel.

                                  Thanks for the links though because they are useful to read up and gain some more knowledge of all these things.
