Is the simple fork/join with async execution paths broken in jBPM 4.0
manjuss Apr 6, 2010 12:45 AMhi All,
We are exploring the simple fork/join construct for an usecase where in the execution path of each of the fork's branch has to be asynchronous. I have provided all the necessary details below, please do let me know if i am missing some thing very fundamental. This seems like a very fundamental usecase and may be we are missing just some very fundamental, please do let us know.
Issue faced:
The fork node starts the execution of both the branches in parallel by using inbuilt jBPM messaging. We have observed that the job executor picks up both the jobs and starts executing them concurrently. However the workflow never enters the join state.
Environment:
OS: Linux/Windows
Java: 1.5.0_22
jBPM: latest stable 4.0 release from sourceforge.
Executed as a simple JUnit testcase.
Problem definition:
We have a bunch of very long running processes that we want to execute in parallel. Once all these parallel processes are done we would like to continue with the workflow. We are trying to model this using fork/join and async continuation.
We are trying to execute the test cases provided in jBPM 4.0 with some minor modifications. For the prototype we are simulating long running processes as simple java nodes. The workflow definition is as follows:
process.jpdl.xml:
<?xml version="1.0" encoding="UTF-8"?>
<process name="AsyncFork" xmlns="http://jbpm.org/4.0/jpdl">
<start g="22,69,80,40">
<transition to="fork1" />
</start>
<fork g="99,68,80,40" name="fork1">
<transition g="122,41:" to="ship goods" />
<transition g="123,142:" to="send bill" />
</fork>
<java class="org.jbpm.examples.async.fork.Application" g="159,17,98,50"
method="shipGoods" name="ship goods" continue="async">
<transition g="297,42:" to="join" />
</java>
<java class="org.jbpm.examples.async.fork.Application" g="159,117,98,50"
method="sendBill" name="send bill" continue="async">
<transition g="297,141:" to="join" />
</java>
<join g="274,66,80,40" name="join">
<transition to="print join" />
</join>
<java g="159,117,98,50"
method="printjoin" name="print join">
<transition g="297,141:" to="end" />
</java>
<end g="353,67,48,48" name="end" />
</process>
Application.java: (Note: We have just modified the given java file)
package org.jbpm.examples.async.fork;
import java.io.Serializable;
/**
* @author Tom Baeyens
*/
public class Application implements Serializable {
private static final long serialVersionUID = 1L;
public void shipGoods() {
// assume automatic calculations here
for (int i = 10; i > 0; i--) {
System.out.println("Shipping goods");
sleep();
}
}
public void printjoin() {
System.out.println("Joining both the forked execution paths");
}
private void sleep() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void sendBill() {
// assume automatic calculations here
for (int i = 10; i > 0; i--) {
System.out.println("New:Sending bill");
sleep();
}
}
}
AsyncForkTest.java:(Note: We have just modified the given java file)
package org.jbpm.examples.async.fork;
import java.util.Date;
import java.util.List;
import org.jbpm.api.Execution;
import org.jbpm.api.ProcessInstance;
import org.jbpm.api.job.Job;
import org.jbpm.test.JbpmTestCase;
/**
* @author Tom Baeyens
*/
public class AsyncForkTest extends JbpmTestCase {
String deploymentId;
protected void setUp() throws Exception {
super.setUp();
deploymentId = repositoryService.createDeployment()
.addResourceFromClasspath("org/jbpm/examples/async/fork/process.jpdl.xml")
.deploy();
}
protected void tearDown() throws Exception {
// repositoryService.deleteDeploymentCascade(deploymentId);
// super.tearDown();
}
public void testAsyncFork() {
ProcessInstance processInstance = executionService.startProcessInstanceByKey("AsyncFork");
String processInstanceId = processInstance.getId();
System.out.println("processInstanceId: " + processInstanceId);
while (true) {
}
// List<Job> jobs = managementService.createJobQuery()
// .processInstanceId(processInstanceId)
// .list();
//
// assertEquals(2, jobs.size());
//
// Job job = jobs.get(0);
//
// managementService.executeJob(job.getId());
//
// job = jobs.get(1);
//
// managementService.executeJob(job.getId());
//
// Date endTime = historyService
// .createHistoryProcessInstanceQuery()
// .processInstanceId(processInstance.getId())
// .uniqueResult()
// .getEndTime();
//
// assertNotNull(endTime);
}
}
jbpm.cfg.xml: We have enabled jobexecutor.
<?xml version="1.0" encoding="UTF-8"?>
<jbpm-configuration>
<import resource="jbpm.default.cfg.xml" />
<import resource="jbpm.tx.hibernate.cfg.xml" />
<import resource="jbpm.jpdl.cfg.xml" />
<import resource="jbpm.identity.cfg.xml" />
<!-- Job executor is excluded for running the example test cases. -->
<!-- To enable timers and messages in production use, this should be included. -->
<import resource="jbpm.jobexecutor.cfg.xml" />
<import resource="jbpm.mail.templates.examples.xml" />
</jbpm-configuration>
Our Observations and Analysis:
- 2 jobs gets created in the JBPM4_JOB table one each for the async execution path
- Job executor picks up both these jobs concurrently and starts executing the java action
- With debugger on what i have noticed is that each of the job executor threads loads up the ExecutionImpl object into the hibernate session at the start of the execution and rarely hits the DB back for obvious performance reasons.
- ExecutionImpl object is loaded at the start of thread execution, any of the state changes made by other job executor thread is not visible in the current thread unless the hibernate object is reloaded and this never happens as its a separate hibernate session and thread of execution.
- Each of the execution paths tries to execute the JoinActivity
- In the JoinActivity there is a check to see if all the branches have completed - this test fails always, hence workflow is in an indefinite wait state kind of a thing.
- Join is not executed.
Thanks
Manju
-
Application.java.zip 1.0 KB
-
process.jpdl.xml 972 bytes
-
AsyncForkTest.java.zip 1.3 KB
-
jbpm.cfg.xml 533 bytes