6 Replies Latest reply on Jun 22, 2010 8:13 AM by PÃ¥l Evensen

    How does the queue browser work?

    PÃ¥l Evensen Novice

      Hi, could someone please enlighten me on how a queue browser works? I'm referring to the method described in the javadoc as ClientSession.createConsumer(String queueName, String filter, boolean browseOnly).

       

      What I want to achieve is to have a queue with multiple consumers (using the native Java API), where consumers can receive messages that was created both prior to their subscription and at the same time receive messages that enters the queue later. The method mentioned above is said to create something that's called a queue browser, receiving the messages, but not consuming them. I thought this could be the solution to my task, but it appears that it only processes messages that are already in the queue, and not receiving new ones, after it has been created.

       

      To my understanding, I cannot use a topic for this task, as messages created before the consumer subscribes to the queue will not be recieved.

       

      Here's my code:

       

            //Creates queue browser, not consumer

            //FIXME: This does not pick up new messages from the queue.

            ClientConsumer messageConsumer = session.createConsumer(queueName, coupon.getFilterString(), true);

       

            for(;;) {

              try {

                // Step 8. Receive the message.

                ClientMessage messageReceived = messageConsumer.receive();

                System.out.println("Funnet nytt mål til tippekupong");

                String matchId   = String.valueOf(messageReceived.getIntProperty("matchId"));

                int homeGoals = messageReceived.getIntProperty("homeGoals");

                int awayGoals = messageReceived.getIntProperty("awayGoals");

                ScoreEvent se = new ScoreEvent(matchId, homeGoals, awayGoals);

                coupon.processEvent(se);

              }

              catch(Exception e) {

                e.printStackTrace();

              }

            }

       

      ###

       

      Swapping out session.createConsumer(queueName, coupon.getFilterString(), true); with ClientConsumer messageConsumer = session.createConsumer(queueName, coupon.getFilterString()); helps with the problem of the subscriber not picking up new messages, but turns the queue into a point-to-point queue, preventing other consumers to process the same messages.

       

      Is there any way around this?

       

      Regards,

      Pål

        • 1. Re: How does the queue browser work?
          Clebert Suconic Master

          QueueBrowser works as it's defined...it will look at the queue, without consuming the message.

           

          But how you're consuming the messages?  Are you just leaving them growing on the queue?

           

           

          A common anti-pattern with messaging systems is using the MoM as a Database, what would lead to several problems. (I'm talking about any messaging system in the world.. not just HornetQ).

           

           

          If you don't consume the messages, messages will go towards paging and you won't be able to see messages on the page-system until you consume and ack messages from the queue.

          1 of 1 people found this helpful
          • 2. Re: How does the queue browser work?
            PÃ¥l Evensen Novice

            For this particular case, I was thinking of clearing the queue once a week, and my question was basically "is there a way to have a pub/sub interaction model where multiple subscribers also receive events that happened prior to the subscription as well as new events?", but I understand that this is a limitation of the interaction model itself and not HornetQ.

             

            I was trying to solve this using the queue browser, but since it appears not to pick up new messages on the queue, I will try to solve it another way. Thank you for your valuable input.

            • 3. Re: How does the queue browser work?
              Tim Fox Master

              The browser should see messages added since it was created - what version are you using?

               

              If you can post a self-contained simple test case that demonstrates the issue, someone can take a look.

              • 4. Re: How does the queue browser work?
                PÃ¥l Evensen Novice

                I'm using version 2.1.0 Final, but am running it with Java v 1.7ea84, since I'm also using NIO2 functionality that's not available in v1.6. I don't know if this can generate unexpected errors in HornetQ.

                 

                I'll try to set up a simple test case that demonstrates the issue, but it might prove difficult as the application as a whole is rather complex.

                 

                Thanks anyway.

                 

                Pål

                • 5. Re: How does the queue browser work?
                  Tim Fox Master

                  PÃ¥l Evensen wrote:

                   

                  I'm using version 2.1.0 Final, but am running it with Java v 1.7ea84, since I'm also using NIO2 functionality that's not available in v1.6. I don't know if this can generate unexpected errors in HornetQ.

                   

                  I'll try to set up a simple test case that demonstrates the issue, but it might prove difficult as the application as a whole is rather complex.

                   

                  Thanks anyway.

                   

                  Pål

                  I don't see why it would be hard to replicate. If queue browsers really don't pick up messages added since creation this should be trivial to replicate.

                   

                  Just create a browser, then send some messages to the queue, and see if they're picked up.

                  • 6. Re: How does the queue browser work?
                    PÃ¥l Evensen Novice

                    Allright, I've created a test case that replicates the problem:

                     

                    import core.hornetq.HornetQStompServer;
                    import org.hornetq.api.core.SimpleString;
                    import org.hornetq.api.core.client.ClientMessage;

                     

                    /**
                    * @author: Pål Evensen
                    * Date: Jun 22, 2010
                    */
                    public class HornetQTest {

                     

                      public static void main(String args[]) {
                        try {
                          HornetQStompServer server = new HornetQStompServer();
                          server.createQueue("soccer.events.goals", false);
                          ClientMessage clientMessage = server.getClientSession().createMessage(true);
                          clientMessage.putIntProperty("matchId", 1);     
                          server.sendSimpleMessage("soccer.events.goals", clientMessage);

                     

                          //Create queue browser, subscribing to the queue
                          QueueSubscriber queueSubscriber = new QueueSubscriber(server);
                          Thread t1 = new Thread(queueSubscriber);
                          t1.start();

                     

                          //Create queue browser, subscribing to the queue
                          QueueSubscriber queueSubscriber2 = new QueueSubscriber(server);
                          Thread t2 = new Thread(queueSubscriber2);
                          t2.start();    

                     

                          ClientMessage clientMessage2 = server.getClientSession().createMessage(true);
                          clientMessage2.putIntProperty("matchId", 2);
                          Thread.currentThread().sleep(1000);
                          System.out.println("woke up");
                          server.sendSimpleMessage("soccer.events.goals", clientMessage2);
                        } catch (Exception e) {
                          e.printStackTrace();
                        }

                      }

                    }

                     

                    #########################################

                     

                    import org.hornetq.api.core.HornetQException;
                    import org.hornetq.api.core.client.ClientConsumer;
                    import org.hornetq.api.core.client.ClientMessage;
                    import org.hornetq.api.core.client.ClientSession;

                     

                    import javax.xml.parsers.ParserConfigurationException;
                    import javax.xml.transform.TransformerException;

                     

                    /**
                    * @author: Pål Evensen
                    * Date: Jun 22, 2010
                    */
                    public class QueueSubscriber implements Runnable {

                     

                      private final String queueName; //Queue to be consumed
                      private final core.hornetq.HornetQStompServer stompServer;

                     


                      public QueueSubscriber(core.hornetq.HornetQStompServer stompServer)
                          throws TransformerException, ParserConfigurationException {
                        queueName = "soccer.events.goals";
                        this.stompServer = stompServer;

                      }

                     

                      public void run() {

                     

                        ClientSession session = stompServer.getClientSession();
                        try {
                          //ClientConsumer messageConsumer = session.createConsumer(queueName, "matchId=1 OR matchId=2 OR matchId=3 OR matchId=4");
                          //Creates queue browser, not consumer
                          //FIXME: This does not pick up new messages from the queue.
                          ClientConsumer messageConsumer = session.createConsumer(queueName, "matchId=1 OR matchId=2 OR matchId=3 OR matchId=4", true);

                     

                          for(;;) {
                            try {
                              // Step 8. Receive the message.
                              ClientMessage messageReceived = messageConsumer.receive();
                              System.out.println(Thread.currentThread().getName()+": found event. MatchId: " + messageReceived.getIntProperty("matchId"));
                            }
                            catch(Exception e) {
                              e.printStackTrace();
                            }
                          }
                        }
                        catch (Exception e) {
                          e.printStackTrace();
                        }
                        finally {
                            // Step 9. Be sure to close our resources!

                     

                            if (session != null)
                            {
                              try {
                                session.close();
                              } catch (HornetQException e) {
                                e.printStackTrace();
                              }
                            }
                        }
                      }

                    }

                     

                    #####################

                     

                    package core.hornetq;

                     

                    import org.hornetq.api.core.HornetQException;
                    import org.hornetq.api.core.SimpleString;
                    import org.hornetq.api.core.TransportConfiguration;
                    import org.hornetq.api.core.client.*;
                    import org.hornetq.core.config.impl.FileConfiguration;
                    import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
                    import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
                    import org.hornetq.core.server.HornetQServer;
                    import org.hornetq.core.server.HornetQServers;

                     

                    import java.util.HashMap;
                    import java.util.HashSet;
                    import java.util.Map;
                    import java.util.Set;
                    /**
                    * @author: Pål Evensen
                    * Date: Apr 5, 2010

                    */

                     


                    /**
                    * Create HornetQ Server with a Stomp Connector.
                    */
                    public class HornetQStompServer {

                     

                      private HornetQServer hornetqServer;
                      private ClientSession session;
                      private Set<ClientProducer> producers;

                     

                        public HornetQStompServer() throws Exception {

                     

                        FileConfiguration configuration = new FileConfiguration();
                        //configuration.setConfigurationUrl("file:///home/paale/projects/eventmw/config/hornetq-configuration.xml");
                        String path = System.getProperty("user.dir");
                        configuration.setConfigurationUrl("file:///"+ path +"/config/hornetq-configuration.xml");
                        configuration.setSecurityEnabled(false);
                            configuration.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));   
                        configuration.start(); 

                     

                            // we create the HornetQ core using this config
                            hornetqServer = HornetQServers
                                    .newHornetQServer(configuration);
                        hornetqServer.start();
                        createClientSession();
                        session.start();
                        producers = new HashSet<ClientProducer>();
                        }

                     

                      /**
                       * Creates and sets up a client session to be used by other classes
                       * @throws Exception
                       */
                      private void createClientSession() throws Exception {
                        ClientSessionFactory nettyFactory = HornetQClient.createClientSessionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
                        //ClientSessionFactory nettyFactory = HornetQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
                        //session = nettyFactory.createSession("paale", "moordi", false, true, true, true, 64);
                        session = nettyFactory.createSession();

                      }

                     

                      public void sendSimpleMessage(String address, ClientMessage message) throws HornetQException {
                        createQueue(address, false);
                        getClientProducer(address).send(message);

                      }

                     

                      private ClientProducer getClientProducer(String address) throws HornetQException {
                        ClientProducer producer = null;

                     

                        for(ClientProducer p : producers) {
                          if(address.equalsIgnoreCase(p.getAddress().toString())) {
                            producer = p;
                            break;
                          }

                        }

                     

                        if(producer == null) {
                          producer = session.createProducer(address);
                          producers.add(producer);
                        }

                     

                        return producer;
                      }

                     

                      public ClientSession getClientSession() {
                        return session;

                      }

                     

                      /**
                       * Creates a new queue if the specified queue doesn't exist
                        * @param queueName
                       */
                      public void createQueue(String queueName, boolean persistent) throws HornetQException{
                        ClientSession.QueueQuery result = session.queueQuery(new SimpleString(queueName));  

                     

                        if(!result.isExists())
                          session.createQueue(queueName, queueName, persistent);
                      }
                    }

                     

                    <?xml version="1.0" encoding="UTF-8"?>
                    <configuration xmlns="urn:hornetq"
                                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                                   xsi:schemaLocation="urn:hornetq/schema/hornetq-configuration.xsd">

                     

                       <connectors>
                          <connector name="netty">
                             <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
                             <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                             <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
                          </connector>
                       </connectors>

                     

                       <acceptors>
                          <acceptor name="netty">
                             <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                             <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                             <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
                          </acceptor>
                        <acceptor name="stomp-acceptor">
                            <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                            <param key="protocol"  value="stomp"/>
                            <param key="host"  value="localhost"/>
                            <param key="port"  value="61613"/>
                        </acceptor>
                       </acceptors>

                     

                       <security-settings>
                          <security-setting match="#">
                         <permission type="createDurableQueue" roles="admin"/>
                         <permission type="deleteDurableQueue" roles="admin"/>
                             <permission type="createTempQueue" roles="admin, generic-client"/>
                             <permission type="deleteTempQueue" roles="admin"/>
                             <permission type="createNonDurableQueue" roles="admin, generic-client"/>
                             <permission type="consume" roles="admin, generic-client"/>
                             <permission type="send" roles="admin"/>
                          </security-setting>
                        <security-setting match="jms.topic.soccer.#">
                            <permission type="createDurableQueue" roles="admin, generic-client"/>
                            <permission type="deleteDurableQueue" roles="admin"/>
                            <permission type="createNonDurableQueue" roles="admin, generic-client, guest"/>
                            <permission type="deleteNonDurableQueue" roles="admin"/>
                            <permission type="createTempQueue" roles="admin, generic-client, guest"/>
                            <permission type="send" roles="admin"/>
                            <permission type="consume" roles="admin, generic-client, guest"/>       
                        </security-setting>
                       </security-settings>

                     

                       <address-settings>
                          <!--default for catch all-->
                          <address-setting match="#">
                             <dead-letter-address>jms.queue.DLQ</dead-letter-address>
                             <expiry-address>jms.queue.ExpiryQueue</expiry-address>
                             <redelivery-delay>0</redelivery-delay>
                             <max-size-bytes>-1</max-size-bytes>
                             <page-size-bytes>10485760</page-size-bytes>        
                             <message-counter-history-day-limit>10</message-counter-history-day-limit>
                          </address-setting>

                     

                       </address-settings>
                      
                    <queues>
                           <queue name="soccer.events.goals">
                               <address>soccer.events.goals</address>
                           <durable>false</durable>
                           </queue>
                    </queues>

                     

                       <paging-directory>data/paging</paging-directory>
                       <bindings-directory>data/bindings</bindings-directory>
                       <journal-directory>data/journal</journal-directory>
                       <large-messages-directory>data/large-messages</large-messages-directory>

                     

                    <!-- false to disable JMX management for HornetQ -->
                    <jmx-management-enabled>true</jmx-management-enabled>
                      
                    </configuration>

                     

                    Running this example as posted here, I get the following output:

                     

                    Thread-1: found event. MatchId: 1
                    Thread-2: found event. MatchId: 1
                    woke up

                     

                    Notice that the second event is never received.

                     

                    By using the message consumer method instead of the message browser (both in bold), I get the following output (as expected):

                     

                    Thread-1: found event. MatchId: 1
                    woke up
                    Thread-2: found event. MatchId: 2

                     

                    This might be a threading issue that I don't understand, because removing the sleep() (also in bold) statement enables both methods to work as expected.