Looking at the messaging core code I couldn't see any way of telling a Receiver to deliver a bunch of messages as an atomic unit. My knowledge of the messaging core is approaching zero so I could be missing a trick here.
The receive is not a part of the transaction state. You receive messages
before you do commit/rollback.
It is the message acknowledgements that need to be prepared/committed.
ack prepare -> logically remove messages
ack commit -> physically remove messages
ack rollback -> reinstate logically remove messages
Of course the 1PC optimization bypasses the prepare.
So my assumption is that it's up to the JMS facade to hold onto messages until commit for a transacted session. I guess the same applies for acknowledgements.
Correct, though the unacknowledged messages need be "rolled back"
in the event of session.close without a commit/rollback or a client crash.
This should be handled gracefully by the client, but the server needs
to be able to step in to recover the unreliable client leaving things in a mess.
It also needs to hold uncommitted sent messages.
On this assumption (which may well be wrong) I'm trying to work out where we hold the set of messages for the transaction, so we can send them on the commit.
In a map inside the XAResource. txid -> transaction state (sends/acks)
txid could be the XID for JTA, otherwise it will be an internal transaction id
Advantage of 2) seems to be minimising network/inter JVM calls in the case the client and the server peer aren't in the same JVM.
The interceptor could be located on either the server or the client, depending
upon how thick/thin you want the clients.
Also, how do we deal with running out of memory if a lot of messages are sent in a transacted session?
This is an advanced feature that requires monitoring memory and moving
messages out of memory when required (replaced with a handle to the
cache store location).
See the JBossMQ implementation if you want details. This is one of the
things that is actually well designed in that implementation.
Hi Adrian - thanks for your reply.
I've managed to knock together a (very) rudimentary and incomplete implementation of transacted sessions which appears to work ok.
So far I've only transacted message sends, and haven't done the acks.
Basically I've implemented a very simple ResourceManager which just handles local tx for now demarcated by commit() and rollback() on the Session object. This could be extended to handle the XAResource stuff later.
If the session is transacted then I'm storing the message in a LocalTx object which is stored in a map in the resource manager keyed on Xid.
The resource manager is on the client side.
On commit of the session. A "transactionrequest" object containing the messages to send are sent to the server delegate, which then iterates through them and sends them to the messaging core class to deliver.
So far the logic that adds the message to the resource manager when the session is transacted is done in the JBossMessageProducer class which would normally delegate to the MessageProducerDelegate class causing the invocation of the client side interceptor stack to be executed.
I think I need to move this to a client side interceptor (TransactionInterceptor) but just wanted to clarify with you the advantages of implementing it in an interceptor as opposed to the just using the JBossMessageProducer class itself.
So far I can see a few advantages of implementing this as an interceptor:
It allows the interceptor to moved to the server side if a "thin client" approach is required, as opposed to the "fat client" I have done so far.
It allows other interceptors e.g. LogInterceptor or ExceptionInterceptor to be placed before it in the interceptor chain. Although I guess the same effect could be achieved by using JbossAOP to advise before calls to JBossMessageProducer methods.
Are there other reasons for implementing this as an interceptor that I'm missing?
The classes like JBossMessageProducer are only meant to map the *FAT* JMS
api to a simpler sub-version.
You should not be putting real logic (and definitly not state)
into that code otherwise it cannot be overridden/changed/optimized
by different interceptor configuration.
e.g.1. I want to store my transacted session on the server because I am a small
handheld device. So I move most of the interceptors to the serverside.
e.g.2. I want to do a work offline mode such that when I commit/send it actually stores the messages
in a local store and a background thread does the real send to the server when
I am connected.
e.g.3. Some other use case I haven't thought about yet.
Thanks Adrian, that's a lot clearer now.
I've now moved the transaction handling logic into the interceptor stack.
So far, I've put it into TransactionInterceptor.
TransactionInterceptor does the following:
It intercepts ConnectionDelegate.createSessionDelegate() and creates a new local tx (using the local transaction manager) if the session is transacted. The Xid of this is then stored in the meta-data accessible to the SessionDelegate.
It intercepts ProducerDelegate.send() and adds the message to a list of messages for the tx if the session is transacted.
It intercepts SessionDelegate.commit() and sends the list of messages to the ServerSessionDelegate for passing to the messaging core for delivery.
It also intercepts SessionDelegate.rollback() and SessionDelegate.close();
I noticed that there's also a SessionInterceptor in the design documents, but wasn't entirely sure what kind of stuff it has responsibility for.
Perhaps some of the transacted session stuff needs to go in the SessionInterceptor? I'm not 100% sure of the split of responsibility between these interceptors. I could probably do with a steer here.