14 Replies Latest reply on Feb 15, 2005 12:12 PM by adrian.brock

    Message ordering

    greydeath

      I have the situation that a SessionBean sends 3 messages to a queue in one transaction.

      15:24:49,157 INFO [JMS SessionPool Worker-6][JMSDestinationProxy] Proxy Message: 6 Correl: 2
      15:24:49,160 INFO [JMS SessionPool Worker-6][JMSDestinationProxy] Proxy Message: 6 Correl: 3
      15:24:49,162 INFO [JMS SessionPool Worker-6][JMSDestinationProxy] Proxy Message: 6 Correl: 4

      But when I look at the timestamps something very strange happens:

      Timestamp CorrelationId
      1107527163426 4
      1107527164357 2
      1107527164361 3



      The message with Correl-Id 4 is sent last; nevertheless it has an earlier timestamp than the others. Please remember it is the same transaction.

      Furthermore (and much more important) the Correl-Id 4 is delivered before the others and I assume the timestamp is the reason for that.

      I'm using 4.0.1 with the standard jms configuration.

      Help would be very much appreciated

      Mike

        • 1. Re: Message ordering

          I don't believe your assertions. Use "READ THIS FIRST" to show me what is really
          happening.

          The code is too trivial to be at fault:

          from org.jboss.mq.MessageProducer#send(...)

           long ts = System.currentTimeMillis();
           sendMessage.setJMSTimestamp(ts);
          


          • 2. Re: Message ordering
            greydeath

            Adrian,

            I have several problems and I do not know how to solve them.

            1. Message Order:

            I don't know how it happens, but I have now proved that the messages are sometimes not delivered in the same order as they were sent to the queue.

            First transaction sender:
            09:18:27,343 INFO [JMS SessionPool Worker-13][JMSDestinationProxy] Proxy Message: 7 Correl: 2
            09:18:27,345 INFO [JMS SessionPool Worker-13][JMSDestinationProxy] Proxy Message: 7 Correl: 3

            I turned on trace logging for MQ. I checked that the transaction for these messages is definitely committed before the following:

            09:18:29,043 INFO [JMS SessionPool Worker-6][JMSDestinationProxy] Proxy Message: 7 Correl: 4

            And as you see it is also 2 seconds later!!

            Nevertheless my client receives this last message first.

            As for the timestamp, I cannot verify my original assumption anymore, maybe I made a mistake there.

            Anyway, I can reproduce the problem I just wrote you any time.

            My application does 'KeyStreaming'. There are MDBs which consume messages and send them on to another queue. Between themselves they make sure that a certain order is retained on the 'OUT' queue.


            These are the timestamps and the correlation IDs on the out queue (the correlation ID is the order they should be in):
            11077643073431 2
            11077643073452 3
            11077643090434 4
            11077643106761 5
            11077643106802 6
            11077643106853 7
            11077643106904 8
            11077643106955 9

            Everything is OK here. But I definitely received 4 first. From the client ID you can also see the transaction boundaries, as every transaction is a new client in my example.

            [org.jboss.mq.SpyMessage {
            Header {
             jmsDestination : QUEUE.testKeystreamOut
             jmsDeliveryMode : 2
             jmsExpiration : 0
             jmsPriority : 4
             jmsMessageID : ID:23-11077643090434
             jmsTimeStamp : 1107764309043
             jmsCorrelationID: 4
             jmsReplyTo : null
             jmsType : null
             jmsRedelivered : true
             jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=4, JMS_JBOSS_REDELIVERY_COUNT=1}
             jmsPropReadWrite: false
             msgReadOnly : true
             producerClientId: ID:23
            }
            }, org.jboss.mq.SpyMessage {
            Header {
             jmsDestination : QUEUE.testKeystreamOut
             jmsDeliveryMode : 2
             jmsExpiration : 0
             jmsPriority : 4
             jmsMessageID : ID:13-11077643106761
             jmsTimeStamp : 1107764310676
             jmsCorrelationID: 5
             jmsReplyTo : null
             jmsType : null
             jmsRedelivered : false
             jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=6}
             jmsPropReadWrite: false
             msgReadOnly : true
             producerClientId: ID:13
            }
            }, org.jboss.mq.SpyMessage {
            Header {
             jmsDestination : QUEUE.testKeystreamOut
             jmsDeliveryMode : 2
             jmsExpiration : 0
             jmsPriority : 4
             jmsMessageID : ID:13-11077643106802
             jmsTimeStamp : 1107764310680
             jmsCorrelationID: 6
             jmsReplyTo : null
             jmsType : null
             jmsRedelivered : false
             jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=6}
             jmsPropReadWrite: false
             msgReadOnly : true
             producerClientId: ID:13
            }
            }, org.jboss.mq.SpyMessage {
            Header {
             jmsDestination : QUEUE.testKeystreamOut
             jmsDeliveryMode : 2
             jmsExpiration : 0
             jmsPriority : 4
             jmsMessageID : ID:13-11077643106853
             jmsTimeStamp : 1107764310685
             jmsCorrelationID: 7
             jmsReplyTo : null
             jmsType : null
             jmsRedelivered : false
             jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=7}
             jmsPropReadWrite: false
             msgReadOnly : true
             producerClientId: ID:13
            }
            }, org.jboss.mq.SpyMessage {
            Header {
             jmsDestination : QUEUE.testKeystreamOut
             jmsDeliveryMode : 2
             jmsExpiration : 0
             jmsPriority : 4
             jmsMessageID : ID:13-11077643106904
             jmsTimeStamp : 1107764310690
             jmsCorrelationID: 8
             jmsReplyTo : null
             jmsType : null
             jmsRedelivered : false
             jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=8}
             jmsPropReadWrite: false
             msgReadOnly : true
             producerClientId: ID:13
            }
            }, org.jboss.mq.SpyMessage {
            Header {
             jmsDestination : QUEUE.testKeystreamOut
             jmsDeliveryMode : 2
             jmsExpiration : 0
             jmsPriority : 4
             jmsMessageID : ID:13-11077643106955
             jmsTimeStamp : 1107764310695
             jmsCorrelationID: 9
             jmsReplyTo : null
             jmsType : null
             jmsRedelivered : false
             jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=9}
             jmsPropReadWrite: false
             msgReadOnly : true
             producerClientId: ID:13
            }
            }, org.jboss.mq.SpyMessage {
            Header {
             jmsDestination : QUEUE.testKeystreamOut
             jmsDeliveryMode : 2
             jmsExpiration : 0
             jmsPriority : 4
             jmsMessageID : ID:15-11077643073431
             jmsTimeStamp : 1107764307343
             jmsCorrelationID: 2
             jmsReplyTo : null
             jmsType : null
             jmsRedelivered : false
             jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=1}
             jmsPropReadWrite: false
             msgReadOnly : true
             producerClientId: ID:15
            }
            }, org.jboss.mq.SpyMessage {
            Header {
             jmsDestination : QUEUE.testKeystreamOut
             jmsDeliveryMode : 2
             jmsExpiration : 0
             jmsPriority : 4
             jmsMessageID : ID:15-11077643073452
             jmsTimeStamp : 1107764307345
             jmsCorrelationID: 3
             jmsReplyTo : null
             jmsType : null
             jmsRedelivered : false
             jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=2}
             jmsPropReadWrite: false
             msgReadOnly : true
             producerClientId: ID:15
            }
            }]


            • 3. Re: Message ordering
              greydeath

              Maybe I should describe what the MDB does.

              My messages have keys (messageKey). The Sender will put a 'notification message' on the queue for each new key in one transaction. These messages only have a property messageKey.
              The MDB listens for these messages (message selector). In onMessage it locks the key to ensure that no other MDB is working on that key. After that it sets up a queue receiver to receive the 'data messages' for that key (queue_keystream_data is not null and messageKey=7) and processes them. (In my simple scenario it just sends the message to another queue.) Every message is its own transaction here.

              If no more are to be received, it closes the consumer and unlocks the key.

              After that it opens a QueueBrowser with the same message selector to check whether any new messages arrived in the mean time.

              Here is my second problem. I sometimes lose messages (they are left behind on the input side), and it seems that happens because the queue browser sometimes returns no elements although there are some on the queue.

              This whole thing is to ensure that messages with the same key are processed in the right order.

              Referring to my last post: I therefore know the messages of messageKey 7 go to the OUT queue in the right order, but the retrieving client does not get them that way.

              • 4. Re: Message ordering

                This is an FAQ. In fact multiple FAQs.

                1) Read the spec about no guarantee of ordering of an MDB
                2) Read the spec about EJBs should not do internal locking (it certainly doesn't work in a cluster)
                3) Read the JBossMQ FAQ about MDB singleton (which is probably what you are looking for?)
                4) Read the spec about QueueBrowser is NOT a transactional object and only sees what has not been sent to a client, i.e. it is a snapshot
                5) Read my many posts about how the JBoss MDB works (in particular the read ahead processing AND the difference between the jms session pool and ejb instance pool)
                etc.

                I moved another guy's post to the "Useless rants" forum because he was asking
                virtually the same question that has been answered Sooo many times before and refused
                to do his own research.

                • 5. Re: Message ordering
                  greydeath

                  Adrian,

                  1) I know that, that is the reason for how we do 'keystreaming'
                  2) Maybe it shouldn't but for this purpose I need to. And I use a cluster locking mechanism
                  3) That is not what I want
                  4) Exactly, I want a snapshot!!
                  5) In effect the queue the MDB listens on has 2 sets of messages, distinguishable by a message property. The MDB only listens on one of them. The other set is consumed inside the MDB via a normal queue receiver.

                  So the MDB should not 'read ahead' the messages that are not selected by its message selector right?


                  Please do not just ignore this as a useless rant!! It isn't. And I do not ask for design either, so please don't just pull the read the FAQ card.

                  I have two distinct problems.

                  1) I know that Messages are on the queue but the QueueBrowser does not show them. I know that the browser is not transactional and that is ok. All I want is to check if messages of a certain type are in the queue!!

                  2) Different MDBs are sending messages to a queue. My code guarantees that messages with the same key are on the output queue in the same order as they came in!! The log and the timestamps there prove that this works as expected. But the client (a normal JMS application client) does not get them in that order.

                  • 6. Re: Message ordering

                    Misunderstandings:

                    1) Your fundamental misunderstanding appears to be that messages are globally ordered.
                    JMS guarantees that messages sent from a single sender are ordered. It says
                    nothing about competing senders (your mdbs?). Though most jms providers
                    use a form of timestamp so it generally works, but is subject to race conditions.

                    2) An MDB does not do a send until the transaction commits. This is outside your
                    lock and so is also subject to race conditions.

                    XASession -> onMessage -> lock -> send
                    XASession -> commit -> send really happens here

                    3) That a QueueBrowser can see messages that are currently in the process of being
                    sent to a client but have not yet been acknowledged by a client.
                    It does not see these messages.
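
A minimal sketch of the race in point 2, assuming a toy in-memory model rather than JBossMQ itself. `ToySession` is a hypothetical stand-in for a transacted session that buffers sends until commit, and the interleaving in `run()` is just one order the two commits could take once they happen outside the lock:

```java
import java.util.ArrayList;
import java.util.List;

class CommitOrderDemo {
    /** Hypothetical stand-in for a transacted session: sends are buffered until commit. */
    static final class ToySession {
        private final List<String> pending = new ArrayList<>();
        private final List<String> destination;

        ToySession(List<String> destination) {
            this.destination = destination;
        }

        void send(String correlationId) {
            pending.add(correlationId);
        }

        void commit() {
            // Only now do the buffered messages reach the destination.
            destination.addAll(pending);
            pending.clear();
        }
    }

    /** Replays one possible interleaving of two MDB instances. */
    static List<String> run() {
        List<String> queue = new ArrayList<>();
        ToySession first = new ToySession(queue);
        ToySession second = new ToySession(queue);

        // Instance 1: lock, send 2 and 3, unlock. Nothing is on the queue yet.
        first.send("2");
        first.send("3");

        // Instance 2: takes the lock afterwards, sends 4, unlocks.
        second.send("4");

        // The commits run outside the lock, so nothing orders them.
        second.commit();
        first.commit();
        return queue;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [4, 2, 3]
    }
}
```

The lock orders the calls to `send`, but the queue only reflects the order of the commits.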

                    If none of this applies use "READ THIS FIRST" to show me what IS happening.

                    WARNING: If you dump pages of pages of logging with no context about how it
                    matches what you are trying to do, I won't read it.

                    I was originally interested in this post because I thought you might have a bug.
                    But it sounds like you don't understand (or are trying to bypass) the spec defined
                    behaviour.

                    • 7. Re: Message ordering
                      greydeath

                      I understand the Spec and do not try to bypass it. But the problem I have is not solved by any standard mechanism.

                      I have messages on a queue that should be processed by MDBs. Messages with the same key (a message property) must not be processed in parallel; they must be processed in the order in which they arrived on the queue. Messages with different keys are independent of each other. We call that KeyStreaming.

                      Importantly, I think I found the problem during my tests, and it seems to be either a bug in JBossMQ or a configuration error that I have not found yet. I also found a workaround, but I am not sure it is 100% reliable. I will point that out at the end of the post and hope you bear with me until then.

                      OK, first to your points, then I will show you the interesting parts of my code. The concept worked before with Oracle AQ in a non-J2EE environment.

                      1) That is interesting. Although you are right that there are multiple MDBs, the application logic ensures that messages with the same key are processed and sent to the queue in the right order! In my case there is a 2 second gap between the first set and the second. Nevertheless the client (non-MDB) gets the second set first.

                      Does that mean JBossMQ does not guarantee that kind of global ordering? How can I ensure global ordering? Using the priority as a counter seems a crude workaround.

                      2) In case of a key the MDB calls another SessionBean that has bean-managed transactions. That means that the onMessage transaction is suspended. The SessionBean does:

                      XASession --> onMessage --> EJB-Call --> XASuspend-->
                      lock --> beginUserTrans --> send --> endUserTrans --> unlock
                      End-EJB-Call --> XAResume --> XACommit

                      The xacommit of the MDB does not concern me in my session bean right?

                      3) The MDB listens with a message selector that is mutually exclusive with the one used by the SessionBean and its QueueBrowser. The messages received and seen by the QueueBrowser are not subject to any race conditions, as there is only ever 1 instance that will consume them. Hence, if nobody has received them, the QueueBrowser has to see them.

                      Standard Configuration
                      Default Server
                      Java5 RedHat 3 AS
                      No Changes to MDB configuration.

                      Ok First of all. The piece of code that sends to the input side of the MDB uses the following:

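                      import java.util.HashSet;
                      import java.util.Set;

                      import javax.jms.JMSException;
                      import javax.jms.Message;
                      import javax.jms.MessageProducer;
                      import javax.jms.Queue;
                      import javax.jms.Session;

                      // Logger, MessageProperties and NotImplementedException: imports omitted.
                      public class KeyStreamer
                      {
                          public static final String DATA_ONLY_PROPERTY = "queue_keystream_data";
                          private final static Logger logger = Logger.getLogger(KeyStreamer.class);

                          // Keys that already got a notification in the current transaction.
                          private final Set<String> alreadySent = new HashSet<String>();

                          public void send(Session session, MessageProducer producer, Message msg) throws JMSException
                          {
                              if (producer.getDestination() instanceof Queue)
                              {
                                  final String key = msg.getStringProperty(MessageProperties.MSG_KEY_PROPERTY);
                                  // Mark the message as data so that the MDB's selector skips it.
                                  msg.setBooleanProperty(DATA_ONLY_PROPERTY, true);
                                  if (!alreadySent.contains(key))
                                  {
                                      // First message for this key: send the notification that triggers the MDB.
                                      final Message notification = session.createMessage();
                                      notification.setStringProperty(MessageProperties.MSG_KEY_PROPERTY, key);
                                      producer.send(notification);
                                      logger.info("Notification for: " + key);
                                      alreadySent.add(key);
                                  }
                                  producer.send(msg);
                              }
                              else
                              {
                                  throw new NotImplementedException("Keystreaming on topics is not implemented");
                              }
                          }

                          // Forget the keys of the finished transaction.
                          public void reset()
                          {
                              alreadySent.clear();
                          }
                      }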
                      


                      reset is called after every commit. In short it means that for every new key there will exist a 'notification' message in one transaction.

                      The MDB has the message selector 'queue_keystream_data is null', so it only listens for notifications.

                      The MDB's onMessage method includes the following:

                      if (message.propertyExists(MessageProperties.MSG_KEY_PROPERTY))
                      {
                          final Destination lJMSDestination = message.getJMSDestination();
                          final String key = message.getStringProperty(MessageProperties.MSG_KEY_PROPERTY);
                          keyStreamer.create().keyStream(key, (Queue) lJMSDestination, proxy);
                      }
                      else
                          proxy.process(message);


                      proxy is a small handler that sends the message to the output queue.
                      The KeyStreamer:

                      /**
                       * @ejb.interface-method view-type="local"
                       * @ejb.transaction type="NotSupported"
                       */
                      public void keyStream(String key, Queue jmsDestination, ProcessProxy proxy) throws RBException
                      {
                          if (!tryLock(key, jmsDestination))
                          {
                              logger.info("Dump Key: " + key);
                              return;
                          }
                          boolean locked = true;
                          try
                          {
                              do
                              {
                                  MessageConsumer consumer = setupKeyStreamedConnection(key, jmsDestination);
                                  try
                                  {
                                      process(consumer, proxy);
                                  } finally
                                  {
                                      ResourceUtils.jms.SafeClose(consumer);
                                  }
                                  unLock(key, jmsDestination);
                                  locked = false;
                                  try
                                  {
                                      logger.info("Check key: " + key);
                                      final QueueBrowser lBrowser = session.createBrowser(jmsDestination, getMessageSelector(key));
                                      try
                                      {
                                          if (!lBrowser.getEnumeration().hasMoreElements())
                                              break;
                                      } finally
                                      {
                                          ResourceUtils.jms.SafeClose(lBrowser);
                                      }
                                  } catch (JMSException e)
                                  {
                                      throw new EJBException(e);
                                  }
                                  logger.info("Found keys again: " + key);
                                  if (!tryLock(key, jmsDestination))
                                      return;
                                  locked = true;
                              } while (true);
                          } finally
                          {
                              logger.info("End Keystreaming: " + key);
                              if (locked)
                                  unLock(key, jmsDestination);
                              deinit();
                          }
                      }

                      private MessageConsumer setupKeyStreamedConnection(String key, Destination jmsDestination)
                      {
                          try
                          {
                              setupSession();
                              return session.createConsumer(jmsDestination, getMessageSelector(key));
                          } catch (JMSException e)
                          {
                              throw new EJBException("Setup of keystreamed jms connection", e);
                          }
                      }

                      final protected String getMessageSelector(String key)
                      {
                          // keyStreamSelector is "queue_keystream_data = TRUE and MessageProperties.MSG_KEY_PROPERTY = '?'"
                          return StringUtils.replace(keyStreamSelector, "?", key);
                      }

                      protected void process(MessageConsumer consumer, ProcessProxy proxy) throws RBException
                      {
                          try
                          {
                              connection.start();
                              final UserTransaction ut = getSessionContext().getUserTransaction();
                              try
                              {
                                  ResourceUtils.transaction.Begin(ut, EJBException.class);
                                  Message message = consumer.receiveNoWait();
                                  while (message != null)
                                  {
                                      proxy.process(message);
                                      ResourceUtils.transaction.Commit(ut, EJBException.class);
                                      ResourceUtils.transaction.Begin(ut, EJBException.class);
                                      message = consumer.receiveNoWait();
                                  }

                                  ResourceUtils.transaction.Commit(ut, EJBException.class);
                              } catch (RBException e)
                              {
                                  ResourceUtils.transaction.SafeRollback(ut);
                                  throw e;
                              }
                          } catch (JMSException e)
                          {
                              throw new EJBException(e);
                          }
                      }


                      1) The MDB gets a notification.
                      2) It tries to lock the key. If it does not get the lock, another MDB has already received a notification on the same key and is working on it. It is the other MDB's responsibility.
                      3) If it got the lock, it sets up a consumer with a selector to only get the 'data' messages with that key --> getMessageSelector.
                      4) Consume and process all 'data' messages, wrapping each one in a user transaction.
                      5) unlock
                      6) Check if the queue received any other messages with the same selector in the mean time--> Queue Browser. If yes, start with TryLock again. Otherwise go home.
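
Stripped of the JMS and EJB plumbing, the six steps above can be modelled in memory. This is only a sketch under stated assumptions: `lockedKeys` stands in for the cluster-wide lock, `dataByKey` for the selector-based consumer and browser on the input queue, and `out` for the OUT queue. None of these names come from the real code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

class KeyStreamLoopSketch {
    // Keys currently being worked on (hypothetical stand-in for the cluster lock).
    private final Set<String> lockedKeys = ConcurrentHashMap.newKeySet();
    // Data messages waiting per key (stand-in for the selector on the input queue).
    private final ConcurrentHashMap<String, Queue<String>> dataByKey = new ConcurrentHashMap<>();
    // Messages forwarded to the OUT queue, in processing order.
    final List<String> out = Collections.synchronizedList(new ArrayList<>());

    void addData(String key, String payload) {
        dataByKey.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(payload);
    }

    /** Step 1: called for every notification message. */
    void onNotification(String key) {
        if (!lockedKeys.add(key)) {
            return; // step 2: another instance owns the key, drop the notification
        }
        boolean locked = true;
        try {
            while (true) {
                drain(key);                 // steps 3 and 4: consume all data for the key
                lockedKeys.remove(key);     // step 5: unlock
                locked = false;
                if (!hasData(key)) {        // step 6: look without consuming
                    return;
                }
                if (!lockedKeys.add(key)) { // someone else took over in the meantime
                    return;
                }
                locked = true;
            }
        } finally {
            if (locked) {
                lockedKeys.remove(key);
            }
        }
    }

    private void drain(String key) {
        Queue<String> queue = dataByKey.get(key);
        if (queue == null) {
            return;
        }
        String payload;
        while ((payload = queue.poll()) != null) {
            out.add(payload);
        }
    }

    private boolean hasData(String key) {
        Queue<String> queue = dataByKey.get(key);
        return queue != null && !queue.isEmpty();
    }
}
```

The model leaves out transactions entirely, so it shows only the lock, drain, unlock and re-check structure, not when messages become visible.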

                      As you can see now, the MDB and the SB consume different sets of messages. There is one notification for each key in each transaction on the input side. This way it should be impossible to miss any data messages because of race conditions. The notification 'triggers' the consumption of the related data messages. The lock ensures that only one instance is working on them at a time, thus ensuring the right processing order. I verified that already, no problems there.

                      There is a race condition between the last receiveNoWait and the unlock. In this short window, the input queue might have received another set of messages (notification and data) for that key. In the same window another MDB might have consumed the notification, failed to get the lock and dropped the notification. The data messages are then on the queue without a trigger!
                      Therefore we check if there are any new data messages after the unlock. That is done via the QueueBrowser. And we are really only interested in messages that have not been consumed by anyone. If we find one, that means that the race condition described above did occur and we tryLock again.
                      Why use a browser instead of the receiver? I must not change the queue outside the lock, because although I might see a new message through the browser, another MDB might start consuming the key.

                      This all would be easier if I could draw you a diagram. But I hope you understand this now better.


                      IMPORTANT:

                      I found something very important. The client code in my test added a message property 'Transaction', to which it assigned a number for each JMS transaction. Thus I could track the transaction boundaries and their behaviour in the backend.

                      I found that although I have a notification and a data message in one JMS transaction, the notification gets consumed while the data is not yet available on the queue.

                      To me it seems like the notification message is already read before all other messages in that transaction are available.

                      To check that, I changed the 'class KeyStreamer' to send the notifications not before the data messages but in the reset method, which is called just before the commit. Hence the notifications (that trigger the MDB) are always the last messages on the queue in a JMS transaction.
                      And all of a sudden my testcase worked!!

                      Does this have to do with transaction isolation? Where can I check that for JBossMQ? I couldn't find it.
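
The change to KeyStreamer described above could look roughly like the following sketch. It is not the real class: the `Consumer<String>` named `sender` is a hypothetical stand-in for `producer.send(...)`, and messages are plain strings so that the example runs without a JMS provider.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

class DeferredNotificationSketch {
    // Keys that still need a notification in the current transaction.
    private final Set<String> pendingKeys = new LinkedHashSet<>();
    // Hypothetical stand-in for producer.send(...).
    private final Consumer<String> sender;

    DeferredNotificationSketch(Consumer<String> sender) {
        this.sender = sender;
    }

    /** Sends the data message right away and remembers its key. */
    void send(String key, String payload) {
        sender.accept("data:" + key + ":" + payload);
        pendingKeys.add(key);
    }

    /** Call just before commit: one notification per key, after all data messages. */
    void flushNotifications() {
        for (String key : pendingKeys) {
            sender.accept("notify:" + key);
        }
        pendingKeys.clear();
    }
}
```

With this ordering every notification follows the data messages of its own transaction, which matches the behaviour the test relied on. Whether a provider preserves that order when the transaction becomes visible is still an assumption.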

                      • 8. Re: Message ordering
                        greydeath

                        Adrian, one last thing. I now have a test running that will run until tomorrow morning my time (12 hours straight). Until now every second testcase failed.

                        • 9. Re: Message ordering

                          There is no such concept of transaction isolation in JMS
                          (it could be done as a value added feature, but it isn't supported by JBossMQ).

                          The JMS spec just says that all messages in a single tx are sent or not sent.
                          It says nothing about the timing of when they appear on the destination.
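
As a toy illustration of atomic but not isolated delivery (this models no real provider, and certainly not JBossMQ's internals): assume the messages of one committed transaction become visible one after another, and a consumer reacts to the notification the moment it appears. The message names `notify:7` and `data:7` are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class VisibilitySketch {
    /**
     * Replays the messages of one committed transaction in the given order and
     * reports whether the data message was already visible at the moment the
     * notification appeared on the destination.
     */
    static boolean dataVisibleAtNotification(List<String> commitOrder) {
        List<String> destination = new ArrayList<>();
        for (String message : commitOrder) {
            destination.add(message);
            if (message.equals("notify:7")) {
                // The consumer reacts immediately and looks for the data.
                return destination.contains("data:7");
            }
        }
        throw new IllegalArgumentException("no notification in transaction");
    }

    public static void main(String[] args) {
        // Notification sent first: the consumer looks too early.
        System.out.println(dataVisibleAtNotification(Arrays.asList("notify:7", "data:7"))); // false
        // Notification sent last: the data is already there.
        System.out.println(dataVisibleAtNotification(Arrays.asList("data:7", "notify:7"))); // true
    }
}
```

Both runs deliver all messages of the transaction; only the moment at which each one becomes visible differs.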

                          Even if there were transaction isolation, the non transactional QueueBrowser
                          would not necessarily be included in this behaviour.

                          In fact, this is the only part of JMS that would require transaction isolation
                          as it is the only part that gives you any form of view of the internal
                          state of a destination.
                          But since it is non transactional, transaction isolation is not required.

                          The reason why it probably works with OracleAQ is you
                          are talking to a special set of database tables when you use their JMS.
                          The behaviour is implicit in their implementation.

                          You are relying on a non-spec feature.

                          • 10. Re: Message ordering
                            greydeath

                            > The JMS spec just says that all messages in a single tx are sent or not sent.
                            > It says nothing about the timing of when they appear on the destination.

                            But it would make sense if all messages in the same transaction appear on the destination at the same time. In Fact I think it is what everybody assumes. It is a shame the spec is so lax in that respect.

                            About the transaction isolation, your statement about the QueueBrowser does not apply. The SB uses a transacted MessageConsumer first and THAT does not see the messages that were in the same transaction as the one that triggered the MDB.

                            About the global ordering. How can I ensure global ordering across multiple Sessions? I already make sure to send and commit in the right order.

                            • 11. Re: Message ordering

                              So you don't want it to send any messages to a client until all messages
                              in a transaction are ready to be sent to the client.

                              This would introduce latency and concurrency overhead.
                              This is the natural overhead of any transaction isolation.

                              In fact, it is the worst kind (in terms of performance)
                              of transaction isolation since you want to avoid
                              an attempt to read data where one transaction might not see data about to
                              be committed by another transaction (phantom reads).

                              I was trying to decide before whether this was one of my "favourite" JMS
                              anti-patterns (trying to use JMS as a database).
                              I think you just confirmed it?

                              If you are interested in implementing this feature for JBossMQ,
                              I would suggest you rewrite JBossMQ internally to use a database
                              for all data structures.

                              The other alternative would be introduce database locking semantics
                              into JBossMQ.
                              i.e. either lock queues to prevent receivers taking messages that might
                              break your isolation level
                              or let them take messages, but give them
                              an optimistic failure if they attempt to make acknowledgements that
                              would break the isolation.

                              You have definitely fallen into another JMS anti pattern.
                              Trying to synchronously link one jms transaction with another.

                              Asynchronous messaging is designed not to have this semantic.

                              Some messaging systems give you an alternate semantic that is more suited
                              to asynchronous behaviour.
                              i.e. they let you define group properties so that you specify that all messages
                              in a group should go to a single consumer and they should all be processed together
                              or not at all.
                              This is mainly intended to support message fragmentation.
                              JBossMQ has no such support.
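
A rough sketch of the routing half of such a group semantic, assuming a hypothetical dispatcher that pins each group ID to one consumer on first sight. It is not an API of any particular provider, and the "all processed together or not at all" part is not modelled.

```java
import java.util.HashMap;
import java.util.Map;

class GroupDispatcherSketch {
    private final int consumerCount;
    // Which consumer owns which group (hypothetical bookkeeping).
    private final Map<String, Integer> ownerByGroup = new HashMap<>();
    private int next = 0;

    GroupDispatcherSketch(int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        this.consumerCount = consumerCount;
    }

    /** Returns the index of the consumer that must receive every message of this group. */
    synchronized int consumerFor(String groupId) {
        Integer owner = ownerByGroup.get(groupId);
        if (owner == null) {
            // First message of the group: assign consumers round-robin.
            owner = next;
            next = (next + 1) % consumerCount;
            ownerByGroup.put(groupId, owner);
        }
        return owner;
    }
}
```

Because a group never changes consumer, messages with the same group ID are handled serially by that consumer, while different groups can still be processed in parallel.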

                              • 12. Re: Message ordering

                                END OF TOPIC FOR ME.

                                This is not a JBossMQ bug, though it might be considered an example of a feature
                                that JBossMQ does not support.
                                It is a misunderstanding of the JMS semantics.

                                • 13. Re: Message ordering
                                  greydeath

                                  Ok End of topic, but can you stop bitching at people all the time.

                                  I know that the problem I have does affect performance badly. So what? I have to solve that business problem! I know that it is an anti-pattern, but that does not make the requirement go away!

                                  We are not all dummies you know.

                                  I know about the JMSXGroup feature and the JbossMQ does not implement it. Actually I didn't find a JMS solution that does.

                                  END OF TOPIC FOR ME TOO.

                                  • 14. Re: Message ordering

                                    I'm bitching because it took nearly two pages of posts and explaining JMS
                                    semantics before I found out you are "abusing" the api.

                                    If you want to try to implement the JMSXGroup in JBossMQ, I can explain the
                                    changes required.
                                    This would still have latency (it has to wait for all messages in the group)
                                    but you don't get the global locks across the queues, the "isolation" is at the
                                    group level.

                                    I don't do business problems here, only tech problems.
                                    If the problem is you are abusing the tech, I will say so.
                                    The alternative would be to stay quiet and let you wonder why it doesn't work.