-
1. Re: Message ordering
adrian.brock Feb 4, 2005 9:59 PM (in response to greydeath)
I don't believe your assertions. Use "READ THIS FIRST" to show me what is really
happening.
The code is too trivial to be at fault:
from org.jboss.mq.MessageProducer#send(...)
    long ts = System.currentTimeMillis();
    sendMessage.setJMSTimestamp(ts);
-
2. Re: Message ordering
greydeath Feb 7, 2005 3:58 AM (in response to greydeath)
Adrian,
I have several problems and I do not know how to solve them.
1. Message Order:
I don't know how it happens, but I have now proved that the messages are sometimes not delivered in the same order as they were sent to the queue.
First transaction sender:
09:18:27,343 INFO [JMS SessionPool Worker-13][JMSDestinationProxy] Proxy Message: 7 Correl: 2
09:18:27,345 INFO [JMS SessionPool Worker-13][JMSDestinationProxy] Proxy Message: 7 Correl: 3
I turned on the trace for mq. I checked that the transaction for these messages is definitely committed before the following:
09:18:29,043 INFO [JMS SessionPool Worker-6][JMSDestinationProxy] Proxy Message: 7 Correl: 4
And as you see it is also 2 seconds later!!
Nevertheless my client receives this last message first.
As for the timestamp, I cannot verify my original assumption anymore, maybe I made a mistake there.
Anyway, I can reproduce the problem I just wrote you any time.
My Application does 'KeyStreaming'. There are MDBs which consume messages and send them to another queue again. Between themselves they make sure that a certain order is retained on the 'OUT' queue.
These are the timestamps and the correlation IDs on the out queue (the correlation ID is the order the messages should be in):
11077643073431 2
11077643073452 3
11077643090434 4
11077643106761 5
11077643106802 6
11077643106853 7
11077643106904 8
11077643106955 9
Everything OK here. But I definitely received 4 first. From the client ID you can also see the transaction boundaries, as every transaction is a new client in my example:
[
org.jboss.mq.SpyMessage { Header { jmsDestination : QUEUE.testKeystreamOut jmsDeliveryMode : 2 jmsExpiration : 0 jmsPriority : 4 jmsMessageID : ID:23-11077643090434 jmsTimeStamp : 1107764309043 jmsCorrelationID: 4 jmsReplyTo : null jmsType : null jmsRedelivered : true jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=4, JMS_JBOSS_REDELIVERY_COUNT=1} jmsPropReadWrite: false msgReadOnly : true producerClientId: ID:23 } },
org.jboss.mq.SpyMessage { Header { jmsDestination : QUEUE.testKeystreamOut jmsDeliveryMode : 2 jmsExpiration : 0 jmsPriority : 4 jmsMessageID : ID:13-11077643106761 jmsTimeStamp : 1107764310676 jmsCorrelationID: 5 jmsReplyTo : null jmsType : null jmsRedelivered : false jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=6} jmsPropReadWrite: false msgReadOnly : true producerClientId: ID:13 } },
org.jboss.mq.SpyMessage { Header { jmsDestination : QUEUE.testKeystreamOut jmsDeliveryMode : 2 jmsExpiration : 0 jmsPriority : 4 jmsMessageID : ID:13-11077643106802 jmsTimeStamp : 1107764310680 jmsCorrelationID: 6 jmsReplyTo : null jmsType : null jmsRedelivered : false jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=6} jmsPropReadWrite: false msgReadOnly : true producerClientId: ID:13 } },
org.jboss.mq.SpyMessage { Header { jmsDestination : QUEUE.testKeystreamOut jmsDeliveryMode : 2 jmsExpiration : 0 jmsPriority : 4 jmsMessageID : ID:13-11077643106853 jmsTimeStamp : 1107764310685 jmsCorrelationID: 7 jmsReplyTo : null jmsType : null jmsRedelivered : false jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=7} jmsPropReadWrite: false msgReadOnly : true producerClientId: ID:13 } },
org.jboss.mq.SpyMessage { Header { jmsDestination : QUEUE.testKeystreamOut jmsDeliveryMode : 2 jmsExpiration : 0 jmsPriority : 4 jmsMessageID : ID:13-11077643106904 jmsTimeStamp : 1107764310690 jmsCorrelationID: 8 jmsReplyTo : null jmsType : null jmsRedelivered : false jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=8} jmsPropReadWrite: false msgReadOnly : true producerClientId: ID:13 } },
org.jboss.mq.SpyMessage { Header { jmsDestination : QUEUE.testKeystreamOut jmsDeliveryMode : 2 jmsExpiration : 0 jmsPriority : 4 jmsMessageID : ID:13-11077643106955 jmsTimeStamp : 1107764310695 jmsCorrelationID: 9 jmsReplyTo : null jmsType : null jmsRedelivered : false jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=9} jmsPropReadWrite: false msgReadOnly : true producerClientId: ID:13 } },
org.jboss.mq.SpyMessage { Header { jmsDestination : QUEUE.testKeystreamOut jmsDeliveryMode : 2 jmsExpiration : 0 jmsPriority : 4 jmsMessageID : ID:15-11077643073431 jmsTimeStamp : 1107764307343 jmsCorrelationID: 2 jmsReplyTo : null jmsType : null jmsRedelivered : false jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=1} jmsPropReadWrite: false msgReadOnly : true producerClientId: ID:15 } },
org.jboss.mq.SpyMessage { Header { jmsDestination : QUEUE.testKeystreamOut jmsDeliveryMode : 2 jmsExpiration : 0 jmsPriority : 4 jmsMessageID : ID:15-11077643073452 jmsTimeStamp : 1107764307345 jmsCorrelationID: 3 jmsReplyTo : null jmsType : null jmsRedelivered : false jmsProperties : {queue_keystream_data=true, messageKey=7, Transaction=2} jmsPropReadWrite: false msgReadOnly : true producerClientId: ID:15 } }
]
-
3. Re: Message ordering
greydeath Feb 7, 2005 4:48 AM (in response to greydeath)
Maybe I should describe what the MDB does.
My messages have keys (messageKey). The Sender will put a 'notification message' on the queue for each new key in one transaction. These messages only have a property messageKey.
The MDB is listening on these messages (message selector). Upon onMessage it locks the key to ensure that no other MDB is working on that key. After that it sets up a queue receiver to receive the 'data messages' for that key (queue_keystream_data is not null and messageKey=7) and processes them. (In my simple scenario it just sends the message to another queue.) Every message is its own transaction here.
If no more are to be received, it closes the consumer and unlocks the key.
After that it opens a QueueBrowser with the same message selector to check whether any new messages arrived in the meantime.
Here is my second problem. I sometimes lose messages (forget them on the input side), and it seems that this happens because the queue browser sometimes does not have elements although there are some on the queue.
This whole thing is to ensure that messages with the same key are processed in the right order.
Referring to my last post: I therefore know the messages of messageKey 7 go to the OUT queue in the right order, but the retrieving client does not get them that way. -
4. Re: Message ordering
adrian.brock Feb 7, 2005 5:53 PM (in response to greydeath)
This is an FAQ. In fact multiple FAQs.
1) Read the spec about no guarantee of ordering of an MDB
2) Read the spec about EJBs should not do internal locking (it certainly doesn't work in a cluster)
3) Read the JBossMQ FAQ about MDB singleton (which is probably what you are looking for?)
4) Read the spec about QueueBrowser is NOT a transactional object and only sees what has not been sent to a client, i.e. it is a snapshot
5) Read my many posts about how the JBoss MDB works (in particular the read ahead processing AND the difference between the jms session pool and ejb instance pool)
etc.
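Points 4 and 5 interact. The following is a minimal in-memory sketch in plain Java with no JMS involved; the class `BrowseSnapshotDemo` and its methods are hypothetical stand-ins and not JBossMQ code. It assumes a provider that hands messages to a client ahead of processing (read-ahead) and shows why a browse can then come back without those messages even though nobody has processed them yet.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for a queue with read-ahead; not JBossMQ code. */
final class BrowseSnapshotDemo {
    private final Deque<String> waiting = new ArrayDeque<>();  // not yet sent to any client
    private final List<String> inFlight = new ArrayList<>();   // sent to a client, not acknowledged

    void add(String message) {
        waiting.add(message);
    }

    /** Hands up to n messages to a client before it has processed anything. */
    void prefetch(int n) {
        for (int i = 0; i < n && !waiting.isEmpty(); i++) {
            inFlight.add(waiting.poll());
        }
    }

    /** A browse is a snapshot of what has NOT been sent to a client. */
    List<String> browse() {
        return new ArrayList<>(waiting);
    }

    public static void main(String[] args) {
        BrowseSnapshotDemo queue = new BrowseSnapshotDemo();
        queue.add("a");
        queue.add("b");
        queue.add("c");
        queue.prefetch(2);
        System.out.println(queue.browse()); // prints [c]
    }
}
```

In this model "a" and "b" are still unacknowledged, yet a browser only reports "c".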
I moved another guy's post to the "Useless rants" forum because he was asking
virtually the same question that has been answered Sooo many times before and refused
to do his own research. -
5. Re: Message ordering
greydeath Feb 10, 2005 1:44 AM (in response to greydeath)
Adrian,
1) I know that; that is the reason for how we do 'keystreaming'.
2) Maybe it shouldn't, but for this purpose I need to. And I use a cluster locking mechanism.
3) That is not what I want.
4) Exactly, I want a snapshot!!
5) In effect the Queue the MDB listens on has 2 sets of messages, distinguishable by a message property. The MDB only listens on one of them. The other set is consumed inside the MDB via a normal queue receiver.
So the MDB should not 'read ahead' the messages that are not selected by its message selector, right?
Please do not just ignore this as a useless rant!! It isn't. And I do not ask for design either, so please don't just pull the "read the FAQ" card.
I have two distinct problems.
1) I know that Messages are on the queue but the QueueBrowser does not show them. I know that the browser is not transactional and that is ok. All I want is to check if messages of a certain type are in the queue!!
2) Different MDBs are sending Messages to a Queue. My code guarantees that messages with the same key are on the output queue in the same order as they came in!! The log and the timestamps there prove that this works as expected. But the client (normal jms app client) does not get them in that order. -
6. Re: Message ordering
adrian.brock Feb 10, 2005 1:30 PM (in response to greydeath)
Misunderstandings:
1) Your fundamental misunderstanding appears to be that messages are globally ordered.
JMS guarantees that messages sent from a single sender are ordered. It says
nothing about competing senders (your mdbs?). Though most jms providers
use a form of timestamp so it generally works, but is subject to race conditions.
2) An MDB does not do a send until the transaction commits. This is outside your
lock and so is also subject to race conditions.
XASession -> onMessage -> lock -> send
XASession -> commit -> send really happens here
3) That a QueueBrowser can see messages that are currently in the process of being
sent to a client but have not yet been acknowledged by a client.
It does not see these messages.
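Point 2 can be reproduced without any JMS provider. The sketch below is plain Java; `TxSession` and `Destination` are hypothetical stand-ins for a transacted session and a queue, not JBossMQ classes. Two sessions send in the intended order while holding a lock, but because nothing reaches the destination until commit, the commit order decides what the destination sees.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of "the send really happens at commit". */
final class DeferredSendDemo {
    /** Stand-in for the destination: messages appear here only on commit. */
    static final class Destination {
        final List<String> delivered = new ArrayList<>();
    }

    /** Stand-in for a transacted session that buffers sends until commit. */
    static final class TxSession {
        private final Destination destination;
        private final List<String> pending = new ArrayList<>();

        TxSession(Destination destination) {
            this.destination = destination;
        }

        void send(String message) {
            pending.add(message);                    // nothing is visible yet
        }

        void commit() {
            destination.delivered.addAll(pending);   // the send really happens here
            pending.clear();
        }
    }

    /** Sends happen in order under a lock; commits happen in the opposite order. */
    static List<String> run() {
        Destination out = new Destination();
        TxSession first = new TxSession(out);
        TxSession second = new TxSession(out);
        Object lock = new Object();
        synchronized (lock) { first.send("correl-2"); }
        synchronized (lock) { second.send("correl-3"); }
        second.commit();   // outside the lock, so nothing orders the two commits
        first.commit();
        return out.delivered;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [correl-3, correl-2]
    }
}
```

The lock orders the `send` calls, but it has been released by the time either commit runs, which is the race described above.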
If none of this applies use "READ THIS FIRST" to show me what IS happening.
WARNING: If you dump pages of pages of logging with no context about how it
matches what you are trying to do, I won't read it.
I was originally interested in this post because I thought you might have a bug.
But it sounds like you don't understand (or are trying to bypass) the spec defined
behaviour. -
7. Re: Message ordering
greydeath Feb 11, 2005 9:55 AM (in response to greydeath)
I understand the Spec and do not try to bypass it. But the problem I have is not solved by any standard mechanism.
I have messages on a queue that should be processed by MDBs. But messages with the same key (message property) must not be parallelized, but processed in the order they came onto the queue. Messages with different keys are independent of each other. We call that KeyStreaming.
Importantly, I think I found the problem during my tests, and it seems to be either a bug in JBossMQ or a configuration error that I haven't found yet. I also found a workaround, but I am not sure if it is 100%. I will point that out at the end of the post and hope you bear with me until then.
OK, first to your questions, then I will show you the interesting parts of my code. The concept worked before with Oracle AQ in a non-J2EE environment.
1) That is interesting. Although you are right that there are multiple MDBs, the application logic ensures that messages with the same key are processed and sent to the queue in the right order! In my case there is a 2 second gap between the first set and the second. Nevertheless the client (Non-MDB) gets the second set first.
Does that mean JBossMQ does not guarantee that kind of global ordering? How can I ensure global ordering? Using the priority as a counter seems a crude workaround.
2) In the case of a key, the MDB calls another SessionBean that has bean-managed transactions. That means that the onMessage transaction is suspended. The SessionBean does:
XASession --> onMessage --> EJB-Call --> XASuspend-->
lock --> beginUserTrans --> send --> endUserTrans --> unlock
End-EJB-Call --> XAResume --> XACommit
The xacommit of the MDB does not concern me in my session bean, right?
3) The MDB listens with a messageSelector that is mutually exclusive to the one used by the SessionBean and its QueueBrowser. The messages received and seen by the QueueBrowser are not subject to any race conditions as there is only ever 1 instance that will consume them. Hence if nobody received them, the QueueBrowser has to see them.
Standard Configuration
Default Server
Java5 RedHat 3 AS
No Changes to MDB configuration.
Ok First of all. The piece of code that sends to the input side of the MDB uses the following:
public class KeyStreamer {
    private HashSet alreadySent = new HashSet();
    public static final String DATA_ONLY_PROPERTY = "queue_keystream_data";
    private final static Logger logger = Logger.getLogger(KeyStreamer.class);

    public void send(Session session, MessageProducer producer, javax.jms.Message msg) throws JMSException {
        if (producer.getDestination() instanceof Queue) {
            final String key = msg.getStringProperty(MessageProperties.MSG_KEY_PROPERTY);
            msg.setBooleanProperty(DATA_ONLY_PROPERTY, true);
            if (!alreadySent.contains(key)) {
                final Message notification = session.createMessage();
                notification.setStringProperty(MessageProperties.MSG_KEY_PROPERTY, key);
                producer.send(notification);
                logger.info("Notification for: " + key);
                alreadySent.add(key);
            }
            producer.send(msg);
        } else
            throw new NotImplementedException("Keystreaming on topics is not implemented");
    }

    public void reset() {
        alreadySent.clear();
    }
}
reset is called after every commit. In short it means that for every new key there will exist a 'notification' message in one transaction.
The MDB has a message selector 'queue_keystream_data is null'; it only listens for notifications.
The MDB's onMessage method includes the following:
if (message.propertyExists(MessageProperties.MSG_KEY_PROPERTY)) {
    final Destination lJMSDestination = message.getJMSDestination();
    final String key = message.getStringProperty(MessageProperties.MSG_KEY_PROPERTY);
    keyStreamer.create().keyStream(key, (Queue) lJMSDestination, proxy);
} else
    proxy.process(message);
proxy is a small handler that sends the message to the output queue.
The KeyStreamer:
/**
 * @ejb.interface-method view-type="local"
 * @ejb.transaction type="NotSupported"
 */
public void keyStream(String key, Queue jmsDestination, ProcessProxy proxy) throws RBException {
    if (!tryLock(key, jmsDestination)) {
        logger.info("Dump Key: " + key);
        return;
    }
    boolean locked = true;
    try {
        do {
            MessageConsumer consumer = setupKeyStreamedConnection(key, jmsDestination);
            try {
                process(consumer, proxy);
            } finally {
                ResourceUtils.jms.SafeClose(consumer);
            }
            unLock(key, jmsDestination);
            locked = false;
            try {
                logger.info("Check key: " + key);
                final QueueBrowser lBrowser = session.createBrowser(jmsDestination, getMessageSelector(key));
                try {
                    if (!lBrowser.getEnumeration().hasMoreElements())
                        break;
                } finally {
                    ResourceUtils.jms.SafeClose(lBrowser);
                }
            } catch (JMSException e) {
                throw new EJBException(e);
            }
            logger.info("Found keys again: " + key);
            if (!tryLock(key, jmsDestination))
                return;
            locked = true;
        } while (true);
    } finally {
        logger.info("End Keystreaming: " + key);
        if (locked)
            unLock(key, jmsDestination);
        deinit();
    }
}

private MessageConsumer setupKeyStreamedConnection(String key, Destination jmsDestination) {
    try {
        setupSession();
        return session.createConsumer(jmsDestination, getMessageSelector(key));
    } catch (JMSException e) {
        throw new EJBException("Setup of keystreamed jms connection", e);
    }
}

final protected String getMessageSelector(String key) {
    // keyStreamSelector is "queue_keystream_data = TRUE and MessageProperties.MSG_KEY_PROPERTY = '?'"
    return StringUtils.replace(keyStreamSelector, "?", key);
}

protected void process(MessageConsumer consumer, ProcessProxy proxy) throws RBException {
    try {
        connection.start();
        final UserTransaction ut = getSessionContext().getUserTransaction();
        try {
            ResourceUtils.transaction.Begin(ut, EJBException.class);
            Message message = consumer.receiveNoWait();
            while (message != null) {
                proxy.process(message);
                ResourceUtils.transaction.Commit(ut, EJBException.class);
                ResourceUtils.transaction.Begin(ut, EJBException.class);
                message = consumer.receiveNoWait();
            }
            ResourceUtils.transaction.Commit(ut, EJBException.class);
        } catch (RBException e) {
            ResourceUtils.transaction.SafeRollback(ut);
            throw e;
        }
    } catch (JMSException e) {
        throw new EJBException(e);
    }
}
1) The MDB gets a notification.
2) It tries to lock the key. If it does not get the lock, another MDB has already received a notification on the same key and is working on it. It is the other MDB's responsibility.
3) If it got the lock, set up a consumer with a selector to only get the 'data' messages with that key --> getMessageSelector.
4) Consume and process all 'data' messages. Wrap each one in a user transaction.
5) Unlock.
6) Check if the queue received any other messages with the same selector in the meantime --> Queue Browser. If yes, start with TryLock again. Otherwise go home.
As you can see now, the MDB and the SB consume different sets of messages. There is one notification for each key in each transaction on the input side. This way it should be impossible to miss any data messages because of race conditions. The notification 'triggers' the consumption of the related data messages. The lock ensures that only one is working on them at a time, thus ensuring the right processing order. I verified that already, no problems there.
There is a race condition between the last receiveNoWait and the unlock. In this small amount of time, the input queue might have received another set of messages (notification and data) for that key. Also in the same time another MDB might have consumed the notification, did not get the lock and dumped the notification. The data messages are on the queue without a trigger!
Therefore we check if there are any new data messages after the unlock. That is done via the QueueBrowser. And we are really only interested in messages that are not consumed by anyone. If we find one, that means that the race condition before did apply and we tryLock again.
Why use a Browser instead of the receiver? I must not change the Queue outside the lock, because although I might see a new message through the browser, another MDB might start consuming the key.
This all would be easier if I could draw you a diagram. But I hope you understand this now better.
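In place of a diagram, here is the lock / drain / unlock / re-check loop as a small single-threaded Java model. Everything in it is a hypothetical stand-in: `input` plays the data messages of one key, `keyLock` the key lock, `peek()` the QueueBrowser check, and `beforeUnlock` is a test hook that simulates a message arriving in the window between the last receiveNoWait and the unlock. It assumes the browse sees every waiting message, which is the very assumption under discussion in this thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical in-memory model of the keystream loop; not the poster's EJB code. */
final class KeyStreamLoopSketch {
    final Queue<String> input = new ArrayDeque<>();      // data messages for one key
    final List<String> output = new ArrayList<>();       // stand-in for the OUT queue
    final AtomicBoolean keyLock = new AtomicBoolean();   // stand-in for the key lock
    Runnable beforeUnlock = () -> { };                   // hook: simulates a late arrival

    /** Returns false if another worker already holds the key. */
    boolean keyStream() {
        if (!keyLock.compareAndSet(false, true)) {
            return false;                                // "dump" the notification
        }
        boolean locked = true;
        try {
            while (true) {
                // Drain everything currently visible (the receiveNoWait loop).
                for (String m = input.poll(); m != null; m = input.poll()) {
                    output.add(m);
                }
                beforeUnlock.run();                      // the race window
                keyLock.set(false);
                locked = false;
                if (input.peek() == null) {
                    return true;                         // browse found nothing new
                }
                if (!keyLock.compareAndSet(false, true)) {
                    return true;                         // another worker took over
                }
                locked = true;
            }
        } finally {
            if (locked) {
                keyLock.set(false);
            }
        }
    }

    /** Runs the loop once with a message arriving inside the race window. */
    static List<String> demo() {
        KeyStreamLoopSketch s = new KeyStreamLoopSketch();
        s.input.add("a");
        s.input.add("b");
        boolean[] fired = {false};
        s.beforeUnlock = () -> {
            if (!fired[0]) {
                fired[0] = true;
                s.input.add("c");
            }
        };
        s.keyStream();
        return s.output;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b, c]
    }
}
```

The late message "c" is picked up only because the check after the unlock sees it; if that check reports an empty queue while "c" is waiting, the message is stranded without a trigger.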
IMPORTANT:
I found something very important. The client code in my test added a message property 'Transaction' to which it assigned a number for each JMS Transaction. Thus I could track the transaction boundaries and their behaviour in the backend.
I found that although I have a notification and a data message in one JMS Transaction, the notification gets consumed and the data is not available on the queue.
To me it seems like the notification message is already read before all other messages in that transaction are available.
To check that, I changed the 'class KeyStreamer' to send the notifications not before the data messages but in the reset method, which is called just before the commit. Hence the notifications (that trigger the MDB) are always the last messages on the queue in a JMS Transaction.
And all of a sudden my testcase worked!!
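A rough sketch of that change, reduced to plain Java so it runs without a JMS provider. The `Consumer<String>` is a hypothetical stand-in for `MessageProducer.send`, and the names `NotifyLastKeyStreamer` and `flushNotifications` are invented for illustration; the real code would create JMS messages as in the KeyStreamer class above. It only changes the order of sends inside one transaction and, as said, may not be a 100% fix.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical variant of KeyStreamer that sends notifications last. */
final class NotifyLastKeyStreamer {
    private final Set<String> pendingKeys = new LinkedHashSet<>(); // keys seen in this transaction
    private final Consumer<String> producer;                       // stand-in for MessageProducer.send

    NotifyLastKeyStreamer(Consumer<String> producer) {
        this.producer = producer;
    }

    /** Sends the data message now and only remembers that the key needs a notification. */
    void send(String key, String payload) {
        pendingKeys.add(key);
        producer.accept("data:" + key + ":" + payload);
    }

    /** Call immediately before commit: one notification per key, after all data. */
    void flushNotifications() {
        for (String key : pendingKeys) {
            producer.accept("notify:" + key);
        }
        pendingKeys.clear();
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        NotifyLastKeyStreamer streamer = new NotifyLastKeyStreamer(sent::add);
        streamer.send("7", "a");
        streamer.send("7", "b");
        streamer.send("8", "c");
        streamer.flushNotifications();
        System.out.println(sent); // prints [data:7:a, data:7:b, data:8:c, notify:7, notify:8]
    }
}
```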
Has this to do with transaction isolation? Where can I check that for JBossMQ? I couldn't find it. -
8. Re: Message ordering
greydeath Feb 11, 2005 10:11 AM (in response to greydeath)
Adrian, one last thing. I now have a test running that will run until tomorrow morning my time (12 hours straight). Until now every second testcase failed.
-
9. Re: Message ordering
adrian.brock Feb 11, 2005 3:22 PM (in response to greydeath)
There is no such concept of transaction isolation in JMS
(it could be done as a value added feature, but it isn't supported by JBossMQ).
The JMS spec just says that all messages in a single tx are sent or not sent.
It says nothing about the timing of when they appear on the destination.
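To make the difference between "all sent or none sent" and isolation concrete, here is a toy model in plain Java. It is an assumption about one way a provider could behave within that wording, not a description of JBossMQ internals; `NonIsolatedCommitDemo` and the message strings are made up. The commit adds the messages of one transaction to the destination one at a time, and a snapshot is taken after each step to show what a concurrent reader could observe.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model: a commit that is all-or-nothing overall but not isolated. */
final class NonIsolatedCommitDemo {
    /** Returns what a concurrent reader could see after each step of the commit. */
    static List<List<String>> run() {
        List<String> destination = new ArrayList<>();
        List<List<String>> snapshots = new ArrayList<>();
        List<String> transaction = Arrays.asList("notify:7", "data:7:a", "data:7:b");
        for (String message : transaction) {
            destination.add(message);                     // messages become visible one by one
            snapshots.add(new ArrayList<>(destination));  // a reader looking at this instant
        }
        return snapshots;
    }

    public static void main(String[] args) {
        for (List<String> snapshot : run()) {
            System.out.println(snapshot);
        }
    }
}
```

The first snapshot contains only the notification, so a consumer triggered by it would find no data messages yet, even though all three messages do arrive in the end.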
Even if there were transaction isolation, the non transactional QueueBrowser
would not necessarily be included in this behaviour.
In fact, this is the only part of JMS that would require transaction isolation
as it is the only part that gives you any form of view of the internal
state of a destination.
But since it is non transactional, transaction isolation is not required.
The reason why it probably works with OracleAQ is you
are talking to a special set of database tables when you use their JMS.
The behaviour is implicit in their implementation.
You are relying on a non-spec feature. -
10. Re: Message ordering
greydeath Feb 14, 2005 1:46 AM (in response to greydeath)
> The JMS spec just says that all messages in a single tx are sent or not sent.
> It says nothing about the timing of when they appear on the destination.
But it would make sense if all messages in the same transaction appeared on the destination at the same time. In fact I think it is what everybody assumes. It is a shame the spec is so lax in that respect.
About the transaction isolation, your statement about the QueueBrowser does not apply. The SB uses a transacted MessageConsumer first and THAT does not see the messages that were in the same transaction as the one that triggered the MDB.
About the global ordering. How can I ensure global ordering across multiple Sessions? I already make sure to send and commit in the right order. -
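One approach that does not depend on the provider at all, and that is not proposed anywhere in this thread, is a resequencer on the consuming side: the client holds back messages that arrive early and releases them in sequence order. The sketch below is plain Java and assumes each key carries consecutive integer sequence numbers, as the correlation IDs 2 to 9 in the test data above do; the `Resequencer` class is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical client-side resequencer for one key. */
final class Resequencer {
    private final Map<Integer, String> held = new HashMap<>(); // early arrivals, by sequence number
    private int next;                                          // sequence number expected next

    Resequencer(int firstSequence) {
        this.next = firstSequence;
    }

    /** Accepts one message and returns every message that can now be released in order. */
    List<String> accept(int sequence, String payload) {
        held.put(sequence, payload);
        List<String> ready = new ArrayList<>();
        String candidate;
        while ((candidate = held.remove(next)) != null) {
            ready.add(candidate);
            next++;
        }
        return ready;
    }

    public static void main(String[] args) {
        Resequencer r = new Resequencer(2);
        System.out.println(r.accept(4, "m4")); // prints []
        System.out.println(r.accept(2, "m2")); // prints [m2]
        System.out.println(r.accept(3, "m3")); // prints [m3, m4]
    }
}
```

The cost is that a gap in the sequence stalls the key until the missing message arrives, so a real client would need some timeout or gap policy, which is left out here.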
11. Re: Message ordering
adrian.brock Feb 14, 2005 2:36 PM (in response to greydeath)
So you don't want it to send any messages to a client until all messages
in a transaction are ready to be sent to the client.
This would introduce latency and concurrency overhead.
This is the natural overhead of any transaction isolation.
In fact, it is the worst kind (in terms of performance)
of transaction isolation since you want to avoid
an attempt to read data where one transaction might not see data about to
be committed by another transaction (phantom reads).
I was trying to decide before whether this was one of my "favourite" JMS
anti-patterns (trying to use JMS as a database).
I think you just confirmed it?
If you are interested in implementing this feature for JBossMQ,
I would suggest you rewrite JBossMQ internally to use a database
for all data structures.
The other alternative would be introduce database locking semantics
into JBossMQ.
i.e. either lock queues to prevent receivers taking messages that might
break your isolation level
or let them take messages, but give them
an optimistic failure if they attempt to make acknowledgements that
would break the isolation.
You have definitely fallen into another JMS anti pattern.
Trying to synchronously link one jms transaction with another.
Asynchronous messaging is designed not to have this semantic.
Some messaging systems give you an alternate semantic that is more suited
to asynchronous behaviour.
i.e. they let you define group properties so that you specify that all messages
in a group should go to a single consumer and they should all be processed together
or not at all.
This is mainly intended to support message fragmentation.
JBossMQ has no such support. -
12. Re: Message ordering
adrian.brock Feb 14, 2005 2:39 PM (in response to greydeath)
END OF TOPIC FOR ME.
This is not a JBossMQ bug, though it might be considered an example of a feature
that JBossMQ does not support.
It is a misunderstanding of the JMS semantics. -
13. Re: Message ordering
greydeath Feb 15, 2005 2:14 AM (in response to greydeath)
Ok End of topic, but can you stop bitching at people all the time.
I know that the problem I have does affect performance badly. So what? I have to solve that business problem! I know that it is an anti-pattern, but that does not make the requirement go away!
We are not all dummies you know.
I know about the JMSXGroup feature and that JBossMQ does not implement it. Actually I didn't find a JMS solution that does.
END OF TOPIC FOR ME TOO. -
14. Re: Message ordering
adrian.brock Feb 15, 2005 12:12 PM (in response to greydeath)
I'm bitching because it took nearly two pages of posts and explaining JMS
semantics before I found out you are "abusing" the api.
If you want to try to implement the JMSXGroup in JBossMQ, I can explain the
changes required.
This would still have latency (it has to wait for all messages in the group)
but you don't get the global locks across the queues, the "isolation" is at the
group level.
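The routing half of that idea, where every message of a group goes to a single consumer, can be sketched in a few lines of plain Java. This is not JBossMQ code and not the set of changes referred to above; `GroupDispatcher` is a hypothetical class, consumers are plain indexes, and the waiting for a complete group and the all-or-nothing processing are left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical group-affinity router: the first message of a group pins it to a consumer. */
final class GroupDispatcher {
    private final Map<String, Integer> owner = new HashMap<>(); // group id -> consumer index
    private final int consumers;
    private int nextConsumer;                                   // round-robin pointer for new groups

    GroupDispatcher(int consumers) {
        this.consumers = consumers;
    }

    /** Returns the consumer index that must receive the next message of this group. */
    int route(String groupId) {
        Integer pinned = owner.get(groupId);
        if (pinned == null) {
            pinned = nextConsumer;
            nextConsumer = (nextConsumer + 1) % consumers;
            owner.put(groupId, pinned);
        }
        return pinned;
    }

    public static void main(String[] args) {
        GroupDispatcher dispatcher = new GroupDispatcher(2);
        System.out.println(dispatcher.route("7")); // prints 0
        System.out.println(dispatcher.route("8")); // prints 1
        System.out.println(dispatcher.route("7")); // prints 0
    }
}
```

Because a group never moves between consumers, messages inside one group are handled by one consumer in arrival order, while different groups still run in parallel.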
I don't do business problems here, only tech problems.
If the problem is you are abusing the tech, I will say so.
The alternative would be to stay quiet and let you wonder why it doesn't work.