8 Replies Latest reply on Aug 17, 2007 3:42 PM by kukeltje

    Actions and async=

    jeffj55374

      org.jbpm.JbpmException: token '18547' can't be locked by 'job[18548]' cause it's already locked by 'token[18547]'
      Full stack trace, process definition, action handler are all below.

      Environment

      jBPM 3.2.1
      Standalone Java 1.6 (no appserver)
      Actions and async="true" don't mix.
      JobExecutor is configured for one thread.
      Hibernate 3.2.4.sp1
      Oracle 10g



      Summary:
      1. Token is signaled to start the process.
      2. Token transitions to node1 from the start node.
      3. Node one has an action handler defined so Node.execute(...) calls GraphElement.executeAction() which locks the token.
      4. The action handler calls ctx.leaveNode after it does its work. (Section 9.5 of the user guide states: "Note the difference
      between an action that is placed in an event versus an action that is placed in a node. Actions that are put in an event
      are executed when the event fires. Actions on events have no way to influence the flow of control of the process.
      It is similar to the observer pattern. On the other hand, an action that is put on a node has the responsibility of propagating the execution.
      5. ctx.leaveNode on Node1 ultimately results in Node.enter() being called on Node2
      6. Since Node2 is an async node, rather than executing the node, the code prepares for an async continuation.
      7. After sending the message the code attempts to lock the token, but it already locked as a result of #3
      // execute the node
      if (isAsync) {
      ExecuteNodeJob job = createAsyncContinuationJob(token);
      MessageService messageService = (MessageService) Services.getCurrentService(Services.SERVICENAME_MESSAGE);
      messageService.send(job);
      token.lock(job.toString()); // ** THIS IS THE PROBLEM LINE!!!! ***
      } else {
      execute(executionContext);
      }

      The conflict:
      Javadoc for Token.lock():
      /**
      * locks a process instance for further execution. A locked token
      * cannot continue execution. This is a non-persistent
      * operation. This is used to prevent tokens being propagated during
      * the execution of actions.
      * @see #unlock(String)
      */
      public void lock(String lockOwnerId) {

      But Section 9.5 of the user's guide says that an action is responsible for propagating the execution, but the token is locked in an attempt to propagate it.

      I'm currently trying to understand the design and come up with a solution to allow my process to run as designed.
      What I don't understand yet is why the token is used to prevent propagation or why the code in Node.execute is trying to lock the token with the job id.
      Should the code unlock the token and then lock it with the job? Is the token lock really necessary?
      If I remove the action fron Node 1, things work fine.

      Possibly related to http://jira.jboss.com/jira/browse/JBPM-983. Similar, but a little different twist

      Any ideas?

      Once I research this a little more, I might submit a bug, but I'd like a little feedback to verify that I'm on the right track.

      Thanks for your help.

      Full stack trace:


      org.jbpm.JbpmException: token '18547' can't be locked by 'job[18548]' cause it's already locked by 'token[18547]'
      at org.jbpm.graph.exe.Token.lock(Token.java:646)
      at org.jbpm.graph.def.Node.enter(Node.java:316)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
      at java.lang.reflect.Method.invoke(Unknown Source)
      at org.hibernate.proxy.pojo.cglib.CGLIBLazyInitializer.invoke(CGLIBLazyInitializer.java:157)
      at org.jbpm.graph.def.Node$$EnhancerByCGLIB$$dc89a302.enter()
      at org.jbpm.graph.def.Transition.take(Transition.java:151)
      at org.jbpm.graph.def.Node.leave(Node.java:393)
      at org.jbpm.graph.def.Node.leave(Node.java:357)
      at org.jbpm.graph.exe.ExecutionContext.leaveNode(ExecutionContext.java:120)
      at com.digitalriver.mds.jbpm.SampleActionHandler.execute(SampleActionHandler.java:46)
      at org.jbpm.graph.def.Action.execute(Action.java:122)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
      at java.lang.reflect.Method.invoke(Unknown Source)
      at org.hibernate.proxy.pojo.cglib.CGLIBLazyInitializer.invoke(CGLIBLazyInitializer.java:157)
      at org.jbpm.graph.def.Action$$EnhancerByCGLIB$$5863de36.execute()
      at org.jbpm.graph.def.GraphElement.executeAction(GraphElement.java:255)
      at org.jbpm.graph.def.Node.execute(Node.java:338)
      at org.jbpm.graph.def.Node.enter(Node.java:318)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
      at java.lang.reflect.Method.invoke(Unknown Source)
      at org.hibernate.proxy.pojo.cglib.CGLIBLazyInitializer.invoke(CGLIBLazyInitializer.java:157)
      at org.jbpm.graph.def.Node$$EnhancerByCGLIB$$dc89a302.enter()
      at org.jbpm.graph.def.Transition.take(Transition.java:151)
      at org.jbpm.graph.def.Node.leave(Node.java:393)
      at org.jbpm.graph.node.StartState.leave(StartState.java:70)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
      at java.lang.reflect.Method.invoke(Unknown Source)
      at org.hibernate.proxy.pojo.cglib.CGLIBLazyInitializer.invoke(CGLIBLazyInitializer.java:157)
      at org.jbpm.graph.def.Node$$EnhancerByCGLIB$$dc89a302.leave()
      at org.jbpm.graph.exe.Token.signal(Token.java:194)
      at org.jbpm.graph.exe.Token.signal(Token.java:139)
      at com.digitalriver.proto.StandaloneSample.runStandaloneSample(StandaloneSample.java:89)
      at com.digitalriver.proto.StandaloneSample.run(StandaloneSample.java:130)
      at com.digitalriver.proto.StandaloneSample.main(StandaloneSample.java:40)




      The Job Executor bean config

      <bean id="jbpm.job.executor" class="org.jbpm.job.executor.JbpmJobExecutor">
       <property name="jbpmConfiguration" ref="jbpmConfiguration" />
       <property name="name" value="JbpmJobExector"/>
       <property name="nbrOfThreads" value="1"/>
       <property name="idleInterval" value="5000"/>
       <property name="maxIdleInterval" value="300000"/>
       <property name="historyMaxSize" value="20"/>
       <property name="maxLockTime" value="600000"/> <!-- 10 minutes -->
       <property name="lockMonitorInterval" value="60000"/> <!-- 1 minute -->
       <property name="lockBufferTime" value="5000"/> <!-- 5 seconds -->
       </bean>



      The Process Definition

      <?xml version="1.0" encoding="UTF-8"?>
      
      <process-definition
       xmlns="" name="StandaloneSample">
       <start-state name="start">
       <transition name="" to="node1"></transition>
       </start-state>
       <end-state name="end1"></end-state>
       <node name="node1">
       <action class="com.digitalriver.mds.jbpm.SampleActionHandler"></action>
       <transition name="" to="node2"></transition>
       </node>
       <node async="true" name="node2">
       <action class="com.digitalriver.mds.jbpm.SampleActionHandler"></action>
       <transition name="" to="end1"></transition>
       </node>
      </process-definition>


      The action handler

      package com.digitalriver.mds.jbpm;
      
      /** Copyright (c) Digital River, Inc. All rights reserved. 2007
       * Filename: SampleActionHandler.java
       * Author: @author
       */
      
      
      import java.io.FileWriter;
      
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      import org.jbpm.context.exe.ContextInstance;
      import org.jbpm.graph.def.ActionHandler;
      import org.jbpm.graph.exe.ExecutionContext;
      
      /**
       * @author jpjohnson
       *
       */
      public class SampleActionHandler
       implements ActionHandler {
      
       /**
       *
       */
       private static final long serialVersionUID = 1L;
       @SuppressWarnings("unused")
       private static final Log log = LogFactory.getLog(SampleActionHandler.class);
      
       /* (non-Javadoc)
       * @see org.jbpm.graph.def.ActionHandler#execute(org.jbpm.graph.exe.ExecutionContext)
       */
       public void execute(final ExecutionContext ctx) throws Exception
       {
       log.debug("JPJ execute called for task " + ctx.getNode().getName() +
       " action " + ctx.getAction().getName());
      
       ContextInstance contextInstance = ctx.getProcessInstance().getContextInstance();
       Integer id = (Integer) contextInstance.getVariable("MyID");
      
       FileWriter file = new FileWriter("Node-" + ctx.getNode().getName() + " PI-" +
       ctx.getProcessInstance().getId() + ".txt");
       file.write("Test output: Process ID " + id);
       file.close();
      
       ctx.leaveNode();
       // ctx.getToken().signal();
       }
      
      }
      




        • 1. Re: Actions and async=
          kukeltje

          Search the forum and the jira first.... LOTS of info on this subject... aamof, it is high on the list to be investigated by the jbpm team (in fact it already currently is)

          • 2. Re: Actions and async=
            kukeltje

            hmm... sorry... saw you mentioned a link to the jira...

            Better to report that in the beginning of your posts... So many posts are duplicates and long that I (and many others) do not read them fully.

            • 3. Re: Actions and async=
              jeffj55374

              The details of my particular failure seem to be different than what is already documented in Jira.
              I have submitted http://jira.jboss.com/jira/browse/JBPM-1042

              Here is a failing unit test.
              This was created with jBPM 3.2.1

              While the details of this test are different, I believe the problem may be related to things documented in:
              http://jira.jboss.com/jira/browse/JBPM-633
              http://jira.jboss.com/jira/browse/JBPM-983
              http://jira.jboss.com/jira/browse/JBPM-626

              package org.jbpm.job.executor;
              
              import java.util.Collections;
              import java.util.Set;
              import java.util.Timer;
              import java.util.TimerTask;
              import java.util.TreeSet;
              
              import junit.framework.TestCase;
              
              import org.apache.commons.logging.Log;
              import org.apache.commons.logging.LogFactory;
              import org.hibernate.Session;
              import org.jbpm.JbpmConfiguration;
              import org.jbpm.JbpmContext;
              import org.jbpm.graph.def.ActionHandler;
              import org.jbpm.graph.def.ProcessDefinition;
              import org.jbpm.graph.exe.ExecutionContext;
              import org.jbpm.graph.exe.ProcessInstance;
              
              
              /** This test verifies that a Node with an action handler can propagate execution to the next node
               * that is an async="true" node that also has an action handler.
               *
               * An instance of the process below will run successfully if the action handler is removed from Node1,
               * or Node2 is not async="true". But the combination with Node1
               * having and action handler that propagates the execution to the next node that is async="true"
               * fails with the exception described below.
               *
               * @author jpjohnson@digitalriver.com
               *
               * Originally this test case failed in jBPM 3.2.1 with the following exception
               * org.jbpm.JbpmException: token '1' can't be locked by 'job[1]' cause it's already locked by 'token[1]'
               at org.jbpm.graph.exe.Token.lock(Token.java:646)
               at org.jbpm.graph.def.Node.enter(Node.java:316)
               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
               at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
               at java.lang.reflect.Method.invoke(Unknown Source)
               at org.hibernate.proxy.pojo.cglib.CGLIBLazyInitializer.invoke(CGLIBLazyInitializer.java:157)
               at org.jbpm.graph.def.Node$$EnhancerByCGLIB$$cc789161.enter(<generated>)
               at org.jbpm.graph.def.Transition.take(Transition.java:151)
               at org.jbpm.graph.def.Node.leave(Node.java:393)
               at org.jbpm.graph.def.Node.leave(Node.java:357)
               at org.jbpm.graph.exe.ExecutionContext.leaveNode(ExecutionContext.java:120)
               at org.jbpm.job.executor.SimpleAsyncProcessTest$AsyncAction.execute(SimpleAsyncProcessTest.java:57)
               at org.jbpm.graph.def.Action.execute(Action.java:122)
               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
               at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
               at java.lang.reflect.Method.invoke(Unknown Source)
               at org.hibernate.proxy.pojo.cglib.CGLIBLazyInitializer.invoke(CGLIBLazyInitializer.java:157)
               at org.jbpm.graph.def.Action$$EnhancerByCGLIB$$4852cc95.execute(<generated>)
               at org.jbpm.graph.def.GraphElement.executeAction(GraphElement.java:255)
               at org.jbpm.graph.def.Node.execute(Node.java:338)
               at org.jbpm.graph.def.Node.enter(Node.java:318)
               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
               at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
               at java.lang.reflect.Method.invoke(Unknown Source)
               at org.hibernate.proxy.pojo.cglib.CGLIBLazyInitializer.invoke(CGLIBLazyInitializer.java:157)
               at org.jbpm.graph.def.Node$$EnhancerByCGLIB$$cc789161.enter(<generated>)
               at org.jbpm.graph.def.Transition.take(Transition.java:151)
               at org.jbpm.graph.def.Node.leave(Node.java:393)
               at org.jbpm.graph.node.StartState.leave(StartState.java:70)
               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
               at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
               at java.lang.reflect.Method.invoke(Unknown Source)
               at org.hibernate.proxy.pojo.cglib.CGLIBLazyInitializer.invoke(CGLIBLazyInitializer.java:157)
               at org.jbpm.graph.def.Node$$EnhancerByCGLIB$$cc789161.leave(<generated>)
               at org.jbpm.graph.exe.Token.signal(Token.java:194)
               at org.jbpm.graph.exe.Token.signal(Token.java:139)
               at org.jbpm.graph.exe.ProcessInstance.signal(ProcessInstance.java:270)
               at org.jbpm.job.executor.SimpleAsyncProcessTest.launchProcess(SimpleAsyncProcessTest.java:112)
               at org.jbpm.job.executor.SimpleAsyncProcessTest.testConsecutiveAycnActionHandlers(SimpleAsyncProcessTest.java:69)
               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
               at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
               at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
               at java.lang.reflect.Method.invoke(Unknown Source)
               at junit.framework.TestCase.runTest(TestCase.java:154)
               at junit.framework.TestCase.runBare(TestCase.java:127)
               at junit.framework.TestResult$1.protect(TestResult.java:106)
               at junit.framework.TestResult.runProtected(TestResult.java:124)
               at junit.framework.TestResult.run(TestResult.java:109)
               at junit.framework.TestCase.run(TestCase.java:118)
               at junit.framework.TestSuite.runTest(TestSuite.java:208)
               at junit.framework.TestSuite.run(TestSuite.java:203)
               at org.eclipse.jdt.internal.junit.runner.junit3.JUnit3TestReference.run(JUnit3TestReference.java:130)
               at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
               at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:460)
               at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:673)
               at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:386)
               at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:196)
              
              
               */
              public class SimpleAsyncProcessTest
               extends TestCase {
              
               private static final long serialVersionUID = 1L;
              
               static int nbrOfConcurrentProcessExecutions = 20;
               static int maxWaitTime = 20000;
               static Set collectedResults = Collections
               .synchronizedSet(new TreeSet());
              
               protected static JbpmConfiguration jbpmConfiguration = JbpmConfiguration
               .getInstance("org/jbpm/jbpm.test.cfg.xml");
              
               static {
               jbpmConfiguration.getJobExecutor().nbrOfThreads = 5;
               }
              
               protected JobExecutor jobExecutor;
              
               public static class AsyncAction
               implements ActionHandler {
               private static final long serialVersionUID = 1L;
              
               public void execute(ExecutionContext executionContext) throws Exception
               {
               // Normal would do stuff here
               // Leave via the default transition
               // When this test case was written, this method would throw when
               // it tried to enter Node2 because it would try to lock the token but
               // it is already locked.
               executionContext.leaveNode();
               }
               }
               /** This test verifies that a process with two consecutive nodes with async="true"
               * and an action handler specified actually completes.
               * This test was initially written against jBPM 3.2.1. It would fail in Token.lock() when
               * attempting to propagate execution to Node2
               */
               public void testConsecutiveAycnActionHandlers()
               {
               jbpmConfiguration.createSchema();
               deployProcess();
               long processID = launchProcess();
               processJobs(maxWaitTime);
               assertTrue(hasProcessInstanceEnded(processID));
               jbpmConfiguration.createSchema();
               }
              
               public void deployProcess()
               {
               ProcessDefinition processDefinition = ProcessDefinition
               .parseXmlString("<?xml version='1.0' encoding='UTF-8'?>" +
               "<process-definition xmlns='' name='StandaloneSample'>" +
               "<start-state name='start'>" +
               "<transition name='t0' to='node1'></transition>" +
               "</start-state>" +
               "<end-state name='end1'></end-state>" +
               "<node name='node1'>" +
               "<action class='" + AsyncAction.class.getName() + "'></action>" +
               "<transition name='t1' to='node2'></transition>" +
               "</node>" +
               "<node name='node2' async='true' >"+
               "<action class='" + AsyncAction.class.getName() + "'></action>" +
               "<transition name='t2' to='end1'></transition>"+
               "</node>"+
               "</process-definition>");
              
               JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
               try {
               jbpmContext.deployProcessDefinition(processDefinition);
               }
               finally {
               jbpmContext.close();
               }
               }
               /** Create a new process instance
               *
               * @return The process instance ID
               */
               public long launchProcess()
               {
               JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
               try {
               ProcessInstance processInstance = jbpmContext
               .newProcessInstanceForUpdate("StandaloneSample");
               processInstance.signal();
               return processInstance.getId();
               }
               finally {
               jbpmContext.close();
               }
               }
              
               public boolean hasProcessInstanceEnded(long id) {
               JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
               try {
               ProcessInstance processInstance = jbpmContext.getProcessInstance(id);
               return processInstance.hasEnded();
               }
               finally {
               jbpmContext.close();
               }
              
               }
               protected void startJobExecutor()
               {
               jobExecutor = jbpmConfiguration.getJobExecutor();
               jobExecutor.start();
               }
              
               private void processAllJobs(final long maxWait)
               {
               boolean jobsAvailable = true;
              
               // install a timer that will interrupt if it takes too long
               // if that happens, it will lead to an interrupted exception and the test
               // will fail
               TimerTask interruptTask = new TimerTask() {
               Thread testThread = Thread.currentThread();
              
               public void run()
               {
               log
               .debug("test " + getName()
               + " took too long. going to interrupt...");
               testThread.interrupt();
               }
               };
               Timer timer = new Timer();
               timer.schedule(interruptTask, maxWait);
              
               try {
               while (jobsAvailable) {
               log
               .debug("going to sleep for 200 millis, waiting for the job executor to process more jobs");
               Thread.sleep(200);
               jobsAvailable = areJobsAvailable();
               }
               jobExecutor.stopAndJoin();
              
               }
               catch (InterruptedException e) {
               fail("test execution exceeded treshold of " + maxWait + " milliseconds");
               }
               finally {
               timer.cancel();
               }
               }
              
               private int getNbrOfJobsAvailable()
               {
               int nbrOfJobsAvailable = 0;
               JbpmContext jbpmContext = jbpmConfiguration.createJbpmContext();
               try {
               Session session = jbpmContext.getSession();
               Number jobs = (Number) session
               .createQuery("select count(*) from org.jbpm.job.Job").uniqueResult();
               log.debug("there are '" + jobs + "' jobs currently in the job table");
               if (jobs != null) {
               nbrOfJobsAvailable = jobs.intValue();
               }
               }
               finally {
               jbpmContext.close();
               }
               return nbrOfJobsAvailable;
               }
              
               protected boolean areJobsAvailable()
               {
               return (getNbrOfJobsAvailable() > 0);
               }
              
               protected void processJobs(long maxWait)
               {
               try {
               Thread.sleep(300);
               }
               catch (InterruptedException e) {
               e.printStackTrace();
               }
               startJobExecutor();
               try {
               processAllJobs(maxWait);
               }
               catch (Exception e) {
               e.printStackTrace();
               throw new RuntimeException(e);
               }
               finally {
               stopJobExecutor();
               }
               }
              
               protected void stopJobExecutor()
               {
               if (jobExecutor != null) {
               try {
               jobExecutor.stopAndJoin();
               }
               catch (InterruptedException e) {
               throw new RuntimeException(
               "waiting for job executor to stop and join got interrupted", e);
               }
               }
               }
              
               private static Log log = LogFactory.getLog(JobExecutorDbTest.class);
              }
              


              • 4. Re: Actions and async=

                See my comment on the JIRA. -Ed Staub

                • 5. Re: Actions and async=
                  tom.baeyens

                  can you guys please check out if the patch that I attached to http://jira.jboss.com/jira/browse/JBPM-1042 resolves the problem ?

                  • 6. Re: Actions and async=
                    jeffj55374

                    Thanks. I'll check it out this afternoon.

                    • 7. Re: Actions and async=
                      jeffj55374

                      Looks good to me! (more comments in http://jira.jboss.com/jira/browse/JBPM-1042)
                      Thanks for the quick attention and resolution!

                      • 8. Re: Actions and async=
                        kukeltje

                        Jeff,

                        hahaha.... yeah quick attention....I think your are just lucky... No, I'm sure your are just lucky. This 'problem' is months old and has gotten a lot of attention internally. You just reported one of the appearances of the problem at the moment it was already solved in cvs.