-
1. Re: Selector Help needed - Possible alternative suggestions
timfox Jun 22, 2007 5:51 AM (in response to anhminh_tran)I am trying to understand the consumers you are using, you have multiple consumers with the *same* selector?
Do you also have other consumers with different selectors?
Please explain in more detail. -
2. Re: Selector Help needed - Possible alternative suggestions
anhminh_tran Jun 22, 2007 11:06 AM (in response to anhminh_tran)Hi Tim,
Thanks for looking at my question.
Yes, we have multiple consumers that might have the same selectors and or different selectors.
The idea of using the producer and consumer model for us is this.
There will be many requests that will be made concurrently. These requests all go into a persisent queue for guaranteed delivery. These requests are for a "job" to be worked on. These jobs might be quick or long running jobs.
What we want is to have a variable number of consumers. Each consumer will sit on a different box, since each job is very CPU intensive. Each consumer will look for certain types of jobs and process them one-at-a-time.
We want to have multiple consumers to have the *same* selectors because we want to scale side-ways; just add more boxes to be able to process more messages (jobs).
We also want to have multiple consumers to have different selectors, since certain types of jobs are known to be much smaller than the other jobs. So we want to have dedicated consumers to work on these types of jobs. This is so that the long running jobs do not clog up all the consumers and queue up all the small jobs. In our use case, it's ok to have the long running jobs take their time to do the work.. but it's mandatory to get all the small jobs through as soon as possible.
The other caveat is that certain consumers should be able to config to look for small jobs first, then if there are no small jobs in the queue, grab a big job. When do, start looking for a small job again... and so on..
So, we have three types of consumers if you may. One dedicated to handling small jobs. One dedicated to handling big jobs. One looks for small jobs, if non are there, look for big jobs to work on.
We actually do have requirements to have multiple consumers of big jobs to work on same and different types of of jobs..
We have all of these implemented and working with one problem that we didn't expect. That is, the consumer that is applying a selector, seems to be holding hostage all the messages that fit it's selector even though he's only called receive() for one message.
What we need is for other consumers to be able to apply the same selector and be able to get the messages that the other consumer didn't call receive() on yet. That way, we can have many consumers work on many of the same types of jobs. We need this to scale the processing of these jobs.
My other thought is if there is a way to specify for the consumer applying the selector that he only wants 1 message to be returned that matches his criteria. He will work on his job and then go ask for 1 more of the same.
Thanks,
Minh -
3. Re: Selector Help needed - Possible alternative suggestions
timfox Jun 24, 2007 11:04 AM (in response to anhminh_tran)In general it's a bad idea to use selectors with queues since it requires the entire queue to be scanned every time delivery is attempted (slow).
In most cases this can be refactored into using a topic with durable subscriptions - one durable subscription for each of your consumer types.
The selector on the durable sub is evaluated *before* the messages reach the durable sub, so it is much more efficient. (Think of each durable subscription of a queue, but it only contains those messages which match the selector).
Durable subscriptions can be clustered in JBM so you can have more than one consumer on the same durable subscription on different nodes of the cluster.
(There is an issue with message redistribution in 1.3.0 which reduces performance for this, but this will be fixed in 1.4.0.)
The issue about a message consumer getting "all" the messages is because each message consumer buffers ("prefetches") into its local buffer. This is done for performance reasons. For high throughput it is much more efficient not having to go the server every time you want a message.
If this is a problem for you, you can reduce the prefetch size - this is explained in the userguide. -
4. Re: Selector Help needed - Possible alternative suggestions
anhminh_tran Jun 26, 2007 5:57 AM (in response to anhminh_tran)Hi Tim,
You are right about the consumers prefetching messages from the queue. It had nothing to do with selectors. I had no selectors and still experienced the same thing.
I made a sample producer that allows me to put some messages in the queue.. I then had a consumer that reads from the queue.. then wait for 30 seconds.. I then had another instance of the same consumer run.. it seems that by the time it ran.. the first consumer had already prefetched all the messages off the queue.. like you said..
So, I tried setting the PrefetchSize to 1. But it did not work... I experienced the same issue. Here is my mbean definition in my connection-factory-service.xml
<mbean code="org.jboss.jms.server.connectionfactory.ConnectionFactory"
name="jboss.messaging.connectionfactory:service=ConnectionFactory"
xmbean-dd="xmdesc/ConnectionFactory-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer
<depends optional-attribute-name="Connector">jboss.messaging:service=Connector,transport=bisocket
jboss.messaging:service=PostOffice
/ConnectionFactory
/XAConnectionFactory
java:/ConnectionFactory
java:/XAConnectionFactory
1
Our use case is that we will not have many many messages.. but what we will have are long running jobs that are a result of the messages being picked up by the consumers.. Our plan is that by the time we feel that there are too many messages in the queue... we will just add more consumers to minimize the messages in the queue...
Could you please let me know if I am using the PrefetchSize attribute properly?
Thanks, -
5. Re: Selector Help needed - Possible alternative suggestions
timfox Jun 26, 2007 6:02 AM (in response to anhminh_tran)Please post again using code tags (see the code button?) otherwise we cannot read it.
-
6. Re: Selector Help needed - Possible alternative suggestions
anhminh_tran Jun 26, 2007 6:02 AM (in response to anhminh_tran)Sorry, here's my connection-factory-service.xml
<?xml version="1.0" encoding="UTF-8"?> <server> <mbean code="org.jboss.jms.server.connectionfactory.ConnectionFactory" name="jboss.messaging.connectionfactory:service=ConnectionFactory" xmbean-dd="xmdesc/ConnectionFactory-xmbean.xml"> <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends> <depends optional-attribute-name="Connector">jboss.messaging:service=Connector,transport=bisocket</depends> <depends>jboss.messaging:service=PostOffice</depends> <attribute name="JNDIBindings"> <bindings> <binding>/ConnectionFactory</binding> <binding>/XAConnectionFactory</binding> <binding>java:/ConnectionFactory</binding> <binding>java:/XAConnectionFactory</binding> </bindings> </attribute> <attribute name="PrefetchSize">1</attribute> </mbean> </server>
-
7. Re: Selector Help needed - Possible alternative suggestions
anhminh_tran Jun 26, 2007 1:33 PM (in response to anhminh_tran)BTW here is the consumer that I made. It's a modification of the one that came with the example code:
public class JMSConsumer { public static void main(String[] args) { String jndiDestinationName = "/queue/testQueue"; InitialContext ic = null; Connection connection = null; boolean deployed = false; try { if (!Util.doesDestinationExist(jndiDestinationName)) { System.out.println("Destination " + jndiDestinationName + " does not exist, deploying it"); Util.deployQueue(jndiDestinationName); deployed = true; } ic = new InitialContext(); ic.addToEnvironment("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory"); ic.addToEnvironment("java.naming.provider.url", "jnp://10.5.1.241:1099"); ic.addToEnvironment("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces"); ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ConnectionFactory"); Queue queue = (Queue) ic.lookup(jndiDestinationName); connection = cf.createConnection(); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(queue); connection.start(); System.out.println("Waiting for messages"); System.out.println("--------------------"); while (true) { TextMessage message = (TextMessage) consumer.receive(2000); if (message != null) { System.out.println("message=" + message.getText()); System.out.println("-------------------------------"); // Acknowledge that we've recieved and processed the message successfully message.acknowledge(); Thread.sleep(15000); } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (deployed) { Util.undeployQueue(jndiDestinationName); } } catch (Exception e) { e.printStackTrace(); } if (ic != null) { try { ic.close(); } catch (Exception e) { e.printStackTrace(); } } // ALWAYS close your connection in a finally block to avoid leaks // Closing connection also takes care of closing its related objects // e.g. sessions try { if (connection != null) { connection.close(); } } catch (JMSException jmse) { System.err.println("Could not close connection " + connection + " exception was " + jmse); jmse.printStackTrace(); } } } }
It seems that on the call to consumer.receive() it gets a batch of messages no matter if I set the PrefetchSize attribute to 1 or not.. -
8. Re: Selector Help needed - Possible alternative suggestions
timfox Jun 29, 2007 7:50 AM (in response to anhminh_tran)preFetch certainly seems to work for us.
Can you post a full test case that demonstrates prefetch not working and will investigate further?
Thanks.