25 Replies Latest reply on Dec 8, 2005 12:15 AM by ovidiu.feodorov

    Context of MessageStore design task

    timfox

      I just wanted to summarise this since I know this is a complex discussion that could probably do with being put in context.

      I think this may be useful, especially for people getting up to speed with this subject.

      I have extended this somewhat to include a quick summary of the lazy loading queues idea, which is a related subject. The task at hand (for now anyway) is to design and provide a default implementation of the Message Store, not to implement lazy loading queue functionality, but I think this provides background on how the message store fits into the whole picture.

      (Adrian/others please correct any inaccuracies I have made)

      In order to cope with very large queues/subs (currently all message refs for queues are stored in memory at once), Adrian came up with the idea of lazy loading queues:

      With lazy loading queues/subs, we have "special references" in each queue / sub. When one of these special marker references is read from the front of the queue, it causes the next n message references to be loaded into the queue, followed by another marker. The messages corresponding to those message references are also loaded into the message store. (In some cases they may be in the store already, since more than one queue/sub can maintain a reference to the same message.)

      (The loading could be done in a different thread to smooth out any big pauses for the client??)

      The value of n for a particular queue can be configured by the user to suit the operational characteristics of the queue. (Size of messages/expected throughput etc.)

      If messages are sent to a queue and there are already more than n messages in that queue, they go straight in the persistent store (even in the case of a non-persistent message!) and no reference is added to the queue.

      So this means if the value of n is tuned well for a particular queue, and the messages in that queue have a similar size then we should be able to avoid running out of memory in the message store in most cases. :)
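
      A minimal sketch of the marker idea described above, purely for illustration. The class and method names are hypothetical (nothing here is an existing JBoss Messaging API), an in-memory list stands in for the persistent store, and transactions, ordering guarantees across restarts and threading are all ignored:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of a lazily loaded queue; names are illustrative only. */
class LazyQueueSketch {
    // Unique instance used as the "special reference"; compared by identity.
    private static final String MARKER = new String("marker");

    private final int n;                                     // refs loaded per batch
    private final Deque<String> refs = new ArrayDeque<>();   // refs held in memory
    private final List<String> overflow = new ArrayList<>(); // stands in for the persistent store
    private int inMemory = 0;                                // real refs in memory (marker excluded)

    LazyQueueSketch(int n) {
        this.n = n;
    }

    void add(String messageId) {
        if (overflow.isEmpty() && inMemory < n) {
            refs.addLast(messageId);   // room in memory and nothing waiting ahead of us
            inMemory++;
            return;
        }
        if (overflow.isEmpty()) {
            refs.addLast(MARKER);      // first overflow: leave a marker at the tail
        }
        overflow.add(messageId);       // goes straight to the store, no ref in memory
    }

    String poll() {
        String head = refs.pollFirst();
        if (head == null) {
            return null;               // the queue really is empty
        }
        if (head == MARKER) {
            // Reading the marker pulls in the next n refs, then another marker if more remain.
            int count = Math.min(n, overflow.size());
            for (int i = 0; i < count; i++) {
                refs.addLast(overflow.remove(0));
            }
            inMemory += count;
            if (!overflow.isEmpty()) {
                refs.addLast(MARKER);
            }
            head = refs.pollFirst();
        }
        inMemory--;
        return head;
    }

    int refsInMemory() {
        return inMemory;
    }
}
```

      With n = 2, adding five messages leaves two references in memory plus one marker; polling drains them in the original order, loading at most two references at a time.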

      However, if the message store *does* start running out of memory, which messages should it evict first?

      It's likely a default implementation would want to evict those messages that have been in the store the shortest amount of time since they are towards the back of the queues (MRU). (Needs to be pluggable)

      I guess we should also be able to support eviction of messages in batches (i.e. batch updates in the db for performance). We should also know if a message is already in the db or not without making a db hit at passivation. (E.g. persistent messages will already be there and non-persistent may not) (Again, pluggable)
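
      To make the "pluggable" part concrete, here is a rough sketch of what an eviction plug-in point could look like. All names are made up for illustration, a map stands in for the database, and the "known in db" set is just one possible way of avoiding a db hit at passivation:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical plug-in point: chooses which in-memory messages to passivate. */
interface EvictionPolicySketch {
    List<String> selectVictims(List<String> idsOldestFirst, int batchSize);
}

/** The default suggested in the discussion: evict the most recently stored messages first. */
class NewestFirstEviction implements EvictionPolicySketch {
    @Override
    public List<String> selectVictims(List<String> idsOldestFirst, int batchSize) {
        List<String> victims = new ArrayList<>();
        for (int i = idsOldestFirst.size() - 1; i >= 0 && victims.size() < batchSize; i--) {
            victims.add(idsOldestFirst.get(i));
        }
        return victims;
    }
}

class EvictingStoreSketch {
    private final Map<String, String> memory = new LinkedHashMap<>(); // insertion order = age
    private final Map<String, String> db = new HashMap<>();           // stands in for the database
    private final Set<String> knownInDb = new HashSet<>();            // avoids a db hit at passivation
    private final EvictionPolicySketch policy;

    EvictingStoreSketch(EvictionPolicySketch policy) {
        this.policy = policy;
    }

    void put(String id, String body, boolean persistent) {
        memory.put(id, body);
        if (persistent) {              // persistent messages are written on arrival
            db.put(id, body);
            knownInDb.add(id);
        }
    }

    /** Evicts one batch and returns how many messages had to be written to the db. */
    int evict(int batchSize) {
        Map<String, String> batch = new HashMap<>();
        for (String id : policy.selectVictims(new ArrayList<>(memory.keySet()), batchSize)) {
            String body = memory.remove(id);
            if (!knownInDb.contains(id)) {
                batch.put(id, body);   // only non-persistent messages still need writing
            }
        }
        db.putAll(batch);              // one batched write instead of one per message
        knownInDb.addAll(batch.keySet());
        return batch.size();
    }

    boolean inMemory(String id) {
        return memory.containsKey(id);
    }
}
```

      Swapping in a different policy only means passing another EvictionPolicySketch implementation to the constructor.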

      When memory usage recovers how do we load messages back into the store? (Again, pluggable)

      Perhaps we don't bother and just wait until the message is actually accessed. This is simple and perhaps should be our default implementation.

      Other implementations could get more complex. We could batch load a subset of the same messages we evicted back into the store in order to utilise free memory well.

      This means we would need to keep track of the ids of the messages we evicted. When loading them back in we may not be able to load them all back in since free memory may not have recovered to the same level it was at before.

      In order to know how many to load we would need to store some kind of cumulative message size in the db. (SELECT * from message_refs where cumulative_size < xyz). This all gets complicated and it's my feeling that we should probably just stick to a simple default impl for now.
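
      For illustration only, the cumulative-size selection could look something like the sketch below if done in memory rather than in SQL. The class name, the use of a LinkedHashMap to hold evicted ids with their sizes, and the byte units are all assumptions:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReloadPlannerSketch {
    /**
     * Returns the evicted ids, in eviction order, whose running total size
     * stays below freeBytes (mirrors "cumulative_size < xyz").
     */
    static List<String> idsToReload(LinkedHashMap<String, Long> evictedSizes, long freeBytes) {
        List<String> ids = new ArrayList<>();
        long cumulative = 0;
        for (Map.Entry<String, Long> entry : evictedSizes.entrySet()) {
            cumulative += entry.getValue();
            if (cumulative >= freeBytes) {
                break;                 // loading this one would use up the free memory
            }
            ids.add(entry.getKey());
        }
        return ids;
    }
}
```

      For evicted messages of 100, 200 and 300 bytes and 350 bytes free, only the first two would be reloaded.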

      As Adrian has pointed out, it's key that the message store is designed such that these policies are all pluggable (of course we need to provide a default implementation too).

      Alex is coming up with a design that supports eviction, memory management, etc. pluggably.

      It's possible that treecache could be used to provide part of a default implementation. (To be investigated)

        • 1. Re: Context of MessageStore design task
          ovidiu.feodorov

          This looks reasonably ... complex. How about continuing with a discussion on the interfaces we need to draw boundaries over all this behavior?

          • 2. Re: Context of MessageStore design task
            timfox

            This is just intended as background so people have a good picture of the story without having to dig around in x JIRA tasks and y forum threads.

            Definitely the next stage is to thrash out some interfaces :)

            • 3. Re: Context of MessageStore design task

               

              "timfox" wrote:

              If messages are sent to a queue and there are already more than n messages in that queue, they go straight in the persistent store (even in the case of a non-persistent message!) and no reference is added to the queue.


              You need to be careful to differentiate a normal idle queue (one that is empty except for
              the dummy reference) and an empty idle queue. i.e. it really was empty.

              In this case, the write goes to the store, but it also creates the dummy reference
              in the queue/channel.

              I have a simple prototype of lazily loaded queues somewhere (probably in the UK?)
              where I rewrote JBossMQ's BasicQueue/JMSDestination/PM
              The basic idea was that the PM served as a factory for the destination's internal list.

              At the time, I hadn't connected it with the "dummy" reference idea
              (which comes from caching - i.e. caching failures/black lists) so it was
              pretty inefficient at accessing the database for empty queues.

              I discarded it, because it would have broken some of the plugin interfaces
              that JBossMQ defined and *fixed* a long time ago.

              This is why you need to get the interfaces correct upfront.

              i.e. Don't try to make the main interfaces too "fat" to encompass all use cases.
              Not all implementations will be able to cope with all use cases.

              Let them implement the special use cases using their own internal optimized api,
              e.g. in my JBossMQ prototype it was a "private"
              LazyBasicQueue <-> LazyPersistenceManager protocol.

              Pseudo code because this isn't the way it would be really implemented,
              it doesn't deal with TXs or other issues like message ordering
              LazyBasicQueue$LazyList.add(m)
              {
                 pm.write(m);
                 if (idle && empty)
                    internalAdd(pm.getDummy(this));
                 if (idle == false)
                    internalAdd(m);
              }
              


              • 4. Re: Context of MessageStore design task
                alexfu.novell

                Here is my thought about MessageStore design:

                (1) I think MessageStore should be the lowest level pluggable component, not Eviction policy/memory management. This is because memory management is tightly bound to a particular message store structure, so it's hard to mix different kinds of message store and memory manager. E.g. if the MessageStore impl is using TreeCache as the data structure to store all messages, then you cannot switch to another eviction/memory manager at all.

                (2) The interfaces will look like:

                public interface MessageStore
                {
                 Serializable getStoreID();
                
                 boolean isRecoverable();
                
                 boolean acceptReliableMessages();
                
                 MessageReference reference(Routable r);
                
                 MessageReference getReference(Serializable messageID);
                }
                
                public interface MessageReference extends Routable
                {
                 Serializable getStoreID();
                }
                
                public abstract class BaseMessageStore implements MessageStore
                {
                   // Attributes
                   private Serializable storeID;
                   private boolean acceptReliableMessages;
                   private boolean recoverable;

                   private PersistManager pm;

                   // Public methods
                   public Serializable getStoreID()
                   {
                      return storeID;
                   }

                   public boolean isRecoverable()
                   {
                      return recoverable;
                   }

                   public boolean acceptReliableMessages()
                   {
                      return acceptReliableMessages;
                   }

                   public MessageReference reference(Routable r)
                   {
                      // Check parameter etc.

                      // Reuse an existing reference if the message is already in the store
                      MessageReference ref = getReference(r.getMessageID());
                      if (ref == null)
                         ref = createReference((Message)r);
                      return ref;
                   }

                   public MessageReference getReference(Serializable messageID)
                   {
                      // Delegate the lookup to the subclass (findReference is a placeholder name)
                      return findReference(messageID);
                   }

                   protected abstract MessageReference createReference(Message m);
                   protected abstract MessageReference findReference(Serializable messageID);
                }
                




                • 5. Re: Context of MessageStore design task

                  You need a "Unit of work" e.g. transaction id.

                  The primary purpose of the message store (ignore all the other features)
                  is to be a recoverable transaction log.

                  It should ensure that adds and removes in the same "unit of work" are done
                  together or not at all. This *must* be done at the lowest level, e.g. using a JDBC
                  local transaction to commit/rollback related changes together.
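
                  A rough sketch of the all-or-nothing behaviour, for illustration only. The names are hypothetical and an in-memory set stands in for the store, so nothing here is actually recoverable; a JDBC-backed version would presumably run the same adds and removes on one Connection with auto-commit switched off, then call commit() or rollback():

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical unit of work: collects changes until commit or rollback. */
class UnitOfWorkSketch {
    final List<String> adds = new ArrayList<>();
    final List<String> removes = new ArrayList<>();
}

class TransactionalStoreSketch {
    private final Set<String> committed = new LinkedHashSet<>();

    UnitOfWorkSketch begin() {
        return new UnitOfWorkSketch();
    }

    void add(String id, UnitOfWorkSketch uow) {
        uow.adds.add(id);
    }

    void remove(String id, UnitOfWorkSketch uow) {
        uow.removes.add(id);
    }

    synchronized void commit(UnitOfWorkSketch uow) {
        // Validate everything first so that a failure leaves the store untouched.
        for (String id : uow.removes) {
            if (!committed.contains(id) && !uow.adds.contains(id)) {
                throw new IllegalStateException("unknown message " + id);
            }
        }
        committed.addAll(uow.adds);
        committed.removeAll(uow.removes);
    }

    void rollback(UnitOfWorkSketch uow) {
        uow.adds.clear();              // nothing was applied, so just forget the changes
        uow.removes.clear();
    }

    synchronized boolean contains(String id) {
        return committed.contains(id);
    }
}
```

                  The point is only that the store sees either every change in a unit of work or none of them.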

                  • 6. Re: Context of MessageStore design task
                    alexfu.novell

                    Ok. So the interface of MessageStore will look like:

                    public interface MessageStore
                    {
                     Serializable getStoreID();
                    
                     boolean isRecoverable();
                    
                     boolean acceptReliableMessages();
                    
                     MessageReference reference(Routable r, Transaction tx);
                    
                     MessageReference getReference(Serializable messageID);
                    
                     void commitTx(Transaction tx);
                    
                     void rollbackTx(Transaction tx);
                    }
                    


                    • 7. Re: Context of MessageStore design task
                      alexfu.novell

                      I missed remove():

                      public interface MessageStore
                      {
                       Serializable getStoreID();
                       boolean isRecoverable();
                       boolean acceptReliableMessages();
                      
                       MessageReference reference(Routable r);
                       void reference(Routable r, Transaction tx);
                       MessageReference getReference(Serializable messageID);
                       void remove(MessageReference ref);
                       void remove(MessageReference ref, Transaction tx);
                       void commitTx(Transaction tx);
                       void rollbackTx(Transaction tx);
                      }
                      


                      • 8. Re: Context of MessageStore design task

                        This is the JBossMQ version (I'll comment it):

                        /*
                        * JBoss, Home of Professional Open Source
                        * Copyright 2005, JBoss Inc., and individual contributors as indicated
                        * by the @authors tag. See the copyright.txt in the distribution for a
                        * full listing of individual contributors.
                        *
                        * This is free software; you can redistribute it and/or modify it
                        * under the terms of the GNU Lesser General Public License as
                        * published by the Free Software Foundation; either version 2.1 of
                        * the License, or (at your option) any later version.
                        *
                        * This software is distributed in the hope that it will be useful,
                        * but WITHOUT ANY WARRANTY; without even the implied warranty of
                        * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
                        * Lesser General Public License for more details.
                        *
                        * You should have received a copy of the GNU Lesser General Public
                        * License along with this software; if not, write to the Free
                        * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
                        * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
                        */
                        package org.jboss.mq.pm;
                        
                        import javax.jms.JMSException;
                        
                        import org.jboss.mq.SpyDestination;
                        import org.jboss.mq.server.JMSDestination;
                        import org.jboss.mq.server.MessageCache;
                        import org.jboss.mq.server.MessageReference;
                        
                        /**
                         * This class provides the base for user supplied persistence packages.
                         *
                         * @author Hiram Chirino (Cojonudo14@hotmail.com)
                         * @author Paul Kendall (paul.kendall@orion.co.nz)
                         * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
                         * @version $Revision: 1.7.2.3 $
                         */
                        public interface PersistenceManager
                        {
                         // Constants -----------------------------------------------------
                        
                         // Public --------------------------------------------------------
                        
                        // This is deprecated. The message cache is configured on the destination manager
                        
                         /**
                         * Get the message cache
                         *
                         * @return the instance of the message cache
                         */
                         MessageCache getMessageCacheInstance();
                        
                        // This is probably redundant, and represents bad api design/use, see below
                        
                         /**
                         * Create and return a unique transaction id.
                         *
                         * @return the transaction
                         * @throws JMSException for any error
                         */
                         Tx createPersistentTx() throws javax.jms.JMSException;
                        
                        // This is probably redundant, and represents bad api design/use, see below
                         /**
                         * Commit the transaction to the persistent store.
                         *
                         * @param txId Description of Parameter
                         * @throws JMSException for any error
                         */
                         void commitPersistentTx(Tx txId) throws javax.jms.JMSException;
                        
                        // This is probably redundant, and represents bad api design/use, see below
                        
                         /**
                         * Rollback the transaction.
                         *
                         * @param txId Description of Parameter
                         * @throws JMSException for any error
                         */
                         void rollbackPersistentTx(Tx txId) throws javax.jms.JMSException;
                        
                        
                        // We let the transaction log decide the transaction manager implementation;
                        // transaction operations should go through it.
                        // In fact, there is only one implementation in JBossMQ of TxManager
                        // and all transaction operations do go through the TxManager to Tx then to the
                        // persistence manager.
                        // So exposing the transaction operations on the PM itself is redundant.
                        // These can be private api.
                        
                        
                         /**
                         * Get a transaction manager.
                         *
                         * @return the transaction manager
                         * @throws JMSException for any error
                         */
                         TxManager getTxManager();
                        
                        // There is only one add method, you pass a null txid for no transaction
                        
                         /**
                         * Add a message to the persistent store. If the message is part of a
                         * transaction, txId is not null.
                         *
                         * @param message the message
                         * @param txId the transaction
                         * @throws JMSException for any error
                         */
                         void add(MessageReference message, Tx txId) throws JMSException;
                        
                        // This should be replaced with something like
                        // List getInternalQueueListImplemetation, e.g. it could be a sorted list
                        // or it could be a "lazy list" or something else?
                        
                         /**
                         * Restore a queue.
                         *
                         * @param jmsDest the jms destination
                         * @param dest the client destination
                         * @throws JMSException for any error
                         */
                         void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException;
                        
                        // You need this to permanently mark the persistence store
                        // with the "redelivered" information, otherwise it will lose it on a crash
                        
                         /**
                         * Update message in the persistent store. If the message is part of a
                         * transaction, txId is not null (not currently supported).
                         *
                         * @param message
                         * @param txId Description of Parameter
                         * @throws JMSException for any error
                         */
                         void update(MessageReference message, Tx txId) throws JMSException;
                        
                        // See add
                        
                         /**
                         * Remove message from the persistent store. If the message is part of a
                         * transaction, txId is not null.
                         *
                         * @param message the message
                         * @param txId the transaction
                         * @throws JMSException for any error
                         */
                         void remove(MessageReference message, Tx txId) throws JMSException;
                        
                        // Not actually used by JBossMQ. This probably belongs on the
                        // List returned by "getInternalQueueListImplemetation" if any extra work is
                        // done to free resources. e.g. stop/unregister a monitoring thread that
                        // fills lazy queues in the background.
                        
                         /**
                         * Close a queue
                         *
                         * @param jmsDest the jms destination
                         * @param dest the client destination
                         * @throws JMSException for any error
                         */
                         void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException;
                        }
                        


                        • 9. Re: Context of MessageStore design task

                          So I would have:

                          public interface MessageStore
                          {
                           Serializable getStoreID(); // ???
                           boolean isRecoverable(); // ???
                           boolean acceptReliableMessages(); // ???
                          
                           TransactionRepository getTransactionRepository();
                           Subscription restoreSubscription(SubscriptionId id); // ???
                          }
                          
                          public interface TransactionRepository
                          {
                           Transaction begin();
                           void rollback(Transaction tx);
                           void commit(Transaction tx);
                          }
                          
                          public interface Subscription
                          {
                           void add(MessageReference ref, Transaction tx);
                           void update(MessageReference ref);
                           void remove(MessageReference ref, Transaction tx);
                          }
                          


                          With Subscription using "write through" when there is no transaction
                          and synchronization on the tx commit/rollback when there is.

                          It is necessary to split TransactionRepository from Subscription
                          because you have transactions that cross subscriptions.

                          • 10. Re: Context of MessageStore design task

                            You also need to add methods to subscription to support the normal operations
                            of the channel. i.e. things it will delegate to.

                            e.g. (whatever they are called nowadays in JBoss Messaging)

                            addSubscriber(SubscriptionId, long wait);
                            removeSubscriber(SubscriptionId);
                            browse(...)
                            



                            • 11. Re: Context of MessageStore design task

                               

                              "adrian@jboss.org" wrote:

                              public interface TransactionRepository
                              {
                              Transaction begin();
                              void rollback(Transaction tx);
                              commit(Transaction tx);
                              }


                              Actually, for future multiple message stores, bridging, etc.,
                              it is probably better to make TransactionRepository implement something like
                              XAResource with 2PC prepare()/commit() and the ability to "join" rather than
                              just begin()???


                              • 12. Re: Context of MessageStore design task
                                alexfu.novell

                                 


                                public interface TransactionRepository
                                {
                                Transaction begin();
                                void rollback(Transaction tx);
                                commit(Transaction tx);
                                }

                                The caller can get/start transaction themselves and we can be just part of their transaction as you mentioned. So I would think that we don't need TransactionRepository at all. We can move
                                void rollback(Transaction tx);
                                commit(Transaction tx);
                                to Subscription.

                                • 13. Re: Context of MessageStore design task

                                   

                                  "AlexFu.Novell" wrote:

                                  public interface TransactionRepository
                                  {
                                  Transaction begin();
                                  void rollback(Transaction tx);
                                  commit(Transaction tx);
                                  }

                                  The caller can get/start transaction themselves and we can be just part of their transaction as you mentioned. So I would think that we don't need TransactionRepository at all. We can move
                                  void rollback(Transaction tx);
                                  commit(Transaction tx);
                                  to Subscription.


                                  TransactionRepository is the whole point.
                                  It is what represents the transaction log(s).

                                  It can potentially be used in multiple ways:

                                  *Pseudo code*

                                  Local JMS Transaction or 1PC on XAResource (internal txid)
                                  session.commit() - serverside
                                  Transaction tx = repository.begin();
                                  // do the work
                                  tx.commit();
                                  


                                  XA JMS Transaction (external xid)
                                  xaResource.prepare(XID)
                                  Transaction tx = repository.begin(XID);
                                  // do the work
                                  // return our vote
                                  return tx.prepare();
                                  

                                  followed by separate commit request
                                  Transaction tx = repository.get(XID);
                                  tx.commit();
                                  


                                  Multiple repositories/persistent stores, e.g. message bridging
                                  (JTA)Transaction tx = (JTA)TransactionManager.begin();
                                  tx.enlistResource(fromRepository);
                                  // receive
                                  tx.enlistResource(toRepository);
                                  // add
                                  tx.commit(); --> does 2PC via JTA
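
                                  As a compilable illustration of the first two usages above, a toy repository might look like this. Everything here is hypothetical: the class names are invented, the xid is a plain String rather than a real javax.transaction.xa.Xid, and no work is actually logged or made durable:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical transaction handle with a prepare vote and a final outcome. */
class TxSketch {
    enum State { ACTIVE, PREPARED, COMMITTED, ROLLED_BACK }

    private State state = State.ACTIVE;

    /** Phase one of 2PC: returns our vote. */
    boolean prepare() {
        if (state != State.ACTIVE) {
            return false;
        }
        state = State.PREPARED;
        return true;
    }

    /** Phase two of 2PC, or the only phase for a local / 1PC commit. */
    void commit() {
        if (state == State.COMMITTED || state == State.ROLLED_BACK) {
            throw new IllegalStateException("already completed");
        }
        state = State.COMMITTED;
    }

    void rollback() {
        if (state == State.COMMITTED) {
            throw new IllegalStateException("already committed");
        }
        state = State.ROLLED_BACK;
    }

    State state() {
        return state;
    }
}

class TransactionRepositorySketch {
    private final Map<String, TxSketch> byXid = new HashMap<>();

    /** Local transaction with an internal id. */
    TxSketch begin() {
        return new TxSketch();
    }

    /** Transaction started on behalf of an external xid. */
    TxSketch begin(String xid) {
        TxSketch tx = new TxSketch();
        if (byXid.putIfAbsent(xid, tx) != null) {
            throw new IllegalStateException("duplicate xid " + xid);
        }
        return tx;
    }

    /** Looks the transaction up again for the separate commit request. */
    TxSketch get(String xid) {
        TxSketch tx = byXid.get(xid);
        if (tx == null) {
            throw new IllegalStateException("unknown xid " + xid);
        }
        return tx;
    }
}
```

                                  The lookup by xid is what lets prepare and commit arrive as separate requests.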
                                  


                                  • 14. Re: Context of MessageStore design task

                                     

                                    "AlexFu.Novell" wrote:
                                    We can move
                                    void rollback(Transaction tx);
                                    commit(Transaction tx);
                                    to Subscription.


                                    Like I said before. Even a local jms transaction can include multiple subscriptions.
                                    e.g. receive from one queue and send to another (but the same jms server/message store)
                                    or just send to two different queues.

                                    The classic example is remove from the main queue and send to DLQ
                                    in one atomic operation.
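
                                    A small sketch of that DLQ case, combining "write through" when there is no transaction with work deferred to commit when there is one. The class names are made up, and this only shows the shape of the API: the deferred work is simply run in order at commit, with no real logging or crash recovery behind it:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical transaction that collects work from any number of subscriptions. */
class DeferredTxSketch {
    private final List<Runnable> work = new ArrayList<>();

    void enlist(Runnable change) {
        work.add(change);
    }

    void commit() {
        for (Runnable change : work) {
            change.run();              // apply every enlisted change together
        }
        work.clear();
    }

    void rollback() {
        work.clear();                  // nothing was applied yet
    }
}

class SubscriptionSketch {
    private final List<String> messages = new ArrayList<>();

    void add(String id, DeferredTxSketch tx) {
        if (tx == null) {
            messages.add(id);          // no transaction: write through immediately
        } else {
            tx.enlist(() -> messages.add(id));
        }
    }

    void remove(String id, DeferredTxSketch tx) {
        if (tx == null) {
            messages.remove(id);
        } else {
            tx.enlist(() -> messages.remove(id));
        }
    }

    List<String> contents() {
        return new ArrayList<>(messages);
    }
}
```

                                    Because the transaction lives outside either subscription, removing from the main queue and adding to the DLQ can be committed or rolled back as one step.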
