4 Replies Latest reply on Jul 24, 2006 5:50 AM by Matthias Gommeringer

    CommandExecutor in aclustered environment

    Rolf Schuster Newbie

      Hi,

      I want to run jBPM in a 2-way cluster, but I saw that the CommandExecutor loads the messages without locking and processes them.

      The problem is, when there is one CommandExecutor on each machine, both could load and process the same message, isn't it?

      The query named "MessagingSession.findMessages" for selecting the messages is in the hibernate.queries.hbm.xml file, but it loads all messages at once.

      How can I modify jBPM to do a row lock on only the message currently being processed? It is critical that no message is processed by both CommandExecutors.

      Would it be enough to place the line in bold into the executeCommand() method of the CommandExecutorThread class:

      ...
      message = dbMessageService.receiveNoWait(destination);
      jbpmContext.getSession().lock(message, LockMode.UPGRADE);

      ?

      The second problem would be how to make the CommandExecutor to skip the locked messages.

      Thanx in advance

        • 1. Re: CommandExecutor in aclustered environment
          Tom Baeyens Master

          you could try to improve the current message service that way.

          but using a JMS implementation (work is on the way there) is probably the easier and more robust solution for your needs.

          • 2. Re: CommandExecutor in aclustered environment
            Matthias Gommeringer Newbie

            Hi,

            I would also like to use the CommandExecutor in an cluster (starting it multiple times on different machines). It sounds as if the DBMessageService will not be cluster-ready in the future. Am i right?

            If I am not fully wrong it is no big effort to make the DBMessageService cluster-ready.
            I looked at the sources and think that the right place (at least in the current CVS) to ensure that a single message is only processed by one CommandExecutor is org.jbpm.db.MessagingSession. The method "nextMessage(String destination)" should return only Message objects that are not locked by another transaction.

            Here my -untested- proposal:

             public Message nextMessage(String destination) {
             Message message = null;
             if (nextMessage!=null) {
             message = nextMessage;
             nextMessage = null;
             } else {
             Iterator messageIterator = getMessageIterator(destination);
             if (messageIterator.hasNext()) {
             // Get the next message that was not locked on DB and lock it
             message = getNextUnlockedMessage(messageIterator);
             }
             if (messageIterator.hasNext()) {
             // Get the next message that was not locked on DB and lock it
             nextMessage = getNextUnlockedMessage(messageIterator);
             }
             }
             return message;
             }
            
             private Message getNextUnlockedMessage(Iterator messageIterator) {
             Message message = (Message) messageIterator.next();
             try {
             // try to lock the message
             session.lock(message, LockMode.WRITE);
             // Successfully locked
             return message;
             }
             catch(HibernateException e) {
             if(e.getCause() instanceof LockAcquisitionException) {
             // Failed to acquire the lock - try to get next
             return getNextUnlockedMessage(messageIterator);
             }
             else {
             throw e;
             }
             }
            }
            


            Is this code okay for this purpose?
            I only recognized that several JBPM classes (also the PersistenceService which creates the MessagingSession) must be modified/overridden in order to plug in a custom implementation here and I would dislike to do this within my project.

            Thanks,
            Matthias

            • 3. Re: CommandExecutor in a clustered environment
              Rolf Schuster Newbie

              I did the locking in the following way:

              if (message != null) {
               jbpmContext.getSession().lock(message, LockMode.UPGRADE);
               checkForMoreMessages = true;
               Command command = (Command) message;
               command.execute();
               jbpmContext.save(message.getToken().getProcessInstance());
               jbpmContext.getSession().delete(message);
               }
               session.getTransaction().commit();
              


              I implemented MyDbMessageService, because DbMessageService deletes the message from the session before it returns it (and before I can lock it in the CommandExecutor).
              The message is now deleted in the CommandExecutor after locking it.

              It is not possible to simply extend jBPM classes if they are not in the same package.
              This is because many jBPM classes don't use 'protected' fields, but ones with package-local access.

              For now I didn't try clustering with the CommandExecutor, I did only the locking part. Also I can't use JMS.


              • 4. Re: CommandExecutor in a clustered environment
                Matthias Gommeringer Newbie

                Thanks for the reply, t-dome. Your solution has the advantage, that i only need to re-implement three classes: DBMessageService, DBMessageServiceFactory (for jbpm.cfg) and CommandExecutor. I my eyes, the only thing that is missing for clustering is the exception handling for the line

                jbpmContext.getSession().lock(message, LockMode.UPGRADE);
                


                When a LockAcquisitionException is thrown here, we must try to get the next message and skip the one we could not lock.

                CommandExecutor.executeCommand():
                ...
                message = getAndLockNextMessage(dbMessageService, destination, jbpmContext.getSession());
                // message = dbMessageService.receiveNoWait(destination);
                
                // If we got a message here, we own the lock
                if (message != null) {
                 checkForMoreMessages = true;
                 Command command = (Command) message;
                 log.trace("executing command '" + command + "'");
                 command.execute();
                 // Because our DBMessageService does not delete the
                 // message, we must delete it here
                 jbpmContext.getSession().delete(message);
                }
                ...
                


                private Message getAndLockNextMessage(DbMessageService dbMessageService, String destination, Session session) {
                 Message message = dbMessageService.receiveNoWait(destination);
                 if (message != null) {
                 try {
                 // try to lock the message
                 session.lock(message, LockMode.WRITE);
                 // Successfully locked
                 return message;
                 } catch (HibernateException e) {
                 if (e.getCause() instanceof LockAcquisitionException) {
                 // Failed to acquire the lock - try to get next
                 return getAndLockNextMessage(dbMessageService, destination, session);
                 } else {
                 throw e;
                 }
                 }
                 } else {
                 return null;
                 }
                }
                


                From the OO-Point-of-view, this is not the best solution (for me). I think that the MessageService itself, regardless of which implementation is used, should ensure, that the method "nextMessage()" or "receiveNoWait()" only returns Message objects, that have successfully been locked.