1. Re: CommandExecutor in a clustered environment
tom.baeyens Apr 3, 2006 9:49 AM (in response to tx_dome)
You could try to improve the current message service that way, but using a JMS implementation (work is under way there) is probably the easier and more robust solution for your needs.
2. Re: CommandExecutor in a clustered environment
mgommeringer Jul 24, 2006 3:14 AM (in response to tx_dome)
Hi,
I would also like to use the CommandExecutor in a cluster (starting it multiple times on different machines). It sounds as if the DBMessageService will not be made cluster-ready in the future. Am I right?
Unless I am mistaken, it would not take much effort to make the DBMessageService cluster-ready.
I looked at the sources and think that the right place (at least in the current CVS) to ensure that a single message is processed by only one CommandExecutor is org.jbpm.db.MessagingSession. The method "nextMessage(String destination)" should return only Message objects that are not locked by another transaction.
Here is my (untested) proposal:

    public Message nextMessage(String destination) {
      Message message = null;
      if (nextMessage != null) {
        message = nextMessage;
        nextMessage = null;
      } else {
        Iterator messageIterator = getMessageIterator(destination);
        if (messageIterator.hasNext()) {
          // Get the next message that was not locked on DB and lock it
          message = getNextUnlockedMessage(messageIterator);
        }
        if (messageIterator.hasNext()) {
          // Get the next message that was not locked on DB and lock it
          nextMessage = getNextUnlockedMessage(messageIterator);
        }
      }
      return message;
    }

    private Message getNextUnlockedMessage(Iterator messageIterator) {
      Message message = (Message) messageIterator.next();
      try {
        // try to lock the message
        session.lock(message, LockMode.WRITE);
        // Successfully locked
        return message;
      } catch (HibernateException e) {
        if (e.getCause() instanceof LockAcquisitionException) {
          // Failed to acquire the lock - try to get next
          return getNextUnlockedMessage(messageIterator);
        } else {
          throw e;
        }
      }
    }

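The skip-if-locked idea in this proposal can be tried out without a database. The following is a minimal sketch, not jBPM code: `Msg`, `tryClaim` and `nextUnclaimed` are hypothetical names, and an `AtomicBoolean` merely stands in for the database row lock. It uses a loop instead of recursion, so an exhausted iterator yields null rather than a NoSuchElementException. As a side note, as far as I know Hibernate documents LockMode.WRITE as not valid for lock(), so LockMode.UPGRADE_NOWAIT may be the mode to try in the real code.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class SkipLockedDemo {
    // Hypothetical stand-in for a message row; 'claimed' models the DB row lock.
    static class Msg {
        final String id;
        final AtomicBoolean claimed = new AtomicBoolean(false);

        Msg(String id) { this.id = id; }

        // Succeeds for exactly one caller, like a non-blocking row lock.
        boolean tryClaim() { return claimed.compareAndSet(false, true); }
    }

    // Returns the first message this caller could claim, or null if none is left.
    static Msg nextUnclaimed(Iterator<Msg> messages) {
        while (messages.hasNext()) {
            Msg candidate = messages.next();
            if (candidate.tryClaim()) {
                return candidate;
            }
            // Claimed by another executor: skip it and look at the next one.
        }
        return null;
    }

    public static void main(String[] args) {
        List<Msg> queue = Arrays.asList(new Msg("m1"), new Msg("m2"), new Msg("m3"));
        queue.get(0).tryClaim(); // simulate another executor already holding m1

        Iterator<Msg> it = queue.iterator();
        Msg first = nextUnclaimed(it);
        Msg second = nextUnclaimed(it);
        Msg third = nextUnclaimed(it);
        System.out.println(first.id + " " + second.id + " " + third); // prints: m2 m3 null
    }
}
```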
Is this code okay for this purpose?
I noticed, though, that several jBPM classes (including the PersistenceService, which creates the MessagingSession) would have to be modified or overridden in order to plug in a custom implementation here, and I would rather not do that within my project.
Thanks,
Matthias
3. Re: CommandExecutor in a clustered environment
tx_dome Jul 24, 2006 4:44 AM (in response to tx_dome)
I did the locking in the following way:

    if (message != null) {
      jbpmContext.getSession().lock(message, LockMode.UPGRADE);
      checkForMoreMessages = true;
      Command command = (Command) message;
      command.execute();
      jbpmContext.save(message.getToken().getProcessInstance());
      jbpmContext.getSession().delete(message);
    }
    session.getTransaction().commit();

I implemented MyDbMessageService, because DbMessageService deletes the message from the session before it returns it (and before I can lock it in the CommandExecutor).
The message is now deleted in the CommandExecutor after locking it.
It is not possible to simply extend jBPM classes if they are not in the same package.
This is because many jBPM classes don't use 'protected' fields, but ones with package-local access.
So far I haven't tried clustering with the CommandExecutor; I have only done the locking part. Also, I can't use JMS.
4. Re: CommandExecutor in a clustered environment
mgommeringer Jul 24, 2006 5:50 AM (in response to tx_dome)
Thanks for the reply, t-dome. Your solution has the advantage that I only need to re-implement three classes: DBMessageService, DBMessageServiceFactory (for jbpm.cfg) and CommandExecutor. In my eyes, the only thing that is missing for clustering is the exception handling for the line
jbpmContext.getSession().lock(message, LockMode.UPGRADE);
When a LockAcquisitionException is thrown here, we must try to get the next message and skip the one we could not lock.
CommandExecutor.executeCommand():

    ...
    message = getAndLockNextMessage(dbMessageService, destination, jbpmContext.getSession());
    // message = dbMessageService.receiveNoWait(destination);
    // If we got a message here, we own the lock
    if (message != null) {
      checkForMoreMessages = true;
      Command command = (Command) message;
      log.trace("executing command '" + command + "'");
      command.execute();
      // Because our DBMessageService does not delete the
      // message, we must delete it here
      jbpmContext.getSession().delete(message);
    }
    ...

    private Message getAndLockNextMessage(DbMessageService dbMessageService, String destination, Session session) {
      Message message = dbMessageService.receiveNoWait(destination);
      if (message != null) {
        try {
          // try to lock the message
          session.lock(message, LockMode.WRITE);
          // Successfully locked
          return message;
        } catch (HibernateException e) {
          if (e.getCause() instanceof LockAcquisitionException) {
            // Failed to acquire the lock - try to get next
            return getAndLockNextMessage(dbMessageService, destination, session);
          } else {
            throw e;
          }
        }
      } else {
        return null;
      }
    }

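One thing to watch in the recursive version: if receiveNoWait() keeps handing back the same locked message (which seems possible once the service no longer deletes it), the recursion would not terminate. A bounded loop is one way around that. The sketch below is only an illustration under assumptions, not jBPM or Hibernate API: `MessageSource`, `Locker`, `LockFailedException` and `maxAttempts` are all hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

class BoundedLockRetry {
    // Hypothetical: stands in for Hibernate's LockAcquisitionException.
    static class LockFailedException extends RuntimeException {}

    // Hypothetical: stands in for dbMessageService.receiveNoWait(destination).
    interface MessageSource { String receiveNoWait(); }

    // Hypothetical: stands in for session.lock(message, ...).
    interface Locker { void lock(String message); }

    // Tries at most maxAttempts messages; returns a locked one or null.
    static String getAndLockNext(MessageSource source, Locker locker, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String message = source.receiveNoWait();
            if (message == null) {
                return null; // nothing left to receive
            }
            try {
                locker.lock(message);
                return message; // we own the lock now
            } catch (LockFailedException e) {
                // Locked elsewhere: loop around and try the next message.
            }
        }
        return null; // gave up; the caller can poll again later
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2"));
        Set<String> lockedElsewhere = new HashSet<>(Arrays.asList("m1"));
        Locker locker = m -> {
            if (lockedElsewhere.contains(m)) throw new LockFailedException();
        };
        System.out.println(getAndLockNext(queue::poll, locker, 5)); // prints: m2
    }
}
```

With a source that always returns the same locked message, the loop simply returns null after `maxAttempts` tries instead of recursing forever.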
From an OO point of view, this is not the best solution in my opinion. I think that the MessageService itself, regardless of which implementation is used, should ensure that the method "nextMessage()" or "receiveNoWait()" only returns Message objects that have been locked successfully.
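To illustrate that design point, the locking could be wrapped around any service so that callers never see an unlocked message. This is only a sketch under assumptions: `MessageService` here is a hypothetical one-method interface rather than jBPM's own, and `tryLock` is a made-up predicate standing in for a non-blocking database lock.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.function.Predicate;

class LockingServiceDemo {
    // Hypothetical simplified interface (not jBPM's MessageService).
    interface MessageService { String receiveNoWait(String destination); }

    // Decorator: hands out only messages for which tryLock succeeded.
    static class LockingMessageService implements MessageService {
        private final MessageService delegate;
        private final Predicate<String> tryLock;

        LockingMessageService(MessageService delegate, Predicate<String> tryLock) {
            this.delegate = delegate;
            this.tryLock = tryLock;
        }

        @Override
        public String receiveNoWait(String destination) {
            String message;
            // Assumes the delegate advances to a new message on every call.
            while ((message = delegate.receiveNoWait(destination)) != null) {
                if (tryLock.test(message)) {
                    return message; // locked by us, safe to process
                }
            }
            return null; // no lockable message available
        }
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2"));
        MessageService plain = destination -> queue.poll();
        MessageService locking = new LockingMessageService(plain, m -> !m.equals("m1"));
        System.out.println(locking.receiveNoWait("CMD_EXECUTOR")); // prints: m2
    }
}
```

The CommandExecutor would then stay unchanged apart from being handed the wrapping service; the destination name above is just a placeholder.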