9 Replies Latest reply on Mar 17, 2008 7:04 PM by tak2

    Fork Join and Async Nodes

      Has anybody implemented this scenario or something similar?

      Here is the process:

      ...
      <fork name="fork1">
       <transition to="node1" name="1"></transition>
       <transition to="node2" name="2"></transition>
       <transition to="node3" name="3"></transition>
      </fork>
      
      <node name="node1" async="true">
       <action class="DoHeavyJob1"/>
       <transition to="join1"></transition>
      </node>
      
      <node name="node2" async="true">
       <action class="DoHeavyJob2"/>
       <transition to="join1"></transition>
      </node>
      
      <node name="node3" async="true">
       <action class="DoHeavyJob3"/>
       <transition to="join1"></transition>
      </node>
      
      <join name="join1">
       <transition to="finish"></transition>
      </join>
      
      <node name="finish">
       <action class="Result"/>
      </node>
      ...
      


      Description: The intermediate nodes take a long time to execute, so I need async="true" for their CONCURRENT execution, and I cannot execute the "finish" node until ALL intermediate nodes have finished.

      In the DoHeavyJob1, DoHeavyJob2 and DoHeavyJob3 ActionHandlers, I call executionContext.leaveNode() at the end of the execute method.
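
The required behaviour can be sketched outside jBPM with plain java.util.concurrent. This is only an illustration of the fork/join semantics described above, not jBPM code: the class name ForkJoinSketch, the method runForkJoin and the short sleep are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ForkJoinSketch {

    /**
     * Runs the given number of branches concurrently and returns how many
     * of them had completed by the time the "finish" step was allowed to run.
     */
    public static int runForkJoin(int branches, long workMillis) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(branches);
        CountDownLatch join = new CountDownLatch(branches);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < branches; i++) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(workMillis); // stands in for DoHeavyJobN
                        completed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        join.countDown(); // this branch has arrived at the join
                    }
                });
            }
            join.await(); // "finish" must not run before every branch has arrived
            return completed.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("branches completed before finish: " + runForkJoin(3, 100));
    }
}
```

In jBPM the branches run in separate job transactions, possibly on different machines, so the in-memory latch has to be replaced by state in the database. That is where the locking problem in this thread comes from.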

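      import org.jbpm.graph.def.ActionHandler;
      import org.jbpm.graph.exe.ExecutionContext;
      
      public class DoHeavyJob1 implements ActionHandler {
       public void execute( ExecutionContext executionContext ) throws Exception {
        // Simulate a long-running task (Thread.sleep can throw InterruptedException)
        Thread.sleep( 100000 );
      
        // Continue the token over the node's default transition (to join1)
        executionContext.leaveNode();
       }
      }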
      


      These actions run fine under the JobExecutor, and I can see that they execute concurrently. However, the well-known org.hibernate.StaleObjectStateException occurs when the last ActionHandler calls leaveNode(), because of database transaction locks.

      Is there any way to implement this scenario? Any suggestions?

      Thanks

        • 1. Re: Fork Join and Async Nodes
          kukeltje

          Some small things have changed in CVS regarding the SOE (StaleObjectStateException)... can you try running that version?

          • 2. Re: Fork Join and Async Nodes

            Hi Ronald van Kuijk

            I have run the example with CVS HEAD and the error is the same. The action handler in each node throws the same exception when executionContext.leaveNode() is executed.

            org.hibernate.exception.LockAcquisitionException: could not update: [org.jbpm.graph.exe.Token#235]
             at org.hibernate.exception.SQLStateConverter.convert(SQLStateConverter.java:82)
             at org.hibernate.exception.JDBCExceptionHelper.convert(JDBCExceptionHelper.java:43)
             at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:2425)
             at org.hibernate.persister.entity.AbstractEntityPersister.updateOrInsert(AbstractEntityPersister.java:2307)
             at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:2607)
             at org.hibernate.action.EntityUpdateAction.execute(EntityUpdateAction.java:92)
             at org.hibernate.engine.ActionQueue.execute(ActionQueue.java:248)
             at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:232)
             at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:140)
             at org.hibernate.event.def.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:298)
             at org.hibernate.event.def.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:27)
             at org.hibernate.impl.SessionImpl.flush(SessionImpl.java:1000)
             at org.jbpm.graph.node.Join.execute(Join.java:116)
             at org.jbpm.graph.def.Node.enter(Node.java:319)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
             at java.lang.reflect.Method.invoke(Method.java:585)
             at org.hibernate.proxy.pojo.cglib.CGLIBLazyInitializer.invoke(CGLIBLazyInitializer.java:157)
             at org.jbpm.graph.def.Node$$EnhancerByCGLIB$$663a82f3.enter(<generated>)
             at org.jbpm.graph.def.Transition.take(Transition.java:151)
             at org.jbpm.graph.def.Node.leave(Node.java:394)
            
            
            
            189579 [:172.26.0.243:4] INFO DbPersistenceService:258 - optimistic locking failed
            189579 [:172.26.0.243:4] INFO DbPersistenceService:258 - optimistic locking failed
            189672 [:172.26.0.243:4] INFO Services:228 - problem closing service 'persistence': optimistic locking failed
            189672 [:172.26.0.243:4] INFO Services:228 - problem closing service 'persistence': optimistic locking failed
            189672 [:172.26.0.243:4] INFO JobExecutorThread:197 - problem committing job execution transaction: optimistic locking failed
            189672 [:172.26.0.243:4] INFO JobExecutorThread:197 - problem committing job execution transaction: optimistic locking failed
            189688 [:172.26.0.243:3] ERROR AbstractFlushingEventListener:301 - Could not synchronize database state with session
            org.hibernate.StaleObjectStateException: Row was updated or deleted by another transaction (or unsaved-value mapping was incorrect): [org.jbpm.graph.exe.Token#235]
             at org.hibernate.persister.entity.AbstractEntityPersister.check(AbstractEntityPersister.java:1765)
             at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:2407)
             at org.hibernate.persister.entity.AbstractEntityPersister.updateOrInsert(AbstractEntityPersister.java:2307)
             at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:2607)
             at org.hibernate.action.EntityUpdateAction.execute(EntityUpdateAction.java:92)
             at org.hibernate.engine.ActionQueue.execute(ActionQueue.java:248)
             at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:232)
             at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:140)
             at org.hibernate.event.def.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:298)
             at org.hibernate.event.def.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:27)
             at org.hibernate.impl.SessionImpl.flush(SessionImpl.java:1000)
             at org.hibernate.impl.SessionImpl.managedFlush(SessionImpl.java:338)
             at org.hibernate.transaction.JDBCTransaction.commit(JDBCTransaction.java:106)
             at org.jbpm.persistence.db.DbPersistenceService.commit(DbPersistenceService.java:256)
             at org.jbpm.persistence.db.DbPersistenceService.close(DbPersistenceService.java:214)
             at org.jbpm.svc.Services.close(Services.java:224)
             at org.jbpm.JbpmContext.close(JbpmContext.java:139)
             at org.jbpm.job.executor.JobExecutorThread.executeJob(JobExecutorThread.java:193)
             at org.jbpm.job.executor.JobExecutorThread.run(JobExecutorThread.java:64)
            


            Any suggestions? Thanks in advance!

            • 3. Re: Fork Join and Async Nodes
              tak2

              Our team has been evaluating jBPM as our main flow control platform, and this is literally a show stopper.

              Some threads and docs suggest setting the isolation level to READ_COMMITTED, but I don't think that can solve the issue.

              When parallel nodes enter the Join node, Join.execute must run in a serialized fashion so that it can check the status of all parallel nodes and decide whether to leave the join.

              The issue seems to be that Hibernate treats the Token object with optimistic locking even in the Join.execute method. What the join scenario needs is pessimistic locking: the join will be executed by multiple threads, possibly in different JVMs, so a database-level row lock on the parent Token record is required.
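
To make the failure mode concrete, here is a small in-memory sketch of a version-checked update. It is not Hibernate code; the Row class and its methods are hypothetical and only mimic the usual "update ... where id = ? and version = ?" check that optimistic locking relies on.

```java
public class OptimisticLockSketch {

    /** In-memory stand-in for one versioned database row, e.g. a parent token. */
    static final class Row {
        private int version = 0;

        synchronized int readVersion() {
            return version;
        }

        /** Mimics "UPDATE ... SET version = version + 1 WHERE id = ? AND version = ?". */
        synchronized boolean updateIfVersionMatches(int expectedVersion) {
            if (version != expectedVersion) {
                return false; // zero rows updated: the caller holds stale state
            }
            version++;
            return true;
        }
    }

    /** Two transactions read the same version, then both try to commit. */
    public static boolean[] twoWritersSameSnapshot() {
        Row parentToken = new Row();
        int seenByA = parentToken.readVersion();
        int seenByB = parentToken.readVersion();
        boolean committedA = parentToken.updateIfVersionMatches(seenByA);
        boolean committedB = parentToken.updateIfVersionMatches(seenByB);
        return new boolean[] { committedA, committedB };
    }

    public static void main(String[] args) {
        boolean[] result = twoWritersSameSnapshot();
        System.out.println("first commit ok: " + result[0] + ", second commit ok: " + result[1]);
    }
}
```

The second writer fails because the version it read is no longer current. In this simplified model, that corresponds to the second branch reaching the join while still holding the parent token state it loaded earlier.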

              Is it possible to use pessimistic locking in Join.execute() only?
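
As a rough picture of what a serialized join check would look like, the sketch below uses a ReentrantLock in place of a database row lock on the parent token. Everything in it (PessimisticJoinSketch, arrive, simulate) is hypothetical and lives in a single JVM; in a cluster the lock would have to be taken in the database.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class PessimisticJoinSketch {

    // Stands in for a row lock on the parent token record.
    private final ReentrantLock parentRowLock = new ReentrantLock();
    private final AtomicInteger timesLeft = new AtomicInteger();
    private final int expectedBranches;
    private int arrived = 0; // only touched while holding parentRowLock

    public PessimisticJoinSketch(int expectedBranches) {
        this.expectedBranches = expectedBranches;
    }

    /** Called by each branch when it reaches the join. */
    public void arrive() {
        parentRowLock.lock(); // later arrivals wait here instead of failing
        try {
            arrived++;
            if (arrived == expectedBranches) {
                timesLeft.incrementAndGet(); // only the last arrival leaves the join
            }
        } finally {
            parentRowLock.unlock();
        }
    }

    /** Lets the given number of branches arrive concurrently; returns how often the join was left. */
    public static int simulate(int branches) throws InterruptedException {
        PessimisticJoinSketch join = new PessimisticJoinSketch(branches);
        ExecutorService pool = Executors.newFixedThreadPool(branches);
        for (int i = 0; i < branches; i++) {
            pool.execute(join::arrive);
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return join.timesLeft.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("join left " + simulate(3) + " time(s)");
    }
}
```

Because each arrival waits for the lock rather than comparing versions afterwards, no branch has to be rolled back and the join is left exactly once.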

              Here is what I tried with my limited Hibernate knowledge; none of it worked.
              In the execute method of the Join node, when it calls session.lock(parentToken, LockMode.FORCE), Hibernate seems to increment the version (why, for a lock?), and the same operation on the same object in another thread throws StaleObjectStateException.
              This is a basic feature we need for concurrent operation, and it is very easy to implement in SQL-based programming with "select for update".

              I tried LockMode.UPGRADE too, but the result was the same.

              So I issued a "select for update" against the parent Token record instead of using session.lock(). That worked as I expected, but when the transaction is committed in JbpmContext.close(), the later thread still throws StaleObjectStateException.

              OK, then I loaded the object after acquiring the lock (after the select for update) with session.load(Token.class, parentToken.getId(), LockMode.UPGRADE), but surprisingly the returned parentToken object still had the old version, even though the DB record had been committed and its version incremented.

              Am I doing something wrong?

              • 4. Re: Fork Join and Async Nodes
                kukeltje

                have you used the latest cvs head version? some changes in this area were committed.

                • 5. Re: Fork Join and Async Nodes
                  tak2

                  No. Could you please let me know where I can download the source?
                  When I went to https://sourceforge.net/cvs/?group_id=70542, the page said "This project has turned off cvs".

                  thanks,

                  • 6. Re: Fork Join and Async Nodes

                    JBPM Doc:

                    2.3.1. Anonymous CVS access

                    Alternatively, you can get JBoss jBPM from cvs with the following information:

                    * Connection type: pserver
                    * User: anonymous
                    * Host: anoncvs.forge.jboss.com
                    * Port: 2401 (which is the default)
                    * Repository path: /cvsroot/jbpm
                    * Label: :pserver:anonymous@anoncvs.forge.jboss.com:/cvsroot/jbpm

                    Regards

                    • 7. Re: Fork Join and Async Nodes
                      tak2

                      Thanks for the instructions.
                      By the way, what kind of fixes were made after 3.2.2 for this issue?

                      • 8. Re: Fork Join and Async Nodes
                        kukeltje

                        searching the jira for 'staleObjectException jbpm' resulted in http://jira.jboss.com/jira/browse/JBPM-1072

                        • 9. Re: Fork Join and Async Nodes
                          tak2

                          Thanks for the JIRA info (JBPM-1072). I have read through it.
                          First of all, I would like to confirm that my understanding is correct. A JobExecutorThread polls the JOB table for an entry and then executes that job itself. Since each thread has a unique name and saves it into the job record to lock it, the job is executed only once, by one thread. The JIRA issue concluded that the cause of the SOSE was that every thread had the same name, so the same job was executed by more than one thread, which caused the conflict.
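
My understanding of that mechanism can be sketched as follows. This is not the jBPM implementation, only a toy model of "lock the job record by writing the owner's name"; the class JobLockOwnerSketch and the owner names are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class JobLockOwnerSketch {

    // jobId -> name of the thread that claimed the job (stands in for a column on the job record)
    private final Map<Long, String> lockOwnerByJobId = new HashMap<>();

    /** Claims the job for the owner if it is unclaimed, then reports whether the owner holds it. */
    public synchronized boolean acquire(long jobId, String owner) {
        lockOwnerByJobId.putIfAbsent(jobId, owner);
        return owner.equals(lockOwnerByJobId.get(jobId));
    }

    public static void main(String[] args) {
        JobLockOwnerSketch uniqueNames = new JobLockOwnerSketch();
        System.out.println(uniqueNames.acquire(1L, "executor-A")); // first thread claims the job
        System.out.println(uniqueNames.acquire(1L, "executor-B")); // second thread is turned away

        JobLockOwnerSketch sameNames = new JobLockOwnerSketch();
        System.out.println(sameNames.acquire(1L, "executor"));     // first thread claims the job
        System.out.println(sameNames.acquire(1L, "executor"));     // second thread also believes it owns the job
    }
}
```

With unique names only one thread gets the job. With identical names both threads believe they own it, which is the double execution described in the JIRA issue as I read it.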

                          But what I am facing is a little different. The SOSE happens when locking the parent token object, which is shared by many job entries if they are under the same "fork" node. Since the thread for each job entry under the same parent token calls session.lock() against the same parent token object in an optimistic locking fashion, the SOSE is thrown, as the Java code indicates.

                          I have downloaded the latest version of Join.java, which has session.flush() right before session.lock() is called. But the SOSE still occurs on that method invocation when the second node under the fork executes.

                          I have found a workaround for this issue: a JDBC "select for update" followed by session.refresh(parentToken), instead of calling session.flush() and session.lock(parentToken).

                          This implementation is very simple, and it works perfectly even when jobs run on different JVMs in a clustered environment.

                          Is it possible to ask the jBPM team to review my implementation?
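
The JDBC half of the workaround looks roughly like the sketch below. It is a simplified illustration rather than the exact patch: the class and method names are invented, and the table and column names (JBPM_TOKEN, ID_) are assumptions about the schema that should be checked against your own database. The connection must have auto-commit switched off, otherwise the row lock is released as soon as the statement completes.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class TokenRowLockSketch {

    /** Builds the locking query for the given (assumed) table and id column. */
    public static String lockSql(String table, String idColumn) {
        return "SELECT " + idColumn + " FROM " + table + " WHERE " + idColumn + " = ? FOR UPDATE";
    }

    /**
     * Blocks until the current transaction holds the row lock on the token record.
     * Returns false if no row with that id exists. The lock is held until the
     * surrounding transaction commits or rolls back. After this call the workaround
     * refreshes the in-memory token with session.refresh(parentToken), so that the
     * join check sees the state committed by the other branches.
     */
    public static boolean lockTokenRow(Connection connection, long tokenId) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(lockSql("JBPM_TOKEN", "ID_"))) {
            statement.setLong(1, tokenId);
            try (ResultSet rows = statement.executeQuery()) {
                return rows.next();
            }
        }
    }
}
```

The order matters: the lock is taken first and the refresh happens afterwards, so the version held in memory matches what the earlier branch committed.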

                          Tak