1 Reply Latest reply on Apr 8, 2011 10:47 AM by Chua Khoon Yong

    Token locked and StaleObject issues on ESB Callback to JBPM

    Chua Khoon Yong Novice

      hi all,

       

      I have a question about the ESB callback to JBPM and need help figuring out how to do it correctly.

       

      I am using ESB 4.9 on AS 4.2.3.

       

      Here is the scenario:

       

      1.  A JBPM process is started from the ESB and waits in a state.  The process has a few states.

       

      2.  An external signal arrives at the ESB, and I signal (call back) the JBPM process so that it continues.

       

      3.  If many external signals arrive in quick succession, we see token locked or StaleObject errors.
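
      To make step 3 concrete, here is a toy model of what I believe is going on.  As far as I understand, jBPM 3 persists tokens through Hibernate with a version column, so when two callbacks read the same token and both try to move it, the second commit sees an outdated version.  VersionedToken and StaleUpdateDemo are made-up names for illustration only and are not part of jBPM or the ESB.

```java
/** Hypothetical stand-in for a database row guarded by a version column. */
final class VersionedToken {
    private int version = 0;
    private String node = "waiting";

    synchronized int readVersion() { return version; }

    synchronized String currentNode() { return node; }

    /** Applies the update only if nobody changed the row since it was read. */
    synchronized boolean moveTo(String newNode, int versionSeenAtRead) {
        if (versionSeenAtRead != version) {
            return false; // stale: another transaction committed first
        }
        node = newNode;
        version++;
        return true;
    }
}

final class StaleUpdateDemo {
    public static void main(String[] args) {
        VersionedToken token = new VersionedToken();

        // Two callbacks load the token before either one commits.
        int seenByA = token.readVersion();
        int seenByB = token.readVersion();

        boolean a = token.moveTo("state-2", seenByA); // first commit wins
        boolean b = token.moveTo("state-2", seenByB); // second commit is stale

        System.out.println("A committed: " + a + ", B committed: " + b);
        // prints "A committed: true, B committed: false"
    }
}
```

      In the real system the second commit would fail with an exception instead of returning false, but the ordering problem is the same.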

       

      My question: how can I avoid the token locked or StaleObject errors when calling back into the JBPM process?

      Thank you.

       

      The code for the JBPM callback is as follows:

      ==========

       

      package test.service.jbpm;

       

      import org.apache.log4j.Logger;

      import org.jboss.soa.esb.helpers.ConfigTree;

      import org.jboss.soa.esb.actions.AbstractActionPipelineProcessor;

      import org.jboss.soa.esb.actions.ActionProcessingException;

      import org.jboss.soa.esb.message.*;

      import org.jbpm.graph.def.ProcessDefinition;

      import org.jbpm.graph.exe.ProcessInstance;

      import org.jbpm.graph.exe.Token;

      import org.jbpm.db.GraphSession;

      import org.jbpm.JbpmConfiguration;

      import org.jbpm.JbpmContext;

      import org.jboss.soa.esb.addressing.EPR;

      import org.jboss.soa.esb.addressing.PortReference;

      import org.jboss.soa.esb.services.jbpm.Constants;

      import org.jboss.soa.esb.services.jbpm.Mapping;

      import org.jboss.soa.esb.services.jbpm.JBpmObjectMapper;

      import org.jboss.soa.esb.services.jbpm.cmd.*;

      import java.util.Map;

       

      public class SetupJbpmProcessCB extends AbstractActionPipelineProcessor {

       

          private Logger log = Logger.getLogger(this.getClass());

          private ConfigTree configTree;

       

          public SetupJbpmProcessCB(ConfigTree _configTree) {

              configTree = _configTree;

          }

       

          public Message process(Message message) throws ActionProcessingException {

              log.info("*********** BEGIN SETUP JBPM PROCESS ***********");

       

              JbpmBusinessKey jbpmBusinessKey = (JbpmBusinessKey) message.getBody().get("BusinessKey");

       

              // Returning null stops the action pipeline.

              if (jbpmBusinessKey == null || !jbpmBusinessKey.isValidKey()) {

                  log.info("  invalid or missing key");

                  return null;

              }

       

              String configProcessDefinitionName = configTree.getAttribute("process-definition-name");

       

              JbpmConfiguration jbpmConf = JbpmConfiguration.getInstance();

       

              // Open the transaction.

              JbpmContext jbpmCtx = jbpmConf.createJbpmContext();

       

              try {

       

                  GraphSession graphSession = jbpmCtx.getGraphSession();

       

                  ProcessDefinition processDefinition = graphSession.findLatestProcessDefinition(configProcessDefinitionName);

                  ProcessInstance instance = graphSession.getProcessInstance(processDefinition, jbpmBusinessKey.getKey());

       

                  if (instance != null) {

       

                      Token token = instance.getRootToken();

       

                      final EPR epr = new EPR();

                      final PortReference portRef = epr.getAddr();

                      final long nodeId = token.getNode().getId();

                      final long tokenId = token.getId();

                      final long processVersion = instance.getProcessDefinition().getVersion();

       

                      log.info("token locked = " + token.isLocked());

       

                      portRef.addExtension(Constants.NODE_ID, String.valueOf(nodeId));

                      portRef.addExtension(Constants.TOKEN_ID, String.valueOf(tokenId));

                      portRef.addExtension(Constants.PROCESS_INSTANCE_ID, String.valueOf(instance.getId()));

       

                      // Set the counter

                      String counterName = Constants.PROCESS_NODE_VERSION_COUNTER + nodeId + '_' + tokenId;

                      portRef.addExtension(counterName, String.valueOf(processVersion));

       

                      // Get esbToBpmVars

                      String esbToBpmVars = "<property name=\"esbToBpmVars\"> ";

                      for (ConfigTree current : configTree.getChildren("mapping")) {

                          esbToBpmVars += current.toString();

                          esbToBpmVars += " ";

                      }

                      esbToBpmVars += "</property> ";

       

                      portRef.addExtension(Constants.ESB_TO_BPM_VARS_TAG, esbToBpmVars);

                      //log.info("children =" + esbToBpmVars);

       

                      // Obtain the variable map that will be set on the callback command

                      JBpmObjectMapper mapper = new JBpmObjectMapper();

                      Map<String, Object> variableMap = mapper.mapFromEsbMessageToJBpmMap(message, esbToBpmVars);

                      if (null != variableMap) {

                          message.getBody().add(Constants.VARIABLE_VALUES, variableMap);

                      }

                      //log.info("esbToBpmVars = " + variableMap);

       

                      // Putting the esbToBpm variables into the message body (above) is not enough:

                      // CallbackCommand only processes them when the call comes from EsbActionHandler,

                      // which is not the case here.

       

                      // Add the esbToBpmVars to the JBPM context directly (this sets the variables correctly)

                      if (variableMap != null) {

                          for (Map.Entry<String, Object> entry : variableMap.entrySet()) {

                              instance.getContextInstance().setVariableLocally(entry.getKey(), entry.getValue());

                          }

                      }

       

                      // The counterName variable is expected to be found in the process context.

                      instance.getContextInstance().setVariableLocally(counterName, processVersion);

       

                      message.getHeader().getCall().setTo(epr);

       

                      // test callback

                      CallbackCommand callback = new CallbackCommand();

                      callback.setCallbackEpr(epr);

                      callback.setMessage(message);

                      callback.execute(jbpmCtx);

                  }

       

              } catch (Exception e) {

                  log.error("JBPM callback failed", e);

              } finally {

                  // Close context.

                  jbpmCtx.close();

              }

       

              log.info("************ END SETUP JBPM PROCESS ************");

       

              return message;

          }

      }

        • 1. Token locked and StaleObject issues on ESB Callback to JBPM
          Chua Khoon Yong Novice

          hi,

           

          A temporary workaround I can think of is as follows.  It is not elegant, but it works to some extent.

           

           

          1.  Make the process method synchronized:

           

              public synchronized Message process(Message message) throws ActionProcessingException {

           

           

          2.  Take a short rest with Thread.sleep(500) at the start of the try block.  Adjust the delay as needed.

           

                  try {

                      // Take a rest, lol

                      Thread.sleep(500);

                      GraphSession graphSession = jbpmCtx.getGraphSession();

           

           

          The above is a quick and dirty way to avoid the issues, but performance suffers because of the sleep delay.  The sleep is there to give the JBPM process time to finish its current transition (hopefully) before the next signal is handled.  A better way would be to synchronise access to the specific JBPM process from the point of the signal until it finishes the transition, but I don't know how to do that yet.
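
          One possible direction, as an untested sketch: instead of synchronizing the whole process method, take a lock per business key so that only callbacks for the same process instance wait on each other.  PerKeyLock and runLocked are made-up names, not part of the ESB or jBPM.  This assumes all callbacks run in a single JVM and that the transaction is committed by the time jbpmCtx.close() returns.  It would not help in a cluster or if another thread (for example the job executor) touches the same token.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical helper: serializes work per business key inside one JVM. */
final class PerKeyLock {

    // One lock object per business key; entries are never evicted in this sketch.
    private final ConcurrentMap<String, Object> locks =
            new ConcurrentHashMap<String, Object>();

    /** Runs the task while holding the lock that belongs to the given key. */
    <T> T runLocked(String key, Callable<T> task) throws Exception {
        Object candidate = new Object();
        Object existing = locks.putIfAbsent(key, candidate);
        Object lock = (existing != null) ? existing : candidate;
        synchronized (lock) {
            return task.call();
        }
    }
}
```

          In the action above, everything from createJbpmContext() through jbpmCtx.close() would go inside the task passed to runLocked(jbpmBusinessKey.getKey(), ...), so that the next callback for the same key only loads the token after the previous one has finished.  Callbacks for different keys would no longer block each other, and the fixed sleep might not be needed.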

           

          Hope it helps someone out there.

          Cheers.