The HA IL is not supported.
You need a bit more than load balancing to make
clustered JMS work.
I know that clustered JMS doesn't work yet, and even though it would be nice to have, that is not what I'm looking for right now.
I have an MBean that listens to a queue; when it receives a message, it processes it and passes it on to an external system. This takes a while, so I would like to have multiple threads sending. So the question is how to accomplish that. I tried to instantiate two MessageListeners registered to the queue, but the second doesn't get called until the first has completed, so it looks like there is just a single thread calling the onMessage method of all listeners.
Basically what I would like to be able to do is to have a thread pool and to be able to set how many threads are there to call the listeners (single or multiple) so that I know how many outstanding requests there are.
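To make the wish concrete, here is a minimal sketch of such a pool in plain Java. It is not JBossMQ code: BoundedForwarder, submit and shutdownAndWait are hypothetical names, the payload is reduced to a String, and the Consumer passed in stands for whatever sends to the external system. A listener's onMessage would call submit and return at once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical helper: forwards payloads to an external system on a fixed number of threads. */
class BoundedForwarder {
    private final ExecutorService pool;
    private final Consumer<String> sender;                 // assumed: the slow call to the external system
    private final AtomicInteger outstanding = new AtomicInteger();
    private final AtomicInteger maxOutstanding = new AtomicInteger();

    BoundedForwarder(int threads, Consumer<String> sender) {
        // The pool size is the upper bound on requests in flight at the same time.
        this.pool = Executors.newFixedThreadPool(threads);
        this.sender = sender;
    }

    /** Intended to be called from onMessage(); queues the payload and returns immediately. */
    void submit(String payload) {
        pool.execute(() -> {
            int now = outstanding.incrementAndGet();
            maxOutstanding.accumulateAndGet(now, Math::max);
            try {
                sender.accept(payload);
            } finally {
                outstanding.decrementAndGet();
            }
        });
    }

    /** Highest number of sends that were in flight at the same time. */
    int maxOutstanding() {
        return maxOutstanding.get();
    }

    /** Stops accepting work and waits for queued payloads; returns true if all finished in time. */
    boolean shutdownAndWait(long timeoutMillis) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

One thing to weigh with this approach: with AUTO_ACKNOWLEDGE the message is acknowledged as soon as onMessage returns, so a payload that is still waiting in the pool is lost if the JVM dies before it is sent.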
That was why I started to look at the HA IL, since it seemed to be the only way to do load balancing, but I couldn't find any report of someone having used it, or whether it solves my problem. I have now found out how to configure it but haven't been able to test whether it solves my problem.
This should probably be done in an MDB, but I haven't found a way to programmatically create them. Alternatively it could be done as a connector, but that requires JCA 1.5, which JBoss doesn't support yet.
The simplest way to create an MDB is to
write an ejb-jar into /deploy.
You don't need the classes in the same jar if they
are already deployed.
Load balancing allows you to contact multiple JMS servers.
Did you try creating a session for each listener?
I don't see writing to deploy as a viable solution; I would probably prefer to call the deployer directly or figure out how it deploys the MDB.
Anyway, it sounds like this kind of load balancer is not what I'm looking for, since I still only run with one JMS server and everything is in the same JVM.
Who manages the thread that calls my onMessage method? Would it be possible to make a modification to have a thread pool (of configurable size), and if yes, where should I look?
How would a session help?
The thread that invokes onMessage() is a background
thread. It reads from a list that is populated from
the delivery thread of the connection.
So in which classes should I look to implement an experimental thread pool?
It turned out that the answer was, as always, "read the f-cking source, Luke". I soon found that org.jboss.mq.SpyMessageConsumer is the class of interest, and that it creates one thread per instance, so I actually had one thread per listener as I wanted. When I run with the debugger I see all threads, and if I set a breakpoint in my onMessage method, I see that all threads can be in there in different stages at the same time (on different instances, even though it would be enough to create multiple listeners and register one instance).
Anyway, when I run at full speed I don't see quite the same behaviour. Then it looks like one thread finishes its call to onMessage before the next enters immediately after.
I've looked into the code of SpyMessageConsumer, and it looks like each instance has its own linked list with the messages to send, and that they get them from someone calling the addMessage method. So if the object that calls the addMessage method on SpyMessageConsumer doesn't distribute the messages to all listeners, the load balancing between them will not work.
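To illustrate what I mean, here is a simplified model of that arrangement. It is my own sketch, not the SpyMessageConsumer source: ListBackedConsumer is a made-up class, messages are plain Strings, and the listener is a Consumer instead of a javax.jms.MessageListener. Each instance owns a LinkedList and one thread that waits on it.

```java
import java.util.LinkedList;
import java.util.function.Consumer;

/** Simplified model (not JBossMQ code): a consumer with its own message list and delivery thread. */
class ListBackedConsumer implements Runnable {
    private final LinkedList<String> messages = new LinkedList<>();
    private final Consumer<String> listener;   // stands in for MessageListener.onMessage
    private final Thread thread;
    private boolean closed;                    // guarded by the lock on messages

    ListBackedConsumer(String name, Consumer<String> listener) {
        this.listener = listener;
        this.thread = new Thread(this, name);
        this.thread.start();
    }

    /** Called by whoever delivers to this consumer; only this consumer's thread sees the message. */
    void addMessage(String message) {
        synchronized (messages) {
            messages.addLast(message);
            messages.notify();
        }
    }

    @Override
    public void run() {
        while (true) {
            String next;
            synchronized (messages) {
                while (messages.isEmpty() && !closed) {
                    try {
                        messages.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                if (messages.isEmpty()) {
                    return;                    // closed and fully drained
                }
                next = messages.removeFirst();
            }
            listener.accept(next);             // invoke the listener outside the lock
        }
    }

    /** Lets the thread drain the list, then waits for it to exit. */
    void close() {
        synchronized (messages) {
            closed = true;
            messages.notifyAll();
        }
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

If every call to addMessage goes to the same instance, the threads of the other instances never wake up, no matter how many consumers exist.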
At this point the discussion should probably continue in the appropriate development forum, but my idea is that there should be one linked list, probably handled by the session, that all listeners check to get their messages. They would wait on the linked list object just like now, and one of them would get the mutex when notify is called and process the message. I think that was the intention, since all that synchronization code is present in the class, but then somehow it was forgotten.
Adrian, what do you think?
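A rough sketch of the idea, to show what I have in mind. This is not how JBossMQ works today; SharedListDispatcher is a hypothetical class, messages are plain Strings and the listener is a Consumer. Several threads wait on one shared LinkedList, and whichever thread is woken takes the next message.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: several listener threads taking messages from one shared list. */
class SharedListDispatcher {
    private final LinkedList<String> messages = new LinkedList<>();
    private final List<Thread> workers = new ArrayList<>();
    private boolean closed;                    // guarded by the lock on messages

    SharedListDispatcher(int listenerThreads, Consumer<String> listener) {
        for (int i = 0; i < listenerThreads; i++) {
            Thread t = new Thread(() -> deliverLoop(listener), "listener-" + i);
            workers.add(t);
            t.start();
        }
    }

    /** Adds a message and wakes one waiting thread. */
    void addMessage(String message) {
        synchronized (messages) {
            messages.addLast(message);
            messages.notify();
        }
    }

    private void deliverLoop(Consumer<String> listener) {
        while (true) {
            String next;
            synchronized (messages) {
                while (messages.isEmpty() && !closed) {
                    try {
                        messages.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                if (messages.isEmpty()) {
                    return;                    // closed and nothing left to deliver
                }
                next = messages.removeFirst();
            }
            listener.accept(next);             // slow work happens outside the lock
        }
    }

    /** Lets the threads drain the list, then waits for all of them to exit. */
    void close() {
        synchronized (messages) {
            closed = true;
            messages.notifyAll();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

The number of threads passed to the constructor is then the number of messages that can be in onMessage at the same time, and a message only leaves the list when a thread is free to process it.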
The session's list of messages is part of the
connection consumer implementation used by MDBs.
It plays no role for normal clients, they go
directly to the subscriptions (receivers).
The "load balancing" is done by the list of waiting
receivers on the BasicQueue, the server pushes the
messages to the clients unless messages were available
when the client asks for a message.
OK, now I have had enough time to go through the JBossMQ code, and I understand why I see this behaviour.
BasicQueue keeps the waiting receivers in a HashSet, and when it gets a message, it calls the internalAddMessage method, which iterates through the receivers, picks the first subscription that matches the headers of the message and passes the message on to its ClientConsumer instance. After that it removes that subscription from the receivers. The ClientConsumer passes the message on through the registered ClientIL to a Connection's asyncDeliver method. It is then passed on to a SpyConsumer instance through the addMessage method. In my case that is a SpyMessageConsumer, which puts the message in a LinkedList. A thread then picks it from the LinkedList and calls the onMessage method on my listener.
I haven't found out exactly how yet, but when the ClientConsumer has delivered the message, its subscription is added to the receivers in BasicQueue again.
receivers is a HashSet instance, and its implementation uses a HashMap, storing the set's entries as the HashMap keys. HashMap keys are stored in an array, and when a key/value pair (the value is just a placeholder in this case) is added, its position in the array is derived from the key's hash code. A subscription that is added again therefore ends up in the position it had before, which for the first subscription happens to be the first occupied position, since that is where the key was removed by the iterator in BasicQueue.internalAddMessage.
This means that the order of the subscriptions stays more or less the same as when they were first registered. internalAddMessage picks the first subscription, and the only time it will pick the second subscription is when it receives a message before the first subscription has finished processing the previous one. ClientConsumer uses a thread pool, so it looks like the thread that calls it from BasicQueue just drops the message off on a thread from the pool and returns. Most of the time the first subscription therefore processes the messages as fast as they arrive, and only in a few cases does the second subscriber get used.
That means that most of the messages end up in the LinkedList of the SpyMessageConsumer to which my first listener is registered. The second listener receives a few and the rest of them receive nothing.
The behaviour I expected was that the messages would be distributed evenly between the listeners; I think that is logical. That behaviour should be accomplished by changing the receivers in BasicQueue from a HashSet to a LinkedList, so that the subscriber that has finished is added to the end of the list and has to wait until last. I'll test that and see what happens.
I find this implementation a bit strange, and I'm not sure I have understood it right. It looks like the messages are actually stored in the LinkedList of SpyMessageConsumer while waiting for the thread calling onMessage to pick them up. Shouldn't there be a more direct connection between onMessage and the queue, so that messages stay in the queue until they can be processed?
Yep, a LinkedList gave the expected behaviour. Now the messages are distributed evenly to all listeners.
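For anyone who wants to see the effect without JBossMQ, the difference between the two collections can be reproduced with a small simulation. This is only a model under my own assumptions: ReceiverOrderDemo and distribute are made-up names, receivers are plain Integer ids, selectors are ignored, and every receiver finishes its message before the next one arrives.

```java
import java.util.Collection;
import java.util.Iterator;

/** Hypothetical simulation of "pick the first waiting receiver, then re-add it when done". */
class ReceiverOrderDemo {
    /** Delivers messageCount messages and returns how many each receiver (0..receiverCount-1) got. */
    static int[] distribute(Collection<Integer> receivers, int receiverCount, int messageCount) {
        for (int id = 0; id < receiverCount; id++) {
            receivers.add(id);                 // all receivers start out waiting
        }
        int[] delivered = new int[receiverCount];
        for (int m = 0; m < messageCount; m++) {
            Iterator<Integer> it = receivers.iterator();
            Integer chosen = it.next();        // first waiting receiver in iteration order
            it.remove();                       // it is busy while it handles the message
            delivered[chosen]++;
            receivers.add(chosen);             // assumed: it finishes before the next message arrives
        }
        return delivered;
    }
}
```

Calling distribute(new HashSet<>(), 4, 8) gives every message to a single receiver, because the re-added id comes back to the same place in the iteration order. Calling distribute(new LinkedList<>(), 4, 8) appends the re-added id at the end, so each of the four receivers gets two messages.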