4 Replies Latest reply on May 27, 2005 12:41 AM by ovidiu.feodorov

    Transacting message sends in core.

    timfox

      Currently, when a JMS Transacted session is committed, any messages sent in the tx are sent to the client's server peer where the handle() method is called on the Destination (Queue or Topic) to which the message was intended.

      Handle() is called once for each message in the tx that needs to be sent.

      If failure of the server peer occurs before all the messages are handled() and positively acknowledged then it may be the case that only some of the messages in the tx end up being sent.

      It seems to me that this breaks the atomicity of the transaction.

      One solution for this would be to make sure that all the messages for the tx are persisted in a reliable store *before* delivery is attempted for any of them.

      Currently, synchronous delivery is attempted before persisting the messages.

      We could introduce a new method boolean handle(Routable[] routables) which makes sure that *all* or *none* of the messages are persisted (and thus positively acknowledged).

      I'm not sure whether this should go on Receiver, or Channel (??)

      Under the bonnet (hood) this could be implemented by making message store and acknowledgement store transactional, which is required anyway IMHO for the analogous problem of transactional acknowledgements.


      -Tim

        • 1. Re: Transacting message sends in core.

          I've said it before, look at the JBossMQ implementation. This is how it should be done.
          It is call write ahead logging.

          Step 1 - Log the changes that are to be made (where messages are reliable)
          Step 2a - If everything ok, make the changes permenant
          Step 2b - otherwise, rollback any logged changes that were made

          For XA, steps 1 and 2 could be different server invocations (prepare/commit/rollback)
          under 2PC.

          The write ahead logging/persisting is a requirement (for JMS reliable messages)
          even if you don't have a transaction.
          When the send() method returns it *must* be guaranteed that the message is
          eventually delivered (even if that requires booting a different server from the log
          e.g. because the original server blew up :-).

          • 2. Re: Transacting message sends in core.

            And yes, acknowledgements must be persisted/logged as well.
            e.g. JBossMQ's JDBC2 does this by logically deleting the message
            TXOP="D" during the prepare and physically deleting it at commit.

            • 3. Re: Transacting message sends in core.
              ovidiu.feodorov

              I am trying to consolidate discussions held on various threads about how to implement transactional support. For simplicity, I would start with two simple use cases, and then work my way up from here:

              1. Transactionally sending multiple NON_PERSISTENT messages
              2. Transactionally receiving multiple NON_PERSISTENT messages

              1. Transactionally sending multiple NON_PERSISTENT messages

              In this case we only care that our messages are atomically delivered to the destination. The discussion does not include the processing of the ACKs generated by final consumers. The consumer-generated ACKs are handled the usual way.

              Once a session has been declared "transactional", there is always an active transaction associated with it. All messages sent by session's Producers are automatically associated to this transaction and they keep accumulating until the transaction is committed. Our current interceptor architecture allows us to keep these non-committed messages on the client or on the server, depending on where we place the interceptor.

              An advantages of keeping non-committed messages on client is that we minimize network traffic, as Tim pointed out earlier.

              The disadvantage is that we are in danger of running out of memory. Most likely we will have no control over client's memory and no access to storage to spill messages to. For a long running transaction/big messages it is possible to run out of memory. This is not a problem on the server side, where supposedly we have access to external reliable storage to spill message to, if we need it.

              It is also possible to combine these two approaches by transferring non-committed messages from client to server once the amount of memory they use go over a certain threshold.We should probably worry about this in the future, but not now. I would suggest to start with the simplest approach: non-committed messages accumulate on client. This is what JBossMQ does too. The interceptor should be written in such a way that it does not depend on anything client-specific, so we can later move it to the server if necessary.

              So, functionality we need so far is:
              - generate TxIDs and associate them with sessions; these TxID could be JTA XIDs or something that we generate if no JTA transaction is available. Parts of this is already available.
              - maintain a <TxID - unacked messages list> map. Parts of this is already available, as far as I know.
              - on transacted send() stop the invocation propagation and add the message to the non-committed message list, instead of forwarding it to the server.
              - on commit() send non-committed messages in bulk to the server
              - on rollback() drop non-committed messages

              Once the non-committed messages arrive in bulk on the server, we need to make sure that either all of them are accepted by the destination, or none of them is.

              JBossMQ handle this as follows: JMSDestinationManager receives the array containing non-committed messages and adds them to the destination (BasicQueue) one by one, within the scope of a transaction. It uses a org.jboss.mq.pm.Tx for that. The destination first caches messages and only deals with message references. If the message is PERSISTENT, the queue transactionally persists (TXID) in JMS_TRANSACTIONS and (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, "A") in JMS_MESSAGES. This doesn't happen if the message is NON_PERSISTENT. Finally, the queue registers AddMessagePostCommitTask/AddMessagePostRollbackTasks with the transaction. No message is delivered to the destination at this stage yet. On commit, the persistence manager deletes from the database the messages corresponding to the current transaction that are marked "D" and the transaction itself and physically sends the messages to the destination, one by one, by running previously-registered AddMessagePostRollbackTasks in a loop.

              An interesting question at this point would be what happens if one of the AddMessagePostCommitTasks fails. For NON_PERSISTENT messages, some of the messages will be delivered to clients, and some won't. This situation is within the provisions of the spec, because NON_PERSISTENT messages may be lost.

              To summarize, BasicQueue is not transactional, but JMSDestinationManager enforces a transactional behavior when uses it.

              In our case, we need to achieve a similar behavior when sending the bulk non-committed messages to the destination Receiver. I see several possibilities:

              1. Insert a transactional layer between SessionServerDelegate the target Receiver. For NON_PERSISTENT messages, nothing goes to the database, the wrapper only register something similar to AddMessagePostCommitTask/AddMessagePostRollbackTasks for each message with the local transaction manager. This is the logging Adrian mentioned. On commit, messages will delivered one by one in a loop to Receiver.handle(). Anything else than a positive acknowledgment from the destination rolls back the transaction.

              2. Subclass Receiver to provide this functionality. Write a new TransactionalReceiver type that works with a transaction manager (and a MessageStore, for persistent messages) to provide transactionality in a way that was described above. If we do this with a XAResource we get 2PC wiring in there from the beginning.

              3. Add a new method boolean handle(Set) to Receiver. However, the implementation of the method must provide in a way or another the behavior that I've described above.


              2. Transactionally receiving multiple NON_PERSISTENT messages

              In this case, we only care about acknowledgments returning from the consuming Session. The Session either acknowledges all received messages, or none.

              Positive ACKs keep accumulating until transaction commits. Same discussion as where to accumulate them applies here as well. For the time being, let's keep them on the client side. The interceptor that manages them should do it in a way that that does not depend on anything client-specific.

              Additional functionality that we need is:
              - maintain a map <TxID - acknowledgments>
              - on transacted receive() add the acknowledgment to the acknowledgment list, instead of sending it to the server
              - on commit() send acknowledgments in bulk to the server
              - on rollback() drop the acknowledgments; the server will eventually redeliver the corresponding messages.

              When the ACK set reaches the server, we have a similar situation: we need to make sure that either all of them are accepted by the Channel, or none of them is. Whatever solution is chosen for messages could be also applied for acknowledgments.

              • 4. Re: Transacting message sends in core.
                ovidiu.feodorov

                I have checked in code that adds transactional handling for messages at Channel level. The support is incomplete, it only transacts NON_PERSISTENT messages on a LocalQueue. Support for all channel types, acknowledgments and PERSISTENT messages will follow soon.

                To get an idea how it works, look at TransactionalChannelSupportTest.