1 2 3 4 5 Previous Next 60 Replies Latest reply on Dec 11, 2005 9:35 AM by chprvkmr Go to original post
      • 45. Re: Simultaneous execution of  Fork children
        sforema

        First, let me say that I am going to create a new fork and join node that works asynch. I was going to just create a new fork node, but after discussing the issue with my coworkers, we are going to add a new join that is aware of asynch tokens. This is the most elegant solution to this problem. It eliminates the need for extra states...

        Elaborating on:
        "parallel1 and parallel2 are designed to go into wait states. I then signal their respective tokens which causes parallel1b and parallel2b to run. each waits 30 seconds before continuing, so I have time to signal the tokens."

        I think when we say "goes into a wait state", it can be confusing. That simply means that I have a state node and I don't cause a signal to occur, leaving the token in that state node.

        parallel1 and parallel2 are state nodes that execute code in the node-enter event to simply Thread.sleep(30000) then fall out, leaving the nodes in a wait state.

        I have a jetty server (http://jetty.mortbay.org/jetty/) that listens for certain URL requests. I have a request to signal a token. I use this URL to signal each token, causing the tokens to run in their own threads. Since parallel1 and parallel2 are signalled, the processes move respectively into parallel1b and parallel2b which Thread.sleep(30000) and then signal themselves to continue.

        Say what you will, but I think jBPM needs asynch classes. We need it for our batch processing. I know that firing off threads in a J2EE environment is a no-no, but jBPM will not always be running in a J2EE environment and asynch behavior is desireable and as this discussion thread shows, it is hard to achieve properly...

        • 46. Re: Simultaneous execution of  Fork children
          aguizar

          You're right about the meaning of locking the process instance. There's a method that wraps that code for you: GraphSession.lockProcessInstance().

          I'm convinced there's a way around this problem, that uses locks and fresh data from the database, and does not require spawning yet another thread to resignal. I need to experiment a bit, tough.

          Could you zip your project files and send the archive to me by e-mail?

          • 47. Re: Simultaneous execution of  Fork children
            sforema

            OK, here is the JoinAsync class. Instead of using a join when you have asychronous logic going on, use a standard node that calls this in its action and asynchronous logic should work.

            Yes, it is a bit of a hack because it essentially reloads the parentToken for analysis and then throws it away, but hey, it gets the job done.

            /**
             *
             */
            package com.whatever.jbpm;
            
            import java.rmi.RemoteException;
            import java.util.Collection;
            import java.util.Iterator;
            
            import org.apache.commons.logging.Log;
            import org.apache.commons.logging.LogFactory;
            import org.dom4j.Element;
            import org.jbpm.graph.action.Script;
            import org.jbpm.graph.def.Node;
            import org.jbpm.graph.exe.ExecutionContext;
            import org.jbpm.graph.exe.Token;
            import org.jbpm.jpdl.xml.JpdlXmlReader;
            import org.jbpm.jpdl.xml.Parsable;
            import org.jbpm.graph.def.ActionHandler;
            import org.jbpm.db.JbpmSession;
            import org.jbpm.db.JbpmSessionFactory;
            
            //public class JoinAsync extends Node implements Parsable {
            
            public class JoinAsync implements ActionHandler {
            
             private static final long serialVersionUID = 1L;
            
             /**
             * specifies if this joinhandler is a discriminator. a descriminator
             * reactivates the parent when the first concurrent token enters the join.
             */
             private boolean isDiscriminator = false;
            
             /**
             * a fixed set of concurrent tokens.
             */
             private Collection tokenNames = null;
            
             /**
             * a script that calculates concurrent tokens at runtime.
             */
             private Script script = null;
            
             /**
             * reactivate the parent if the n-th token arrives in the join.
             */
             private int nOutOfM = -1;
            
             private static final JbpmSessionFactory jbpmSessionFactory = JbpmSessionFactory.buildJbpmSessionFactory();
            
             public void read(Element element, JpdlXmlReader jpdlReader) {
             }
            
             public void execute(ExecutionContext executionContext) {
             Token token = executionContext.getToken();
            
             // if this token is not able to reactivate the parent,
             // we don't need to check anything
             if (token.isAbleToReactivateParent()) {
            
             // the token arrived in the join and can only reactivate
             // the parent once
             token.setAbleToReactivateParent(false);
            
             Node joinNode = token.getNode();
            
             Token parentToken = token.getParent();
             Token originalParentToken = parentToken;
            
             if (parentToken != null) {
            
             /* ========================================================== */
            
             // The parent token must be reloaded from the database for
             // asynchronous behaviour to work properly
             log.debug("need to reload parent token from database");
             JbpmSession jbpmSession = null;
             try {
             jbpmSession = jbpmSessionFactory.openJbpmSession();
            
             // Get the parent token from the database
             log.debug("about to reload parent token from database");
             parentToken = jbpmSession.getGraphSession().loadToken(parentToken.getId());
             log.debug("reloaded parent token from database!");
             // Need to load the children before closing the session
             Iterator iter = parentToken.getChildren().values().iterator();
             while (iter.hasNext()) {
             Token t = (Token) iter.next();
             // Need to set the flag on this token since it isn't committed yet
             if (t.getId() == token.getId()) {
             t.setAbleToReactivateParent(false);
             }
             }
             log.debug("reloaded parent token's children from database!");
            
             } catch (Exception ex) {
             System.out.println("error: " + ex.toString());
             } finally {
             if (jbpmSession != null) {
             jbpmSession.close();
             }
             }
            
             /* ========================================================== */
            
             boolean reactivateParent = true;
            
             // if this is a discriminator
             if (isDiscriminator) {
             // reactivate the parent when the first token arrives in the
             // join. this must be the first token arriving because otherwise
             // the isAbleToReactivateParent() of this token should have been
             // false
             // above.
             reactivateParent = true;
            
             // if a fixed set of tokenNames is specified at design time...
             }
             else if (tokenNames != null) {
             // check reactivation on the basis of those tokenNames
             reactivateParent = mustParentBeReactivated(parentToken, tokenNames.iterator());
            
             // if a script is specified
             }
             else if (script != null) {
            
             // check if the script returns a collection or a boolean
             Object result = script.eval(token);
             // if the result is a collection
             if (result instanceof Collection) {
             // it must be a collection of tokenNames
             Collection runtimeTokenNames = (Collection) result;
             reactivateParent = mustParentBeReactivated(parentToken, runtimeTokenNames.iterator());
            
             // if it's a boolean...
             }
             else if (result instanceof Boolean) {
             // the boolean specifies if the parent needs to be reactivated
             reactivateParent = ((Boolean) result).booleanValue();
             }
            
             // if a nOutOfM is specified
             }
             else if (nOutOfM != -1) {
            
             int n = 0;
             // wheck how many tokens already arrived in the join
             Iterator iter = parentToken.getChildren().values().iterator();
             while (iter.hasNext()) {
             Token concurrentToken = (Token) iter.next();
             if (joinNode == concurrentToken.getNode()) {
             n++;
             }
             }
             if (n < nOutOfM) {
             reactivateParent = false;
             }
            
             // if no configuration is specified..
             }
             else {
             // the default behaviour is to check all concurrent tokens and
             // reactivate
             // the parent if the last token arrives in the join
             reactivateParent = mustParentBeReactivated(parentToken, parentToken.getChildren().keySet()
             .iterator());
             }
            
             /* ========================================================== */
            
             // Reset the parent token back to the one on the open session
             parentToken = originalParentToken;
            
             /* ========================================================== */
            
             // if the parent token needs to be reactivated from this join node
             if (reactivateParent) {
            
             // write to all child tokens that the parent is already reactivated
             Iterator iter = parentToken.getChildren().values().iterator();
             while (iter.hasNext()) {
             ((Token) iter.next()).setAbleToReactivateParent(false);
             }
            
             // write to all child tokens that the parent is already reactivated
             ExecutionContext parentContext = new ExecutionContext(parentToken);
             joinNode.leave(parentContext);
             }
             }
             }
             }
            
             public boolean mustParentBeReactivated(Token parentToken, Iterator childTokenNameIterator) {
             boolean reactivateParent = true;
             while ((childTokenNameIterator.hasNext()) && (reactivateParent)) {
             String concurrentTokenName = (String) childTokenNameIterator.next();
            
             Token concurrentToken = parentToken.getChild(concurrentTokenName);
            
             if (concurrentToken.isAbleToReactivateParent()) {
             log.debug("===================================================");
             log.debug("===================================================");
             log.debug("join will not yet reactivate parent: found concurrent token '" + concurrentToken + "'");
             log.debug("===================================================");
             log.debug("===================================================");
             reactivateParent = false;
             }
             }
             if (reactivateParent) {
             log.debug("===================================================");
             log.debug("===================================================");
             log.debug("all tokens have reached join. allowing passage");
             log.debug("===================================================");
             log.debug("===================================================");
             }
             return reactivateParent;
             }
            
             public Script getScript() {
             return script;
             }
            
             public void setScript(Script script) {
             this.script = script;
             }
            
             public Collection getTokenNames() {
             return tokenNames;
             }
            
             public void setTokenNames(Collection tokenNames) {
             this.tokenNames = tokenNames;
             }
            
             public boolean isDiscriminator() {
             return isDiscriminator;
             }
            
             public void setDiscriminator(boolean isDiscriminator) {
             this.isDiscriminator = isDiscriminator;
             }
            
             public int getNOutOfM() {
             return nOutOfM;
             }
            
             public void setNOutOfM(int nOutOfM) {
             this.nOutOfM = nOutOfM;
             }
            
             private static final Log log = LogFactory.getLog(JoinAsync.class);
            }
            


            • 48. Re: Simultaneous execution of  Fork children
              koen.aers

              Sean,

              I would really advise you to study the behaviour of the jBPM node implementations more deeply. You should not open a new JbpmSession during the execution of a node. Transaction alignment will completely mess up if you do so. Leave the opening and closing of sessions to the client.

              Regards,
              Koen

              • 49. Re: Simultaneous execution of  Fork children
                sforema

                I expected a "spanking" for doing that, but I need updated state info and it did the trick.

                I did isolate it so that the referenced tokens are in their own hibernate session and closed the session afterwords. Nothing is broken in this case. I need the state refreshed in the join and since I am in an object inside the session (join), I can't simply reload the objects (tried that, failed miserably).

                This is equivalent to taking a peak in the database to get an accurate snapshot of what is going on (my states all commit state on a node by node basis so that I can peak around and see what is going on).

                This business of not saving state until the entire process is complete will not work for nodes that are long running. I also needed the ability to monitor the process externally, so the state had to be saved...

                • 50. Re: Simultaneous execution of  Fork children
                  tom.baeyens

                  you shouldn't mess with threads and synchronization in jBPM. use the database for concurrency and optionally use a message system in the same transaction.

                  regards, tom.

                  • 51. Re: Simultaneous execution of  Fork children
                    ralfoeldi

                    @Tom,

                    Sean has a valid point. I woulnd't be able to use his solution - if it is a solution, but that doesn't change the fact the he's trying to solve a real problem.

                    I've linked to my concurrency post 3-4 times in this thread (I wouldn't do it again :-), but my impression is:

                    - lock on ProcessInstance: everythings is fine, but no concurrency
                    - lock on TokenInstance (or not at all) and a Join MIGHT work, but there is no guarantee for that as Sean traced (Due to concurrent changes in the Tokens.)

                    This would mean that jBPM cannot execute Tokens concurrently. (Even with messaging, etc.)

                    @Alex,

                    a J2EE spec conform solution would be very much appreciated. Sean approach wouldn't work in an app server.

                    Greetings

                    Rainer




                    • 52. Re: Simultaneous execution of  Fork children
                      koen.aers

                      Well this is the problem. You cannot have concurrent updates of processinstances, so you will always have to lock them. This is true in normal concurrent programming as well. If one thread updates the process context and doesn't want the other thread to mess up his updates it has to use a monitor that prevents the other thread from updating the process context (the synchronized block in Java). In our case the process context (processinstance) is in the database and we use regular database locking techniques to obtain the isolation (lockProcessInstance). If you do need something else, I would conclude jBPM is not the best solution.

                      Regards,
                      Koen

                      • 53. Re: Simultaneous execution of  Fork children
                        sforema

                        I think there are two uses of jBPM.

                        The traditional use is a long transaction where multiple nodes occur within one transaction. I see the validity of this, but this isn't the only use for jBPM.

                        The second use for jBPM is to save state when each node is complete. Yes, this means the node is doing stuff with the transaction, but this is what I personally wanted from the get go. Without this, you will be UNABLE to monitor jBPM processes externally because the results are in memory until committed to the database.

                        Without this second solution, jBPM will never be able to handle asynchronous processing. I have essentially been hacking jBPM to do this and I am about finished.

                        There are obviously others that want this functionality as well. My coworkers were a bit dumbfounded to discover I had to do this manually. You may say that this makes it flexible. I say it makes it cumbersome.

                        By saying use two is invalid, I think you are turning people away from a valid use of the process engine. To do this, you simply need to add three nodes (at a minimum): StateAsync, ForkAsync, JoinAsync. This does not break jBPM. It just provides a different use where the nodes themselves do work with the jBPMSession.

                        Sean

                        • 54. Re: Simultaneous execution of  Fork children
                          koen.aers

                          Sean,

                          As stated multiple times, you should obtain the async behaviour as it is specified in multiple good books on integration. Use asynchronous messaging, use JMS. There is a simple mechanism of async behaviour included out of the box. This is indeed not sufficient for real concurrent behaviour. But there is absolutely no need to mess with the transactional alignment of the engine execution algorithm : send a message to a queue when entering a state, consume it in an MDB, have the MDB signal the token to move it out from the state when the work is done. The database will synchronize acces and make sure there is no inconsistency. All other solutions are awkward, cumbersome or whatever else you may call them.

                          Regards,
                          Koen

                          • 55. Re: Simultaneous execution of  Fork children
                            sforema

                            jBPM isn't a jBoss solution. It should work outside the context of a J2EE engine. How are you going to provide a BAM solution automatically if it doesn't save state for you...

                            I think we will agree to disagree. I am going to create my ForkAsync and be done...

                            Sean

                            • 56. Re: Simultaneous execution of  Fork children
                              ralfoeldi

                              Koen,

                              I don't believe this is a reason to ditch jBPM :-) It just helps to know the limitations of the tools you're using.

                              It would however be interesting to know if the jBPM Team accepts true concurrency as a problem. I'm not asking for a solution right now, I just want to know your position.

                              JMS and MDBs will not solve all problems connected to this topic. (I can provide a few examples if needed, but there are obvious ones in this thread already.)

                              And if we're strechting jBPM (or maybe Hibernate) to the limit, hey it's positive that we can even discuss this problem. I'm hooking up with a leagcy system that doesn't even support transactions in some cases. No transactions, no problems... except for the occasionally lost document :-)

                              Now we know the limits and will work with what we've got (at least I will).

                              Have a nice "3. Advent" (any politically correct Americans in this forum? Then sorry. Happy "two weeks before winter holiday weekend")

                              Rainer

                              • 57. Re: Simultaneous execution of  Fork children
                                koen.aers

                                First of all, discussion is certainly good. It is the primary force of open source development, so there is indeed no reason to stop discussing and poining out weaknesses of jBPM.
                                But it is very important that everybody knows they should *not* try to code nodes which mess with the transaction alignment of the engine. It is a bad, very bad way to solve problems, which may, and certainly will, cause major problems. Rollbacks will put the database in an incosistent state.
                                Second, I would be happy to see clearly described use cases for which the use of asynchronous (JMS) messages will not give a solution. If you don't use a solution like this, you will end up implementing your own substitute for this.
                                Third, JMS neither MDB are particular to JBoss, you can use them in any appserver you like. If you have another system of doing asynchronous messaging (your own rolled), jBPM will happily use it if there is a Java API. So I still don't see the point in this whole discussion.

                                Regards,
                                Koen

                                • 58. Re: Simultaneous execution of  Fork children
                                  ralfoeldi

                                  Hi Koen,

                                  I agree with you on one and three.

                                  jBPM works, great, solves all problems and there is at the moment no mission critical need for real concurrency.

                                  It is/was important for me to know the limitations because I am using JMS intensively (mainly for transaction reasons) and have to guard against concurrency problems. The conclusion of this Thread for me is: lock on processInstance. Which is fine for me, but effectively means no real concurrent execution.

                                  We can leave it at that.

                                  Everything from now on is purely academic.


                                  (I haven't been to the office for the last week due to a flu and do not (yet) have jBPM or other tools set up at home, so this will look a bit cheap but it should make the point.)

                                  A technical (not business) usecase true concurrency

                                  a) workload in jBPM actions:

                                  thread1.................
                                  tx1.....................
                                   <start/>
                                  
                                   <fork/>
                                  
                                   <state a> <state b>
                                   <action> <action>
                                   <send JMS> <send JMS>
                                   continue in new Thread continue in new Thread
                                   (thread2) (thread3)
                                   </send JMS> </send JMS>
                                   </action> </action>
                                   </state a> <state b>
                                  tx1.....................
                                  thread1.................
                                  
                                  thread2..... thread3.....
                                  tx2......... tx3.........
                                   <MDB> <MDB>
                                   signal() signal()
                                  
                                   <state c> <state d>
                                   <action/> <action/>
                                   </state c> </state d>
                                  
                                   <state e> <state f>
                                   <action/> <action/>
                                   </state e> </state f>
                                  
                                   !!!!!!!!!!
                                  
                                   <join/> <join/>
                                   </MDB> <MDB>
                                  
                                   ??????????
                                  
                                  tx2......... tx3..........
                                  thread2..... thread3......
                                  


                                  The point Sean made was that if tx2 and tx3 happen concurrently, when reaching the Join node both will not recognize that the other has possibly finished and neither will therefore continue.

                                  You can circumvent the problem from above if you delegate all workload to MDBs

                                  b) all workload in MDBs:
                                  thread1.................
                                  tx1.....................
                                   <start/>
                                  
                                   <fork/>
                                  
                                   <state c> <state d>
                                   <action> <action>
                                   <send JMS> <send JMS>
                                   </action> </action>
                                   </state c> </state d>
                                  
                                   <state e> <state f>
                                   <action> <action>
                                   <send JMS> <send JMS>
                                   </action> </action>
                                   </state e> </state f>
                                   <join/>
                                  tx1.....................
                                  thread1.................


                                  MDBa, MDBb, MDBc, MDBd do their stuff totally idependently.

                                  But then the execution or rather failure of e.g. <state f> could not rollback <state d> (not the sending of the JMS message, the failure of the workload.) so this isn't the szenario I would want to use.

                                  Koen, you've mentioned it a few times before yourself. Its a concurrency issue that doesn't go away with messaging. When you think about it there will be no simple solution to it.

                                  And once again: Have a very Merry Third Advent, sing Christmas jingles, go drink Glühwein on a Weihnachtsmarkt and don't go looking for blonde D-Cups.

                                  Rainer

                                  • 59. Re: Simultaneous execution of  Fork children
                                    koen.aers

                                    Hi Rainer,

                                    So indeed, the whole point of tx2 and tx3 happening concurrently is that the processinstance should be locked (either pessimistically or optimistically) to isolate them. This is not a jBPM related issue, it is a very general issue of transaction isolation when you have to access a shared resource (in the jBPM case this resource is the process instance). So the point Sean is making is very true, but the point I am trying to make for user's reading this thread is that these two transactions *should* be isolated to avoid problems. If you break this isolation by starting transactions in custom nodes than you will most certainly not able to roll thing back if something goes wrong.

                                    Which brings me to your second use case. I remember we had this discussion earlier and I clearly see your point. The fact that there is no easy solution for obtaining this, is not at all jBPM related (as we also already concluded). It is again a more general issue for which I think there is no general solution. So you will have to look at your particular case (like you did) and come up with the best solution. One possibility I think, is to have some kind of a compensation mechanism in case something goes wrong in the MDB handling the messages sent in state d or state f. But as I already said, this is why we are called software engineers, there is some thinking to do here ;-)
                                    Anyway even in this second use case the problem of the first use case (a race condition on the processinstance) exists, so you will also want to lock the processinstance.

                                    Now for the serious business... Ronald told you already I am the blonde one with D-Cups, so I won't go looking for them :-P
                                    Besides, I am attending the JavaPolis conference next week, and these blonde D-Cups are hard to find in such an audience ;-))
                                    I think it is a little too early for Christmas wishes, so that will be something for later...

                                    Cheers,
                                    Koen