8 Replies Latest reply on Jun 29, 2007 7:50 AM by timfox

    Selector Help needed - Possible alternative suggestions welc

      Hi,

      We have a need to have a persistent Queue that will, on one side, have producers publish messages (jobs) to it. On the otherside, a variable number of consumers need to be able to select for a specific type of message, if none of those types of messages are there, select for a different type of messages for processing.

      The problem that i'm running into is that on the call to consumer.receive(), even though I only get one message back, if there are more than one message that matches the selector criteria, it seem that all those matching messages are "held" and other consumers that wants to select the same matching selector criteria cannot get to those messages to work on. So what happens is one consumer gets all the messages (jobs) while all the other consumers sit idle.

      I would like for only one message to be "marked" received at a time.. the other messages that match the selector but is not given to the consumer yet should be available for the receive() call of the other consumers.

      Is this something posssible or am I just doing something that is not supported?? Is there a better way to achieve what i'm trying to do?

      Any help and suggestion would be appreciated.

      I'm using JBoss 4.2 + JBoss Messaging 1.2

      Thanks,
      Minh

        • 1. Re: Selector Help needed - Possible alternative suggestions
          timfox

          I am trying to understand the consumers you are using, you have multiple consumers with the *same* selector?

          Do you also have other consumers with different selectors?

          Please explain in more detail.

          • 2. Re: Selector Help needed - Possible alternative suggestions

            Hi Tim,

            Thanks for looking at my question.

            Yes, we have multiple consumers that might have the same selectors and or different selectors.

            The idea of using the producer and consumer model for us is this.

            There will be many requests that will be made concurrently. These requests all go into a persisent queue for guaranteed delivery. These requests are for a "job" to be worked on. These jobs might be quick or long running jobs.

            What we want is to have a variable number of consumers. Each consumer will sit on a different box, since each job is very CPU intensive. Each consumer will look for certain types of jobs and process them one-at-a-time.

            We want to have multiple consumers to have the *same* selectors because we want to scale side-ways; just add more boxes to be able to process more messages (jobs).

            We also want to have multiple consumers to have different selectors, since certain types of jobs are known to be much smaller than the other jobs. So we want to have dedicated consumers to work on these types of jobs. This is so that the long running jobs do not clog up all the consumers and queue up all the small jobs. In our use case, it's ok to have the long running jobs take their time to do the work.. but it's mandatory to get all the small jobs through as soon as possible.

            The other caveat is that certain consumers should be able to config to look for small jobs first, then if there are no small jobs in the queue, grab a big job. When do, start looking for a small job again... and so on..

            So, we have three types of consumers if you may. One dedicated to handling small jobs. One dedicated to handling big jobs. One looks for small jobs, if non are there, look for big jobs to work on.

            We actually do have requirements to have multiple consumers of big jobs to work on same and different types of of jobs..

            We have all of these implemented and working with one problem that we didn't expect. That is, the consumer that is applying a selector, seems to be holding hostage all the messages that fit it's selector even though he's only called receive() for one message.

            What we need is for other consumers to be able to apply the same selector and be able to get the messages that the other consumer didn't call receive() on yet. That way, we can have many consumers work on many of the same types of jobs. We need this to scale the processing of these jobs.

            My other thought is if there is a way to specify for the consumer applying the selector that he only wants 1 message to be returned that matches his criteria. He will work on his job and then go ask for 1 more of the same.

            Thanks,
            Minh

            • 3. Re: Selector Help needed - Possible alternative suggestions
              timfox

              In general it's a bad idea to use selectors with queues since it requires the entire queue to be scanned every time delivery is attempted (slow).

              In most cases this can be refactored into using a topic with durable subscriptions - one durable subscription for each of your consumer types.

              The selector on the durable sub is evaluated *before* the messages reach the durable sub, so it is much more efficient. (Think of each durable subscription of a queue, but it only contains those messages which match the selector).

              Durable subscriptions can be clustered in JBM so you can have more than one consumer on the same durable subscription on different nodes of the cluster.

              (There is an issue with message redistribution in 1.3.0 which reduces performance for this, but this will be fixed in 1.4.0.)

              The issue about a message consumer getting "all" the messages is because each message consumer buffers ("prefetches") into its local buffer. This is done for performance reasons. For high throughput it is much more efficient not having to go the server every time you want a message.

              If this is a problem for you, you can reduce the prefetch size - this is explained in the userguide.

              • 4. Re: Selector Help needed - Possible alternative suggestions

                Hi Tim,

                You are right about the consumers prefetching messages from the queue. It had nothing to do with selectors. I had no selectors and still experienced the same thing.

                I made a sample producer that allows me to put some messages in the queue.. I then had a consumer that reads from the queue.. then wait for 30 seconds.. I then had another instance of the same consumer run.. it seems that by the time it ran.. the first consumer had already prefetched all the messages off the queue.. like you said..

                So, I tried setting the PrefetchSize to 1. But it did not work... I experienced the same issue. Here is my mbean definition in my connection-factory-service.xml

                <mbean code="org.jboss.jms.server.connectionfactory.ConnectionFactory"
                name="jboss.messaging.connectionfactory:service=ConnectionFactory"
                xmbean-dd="xmdesc/ConnectionFactory-xmbean.xml">
                <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer
                <depends optional-attribute-name="Connector">jboss.messaging:service=Connector,transport=bisocket
                jboss.messaging:service=PostOffice



                /ConnectionFactory
                /XAConnectionFactory
                java:/ConnectionFactory
                java:/XAConnectionFactory



                1



                Our use case is that we will not have many many messages.. but what we will have are long running jobs that are a result of the messages being picked up by the consumers.. Our plan is that by the time we feel that there are too many messages in the queue... we will just add more consumers to minimize the messages in the queue...

                Could you please let me know if I am using the PrefetchSize attribute properly?

                Thanks,

                • 5. Re: Selector Help needed - Possible alternative suggestions
                  timfox

                  Please post again using code tags (see the code button?) otherwise we cannot read it.

                  • 6. Re: Selector Help needed - Possible alternative suggestions

                    Sorry, here's my connection-factory-service.xml

                    <?xml version="1.0" encoding="UTF-8"?>
                    <server>
                     <mbean code="org.jboss.jms.server.connectionfactory.ConnectionFactory"
                     name="jboss.messaging.connectionfactory:service=ConnectionFactory"
                     xmbean-dd="xmdesc/ConnectionFactory-xmbean.xml">
                     <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
                     <depends optional-attribute-name="Connector">jboss.messaging:service=Connector,transport=bisocket</depends>
                     <depends>jboss.messaging:service=PostOffice</depends>
                    
                     <attribute name="JNDIBindings">
                     <bindings>
                     <binding>/ConnectionFactory</binding>
                     <binding>/XAConnectionFactory</binding>
                     <binding>java:/ConnectionFactory</binding>
                     <binding>java:/XAConnectionFactory</binding>
                     </bindings>
                     </attribute>
                    
                     <attribute name="PrefetchSize">1</attribute>
                     </mbean>
                    </server>
                    


                    • 7. Re: Selector Help needed - Possible alternative suggestions

                      BTW here is the consumer that I made. It's a modification of the one that came with the example code:


                      public class JMSConsumer {
                      
                       public static void main(String[] args) {
                       String jndiDestinationName = "/queue/testQueue";
                       InitialContext ic = null;
                       Connection connection = null;
                       boolean deployed = false;
                      
                       try {
                       if (!Util.doesDestinationExist(jndiDestinationName)) {
                       System.out.println("Destination " + jndiDestinationName + " does not exist, deploying it");
                       Util.deployQueue(jndiDestinationName);
                       deployed = true;
                       }
                      
                       ic = new InitialContext();
                       ic.addToEnvironment("java.naming.factory.initial", "org.jnp.interfaces.NamingContextFactory");
                       ic.addToEnvironment("java.naming.provider.url", "jnp://10.5.1.241:1099");
                       ic.addToEnvironment("java.naming.factory.url.pkgs", "org.jboss.naming:org.jnp.interfaces");
                      
                       ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
                       Queue queue = (Queue) ic.lookup(jndiDestinationName);
                      
                       connection = cf.createConnection();
                       Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                      
                       MessageConsumer consumer = session.createConsumer(queue);
                      
                       connection.start();
                      
                       System.out.println("Waiting for messages");
                       System.out.println("--------------------");
                       while (true) {
                       TextMessage message = (TextMessage) consumer.receive(2000);
                       if (message != null) {
                       System.out.println("message=" + message.getText());
                       System.out.println("-------------------------------");
                      
                       // Acknowledge that we've recieved and processed the message successfully
                       message.acknowledge();
                       Thread.sleep(15000);
                       }
                       }
                       }
                       catch (Exception e) {
                       e.printStackTrace();
                       }
                       finally {
                       try {
                       if (deployed) {
                       Util.undeployQueue(jndiDestinationName);
                       }
                       }
                       catch (Exception e) {
                       e.printStackTrace();
                       }
                      
                       if (ic != null) {
                       try {
                       ic.close();
                       }
                       catch (Exception e) {
                       e.printStackTrace();
                       }
                       }
                      
                       // ALWAYS close your connection in a finally block to avoid leaks
                       // Closing connection also takes care of closing its related objects
                       // e.g. sessions
                       try {
                       if (connection != null) {
                       connection.close();
                       }
                      
                       }
                       catch (JMSException jmse) {
                       System.err.println("Could not close connection " + connection + " exception was " + jmse);
                       jmse.printStackTrace();
                       }
                       }
                       }
                      
                      }
                      



                      It seems that on the call to consumer.receive() it gets a batch of messages no matter if I set the PrefetchSize attribute to 1 or not..

                      • 8. Re: Selector Help needed - Possible alternative suggestions
                        timfox

                        preFetch certainly seems to work for us.

                        Can you post a full test case that demonstrates prefetch not working and will investigate further?

                        Thanks.