Token locked and StaleObject issues on ESB Callback to JBPM
chuaky Apr 7, 2011 5:27 AMhi all,
Got a query with ESB callback to JBPM. Need help to figure out how to do it.
I am using ESB 4.9 on AS 4.2.3.
Here is the scenario:
1. From ESB, a JBPM process is started and wait on a state. I am making a process with few states.
2. An external signal arrives into esb and I signal back (callback) to this jbpm process to continue the bpm process.
3. If many external signals arrives too fast, we would see token locked or staleobject issues.
My query is how to avoid token locked or staleobject issues when the jbpm process is callback?
Thank you.
The coding for the jbpm callback is as follows:
==========
package test.service.jbpm;
import org.apache.log4j.Logger;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.actions.AbstractActionPipelineProcessor;
import org.jboss.soa.esb.actions.ActionProcessingException;
import org.jboss.soa.esb.message.*;
import org.jbpm.graph.def.ProcessDefinition;
import org.jbpm.graph.exe.ProcessInstance;
import org.jbpm.graph.exe.Token;
import org.jbpm.db.GraphSession;
import org.jbpm.JbpmConfiguration;
import org.jbpm.JbpmContext;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.PortReference;
import org.jboss.soa.esb.services.jbpm.Constants;
import org.jboss.soa.esb.services.jbpm.Mapping;
import org.jboss.soa.esb.services.jbpm.JBpmObjectMapper;
import org.jboss.soa.esb.services.jbpm.cmd.*;
import java.util.Map;
public class SetupJbpmProcessCB extends AbstractActionPipelineProcessor {
private Logger log = Logger.getLogger(this.getClass());
private ConfigTree configTree;
public SetupJbpmProcessCB(ConfigTree _configTree) {
configTree = _configTree;
}
public Message process(Message message) throws ActionProcessingException {
log.info("*********** BEGIN SETUP JBPM PROCESS ***********");
JbpmBusinessKey jbpmBusinessKey = (JbpmBusinessKey) message.getBody().get("BusinessKey");
if (!jbpmBusinessKey.isValidKey()) {
message = null;
log.info(" invalid key");
return message;
}
String configProcessDefinitionName = configTree.getAttribute("process-definition-name");
JbpmConfiguration jbpmConf = JbpmConfiguration.getInstance();
// Open the transaction.
JbpmContext jbpmCtx = jbpmConf.createJbpmContext();
try {
GraphSession graphSession = jbpmCtx.getGraphSession();
ProcessDefinition processDefinition = graphSession.findLatestProcessDefinition(configProcessDefinitionName);
ProcessInstance instance = graphSession.getProcessInstance(processDefinition, jbpmBusinessKey.getKey());
if (instance != null) {
Token token = instance.getRootToken();
final EPR epr = new EPR();
final PortReference portRef = epr.getAddr();
final long nodeId = token.getNode().getId();
final long tokenId = token.getId();
final long processVersion = instance.getProcessDefinition().getVersion();
log.info("token locked = " + token.isLocked());
portRef.addExtension(Constants.NODE_ID, String.valueOf(nodeId));
portRef.addExtension(Constants.TOKEN_ID, String.valueOf(tokenId));
portRef.addExtension(Constants.PROCESS_INSTANCE_ID, String.valueOf(instance.getId()));
// Set the counter
String counterName = Constants.PROCESS_NODE_VERSION_COUNTER + nodeId + '_' + tokenId;
portRef.addExtension(counterName, String.valueOf(processVersion));
// Get esbToBpmVars
String esbToBpmVars = "<property name=\"esbToBpmVars\"> ";
for (ConfigTree current : configTree.getChildren("mapping")) {
esbToBpmVars += current.toString();
esbToBpmVars += " ";
}
esbToBpmVars += "</property> ";
portRef.addExtension(Constants.ESB_TO_BPM_VARS_TAG, esbToBpmVars);
//log.info("children =" + esbToBpmVars);
//Obtaining the VariableMap that is going to be set callback command
JBpmObjectMapper mapper = new JBpmObjectMapper();
Map<String, Object> variableMap = mapper.mapFromEsbMessageToJBpmMap(message, esbToBpmVars);
if (null != variableMap) {
message.getBody().add(Constants.VARIABLE_VALUES, variableMap);
}
//log.info("esbToBpmVars = " + variableMap);
// the above for setting esbToBpm variables into message body wouldn't work because
// the processing in CallbackCommand.java need it to be called from EsbActionHandler
// our case is not called by EsbActionHandler.
//add the esbToBpmVars to jbpm (this one can set the variables correctly)
for (Map.Entry<String, Object> entry : variableMap.entrySet()) {
instance.getContextInstance().setVariableLocally(entry.getKey(), entry.getValue());
}
// The counterName variable is expected to be found in the process context.
instance.getContextInstance().setVariableLocally(counterName, processVersion);
message.getHeader().getCall().setTo(epr);
// test callback
CallbackCommand callback = new CallbackCommand();
callback.setCallbackEpr(epr);
callback.setMessage(message);
callback.execute(jbpmCtx);
}
} catch (Exception e) {
log.error(e);
} finally {
// Close context.
jbpmCtx.close();
}
log.info("************ END SETUP JBPM PROCESS ************");
return message;
}
}