15 Replies Latest reply on Jul 1, 2010 5:31 PM by unsavory

    System resources with cached consumers.

    unsavory

      I am using HornetQ and caching a large number (200-300) of consumers on a topic that uses a local netty nio connection factory.

       

      Every time a web request comes in (potentially hundreds per second), we retrieve the cached consumer for that client and consume all the messages in the topic for the client.

       

      In the beginning the server would only stay up for about an hour before running into what looked like a race condition, which was later diagnosed as the Linux user running out of open file handles.  The default limit was 1024, which was being exhausted within the first hour.

       

      But the question is, why is HornetQ opening so many files and seemingly not releasing them?  The number of open files keeps growing with no end in sight while using this topic.  Could someone please confirm that I am using these consumers properly?

       

      Code Below:

       

      // Imports assumed from the calls used below: log4j 1.x Logger, Spring's ToStringCreator, JMS 1.1.
      import java.util.ArrayList;
      import java.util.List;

      import javax.annotation.PreDestroy;
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.Destination;
      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.MessageConsumer;
      import javax.jms.Session;

      import org.apache.log4j.Logger;
      import org.springframework.core.style.ToStringCreator;

      public abstract class AbstractConsumer {

          private final Logger log = Logger.getLogger(AbstractConsumer.class);

          protected Destination destination;
          protected String instanceId = java.util.UUID.randomUUID().toString();

          protected Connection connection;
          protected Session session;
          protected MessageConsumer consumer;
          protected boolean autoAcknowledge;

          public AbstractConsumer() {}

          // Note: every instance opens its own Connection, Session and MessageConsumer.
          public AbstractConsumer(ConnectionFactory connectionFactory, Destination destination, String selector) {

              this.destination = destination;

              try {
                  log.info("MESSAGECONSUMER initializing: " + this);
                  connection = connectionFactory.createConnection();
                  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                  consumer = selector == null ? session.createConsumer(destination) : session.createConsumer(destination, selector);
                  log.info("MESSAGECONSUMER created: " + this);
              } catch (JMSException e) {
                  log.fatal("Fatal exception caught attempting to initialize MessageConsumer: " + this, e);
              }
          }

          public AbstractConsumer(ConnectionFactory connectionFactory, Destination destination) {
              this(connectionFactory, destination, null);
          }

          @Override
          public String toString() {
              return new ToStringCreator(this)
                  .append("Destination", destination)
                  .append("Instance", instanceId)
                  .toString();
          }

          // Consumes at most one message; a null wait period means receiveNoWait().
          public synchronized Message consume(Long waitPeriodMillis) {
              log.debug("MESSAGECONSUMER consume started: " + this);

              Message message = null;

              try {
                  connection.start();
                  message = waitPeriodMillis == null ? consumer.receiveNoWait() :
                      consumer.receive(waitPeriodMillis);
                  if (message == null) {
                      log.debug("MESSAGECONSUMER no message to consume: " + this);
                  } else {
                      log.debug("MESSAGECONSUMER found a message to consume: " + this);
                      try {
                          message = onMessage(message);
                      } catch (Exception e) {
                          log.error("Error caught in AbstractConsumer attempting to process message: " + this, e);
                      }
                  }
                  connection.stop();
              } catch (JMSException e) {
                  log.fatal("Exception caught in MessageConsumer while attempting to consume: " + this, e);
              }

              log.debug("MESSAGECONSUMER consume finished: " + this);
              return message;
          }

          public synchronized Message consume() {
              return consume(null);
          }

          // Drains the consumer until a receive returns null.
          public synchronized List<Message> consumeAll(Long waitPeriodMillis) {

              log.debug("MESSAGECONSUMER consume all started: " + this);
              List<Message> messages = new ArrayList<Message>();

              try {
                  connection.start();

                  while (true) {

                      Message message = waitPeriodMillis == null ? consumer.receiveNoWait() :
                          consumer.receive(waitPeriodMillis);

                      if (message == null) {
                          log.debug("MESSAGECONSUMER no more messages to consume: " + this);
                          break;
                      }

                      log.debug("MESSAGECONSUMER found a message to consume: " + this);
                      try {
                          message = onMessage(message);
                      } catch (Exception e) {
                          log.error("Error caught in AbstractConsumer attempting to process message: " + this, e);
                      }
                      messages.add(message);
                  }

                  connection.stop();
              } catch (JMSException e) {
                  log.fatal("Fatal Exception caught in MessageConsumer while attempting to consume: " + this, e);
              }

              log.debug("MESSAGECONSUMER consume all finished: " + this);
              return messages;
          }

          public synchronized List<Message> consumeAll() {
              return consumeAll(null);
          }

          protected abstract Message onMessage(Message message);

          @PreDestroy
          public synchronized void destroy() {
              log.info("Destroy called on MessageConsumer: " + this);
              try {
                  if (connection != null) {
                      log.info("Closing ClientConnection: " + this);
                      connection.close();
                      log.info("ClientConnection closed: " + this);
                  }
              } catch (JMSException e) {
                  log.warn("Exception caught attempting to shut down MessageConsumer: " + this, e);
              }
              log.info("MessageConsumer destroyed: " + this);
          }
      }
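
      A minimal sketch for watching the open-file count from inside the JVM while the consumers run. It assumes a Unix-like host and a JDK that exposes com.sun.management.UnixOperatingSystemMXBean (HotSpot/OpenJDK do; other JVMs may not). The FdProbe class name is hypothetical and not part of HornetQ.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Hypothetical helper: reports this JVM's file descriptor usage.
final class FdProbe {
    private FdProbe() {}

    // Number of descriptors currently open, or -1 if the platform bean does not expose it.
    static long openFileDescriptors() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
        }
        return -1L;
    }

    // Per-process descriptor limit (1024 in the case described above), or -1 if unavailable.
    static long maxFileDescriptors() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os).getMaxFileDescriptorCount();
        }
        return -1L;
    }
}
```

      Logging FdProbe.openFileDescriptors() next to the number of cached consumers at a fixed interval should show whether the handle count tracks the number of open connections or keeps climbing on its own.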
      
      
      
        • 1. Re: System resources with cached consumers.
          clebert.suconic

          I'm not sure how you are using this class. How are you instantiating your factories?

           

          HornetQ should have just one Netty connection opened per ConnectionFactory.

           

          Also: Aren't you reinventing the wheel by controlling the caching yourself? A lot of what we do with Messaging servers is to maximize usage of resources. I would try something simple first and then optimize later.

           

          You should also probably use MessageListeners. You are blocking on receive, so you are blocking a thread on your HTTP request. Maybe that's actually why you're having resource problems. Blocking inside a Servlet goes exactly against the non-blocking approach of messaging.

          • 2. Re: System resources with cached consumers.
            unsavory

            Thanks Clebert, but several things.  I guess I should have been more clear.

             

            The ConnectionFactory is instantiated via hornetq using file configuration.  The relevant part of hornetq-configuration.xml:

             

             

               <connectors>
                  <connector name="netty">
                     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
                     <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                     <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
                     <param key="use-nio" value="true" />
                     <param key="tcp-send-buffer-size" value="1048576" />
                     <param key="tcp-receive-buffer-size" value="1048576" />
                  </connector>
                  
                  <connector name="netty-throughput">
                     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
                     <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                     <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
                     <param key="batch-delay" value="50"/>
                     <param key="use-nio" value="true" />
                     <param key="tcp-send-buffer-size" value="1048576" />
                     <param key="tcp-receive-buffer-size" value="1048576" />
                  </connector>
               </connectors>
             
               <acceptors>
                  <acceptor name="netty">
                     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                     <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                     <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
                     <param key="use-nio" value="true" />
                     <param key="tcp-send-buffer-size" value="1048576" />
                     <param key="tcp-receive-buffer-size" value="1048576" />
                  </acceptor>
                  
                  <acceptor name="netty-throughput">
                     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                     <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
                     <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
                     <param key="batch-delay" value="50"/>
                     <param key="direct-deliver" value="false"/>
                     <param key="use-nio" value="true" />
                     <param key="tcp-send-buffer-size" value="1048576" />
                     <param key="tcp-receive-buffer-size" value="1048576" />
                  </acceptor>
               </acceptors>
            
            
            

             

            The ConnectionFactory is then looked up via JNDI and injected into the consumers.

             

            In regards to caching, I am simply following the advice to re-use connections, sessions and consumers.  I am not running this inside a JEE container.  It is stand-alone vanilla Tomcat with Spring.  The only way I can re-use consumers without having to rebuild them with every web request, is if I cache them.
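
            One way such a cache could look, as a sketch only. ConsumerCache and its factory function are hypothetical names, and the type C stands in for whatever wraps a JMS MessageConsumer (for example a subclass of the AbstractConsumer posted earlier, given a close() method). ConcurrentHashMap.computeIfAbsent runs the factory at most once per client id, even when several requests for the same client arrive together.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical per-client cache; C is any closeable consumer wrapper.
final class ConsumerCache<K, C extends AutoCloseable> {
    private final ConcurrentMap<K, C> cache = new ConcurrentHashMap<>();
    private final Function<K, C> factory;

    ConsumerCache(Function<K, C> factory) {
        this.factory = factory;
    }

    // Returns the cached consumer for the client, creating it on first use.
    C get(K clientId) {
        return cache.computeIfAbsent(clientId, factory);
    }

    // Removes the client's consumer and closes it so its resources are released.
    void evict(K clientId) {
        C removed = cache.remove(clientId);
        if (removed == null) {
            return;
        }
        try {
            removed.close();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to close consumer for " + clientId, e);
        }
    }

    int size() {
        return cache.size();
    }
}
```

            Evicting clients that have gone away matters as much as caching: anything left in the map keeps its underlying resources open.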

             

            Also, I am using receiveNoWait() while receiving messages so it should not be blocking.  It's unclear how I could even use MessageListeners inside a servlet unless I am missing something.

             

            Here is the prior thread which discusses what I'm trying to accomplish:  http://community.jboss.org/thread/153332?tstart=0

             

            I'm still wondering why HornetQ would be consuming system resources and not releasing them when using cached consumers with the code I posted.

            • 3. Re: System resources with cached consumers.
              clebert.suconic

              Sure, it is defined through config files. But are you doing the lookup every time you create a connection? If so, you would have an instance of the factory for each consumer you have.

               

               

              I wouldn't use the MessageListener inside the servlet. I believe there's some way to send the servlet's response asynchronously.

               

              I have talked to someone and I believe there's a new thing on Servlets for this.

               

               

              If you could do something like this:

               

              onMessage(Message msg)
              {
                   AsyncHttpResponse response = /* get the response somehow */;
                   response.sendResult(...result...);
              }

               

               

              AsyncHttpResponse is a fictitious API here. I believe there's a way to do it.
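
              The same idea can be sketched with nothing but java.util.concurrent, using CompletableFuture as a stand-in for the fictitious AsyncHttpResponse. PendingResponses is a hypothetical class: the request side parks a future per client, and the listener side completes it when a message arrives. In this simplified sketch a message that arrives while no request is waiting is dropped, which a real application would have to handle.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical hand-off between a message listener and waiting requests.
final class PendingResponses<M> {
    private final ConcurrentMap<String, CompletableFuture<M>> pending = new ConcurrentHashMap<>();

    // Request side: returns a future that completes when a message for this client arrives.
    CompletableFuture<M> await(String clientId) {
        return pending.computeIfAbsent(clientId, id -> new CompletableFuture<M>());
    }

    // Listener side: hands the message to the waiting request, if there is one.
    // Returns false when nobody was waiting (the message is dropped in this sketch).
    boolean onMessage(String clientId, M message) {
        CompletableFuture<M> waiter = pending.remove(clientId);
        return waiter != null && waiter.complete(message);
    }
}
```

              The request thread is free as soon as await() returns; whatever writes the HTTP response would be attached to the future as a callback instead of blocking on receive.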

              • 4. Re: System resources with cached consumers.
                clebert.suconic

                "The ConnectionFactory is instantiated via hornetq using file configuration"

                 

                 

                Correction... it's *defined* using the file configuration, and we bind an instance to the JNDI.

                 

                You create a new instance each time you look up the JNDI.

                • 5. Re: System resources with cached consumers.
                  clebert.suconic

                  What I was talking to someone about was Asynchronous Servlets. I guess that doesn't exist yet :-)

                  • 6. Re: System resources with cached consumers.
                    unsavory

                    Sorry, but I'm confused.

                     

                    If you bind a single factory instance to JNDI... and I am looking up that factory instance via JNDI, how could I be creating a new factory instance each time I do a JNDI lookup?

                     

                    And isn't this the way we are supposed to use it?  All the examples shown lookup a ConnectionFactory via JNDI and call connectionFactory.createConnection(), just as I do.

                    • 7. Re: System resources with cached consumers.
                      clebert.suconic
                      "If you bind a single factory instance to JNDI... and I am looking up that factory instance via JNDI, how could I be creating a new factory instance each time I do a JNDI lookup?"

                      That's how JNDI works.. it will deserialize the factory for you.

                       

                       

                      "And isn't this the way we are supposed to use it?  All the examples shown lookup a ConnectionFactory via JNDI and call connectionFactory.createConnection(), just as I do."

                      Do the lookup only once. My understanding is that you are creating several Connections.

                       

                       

                      You in fact only need a single connection and multiple sessions. (one session for each consumer).

                      • 8. Re: System resources with cached consumers.
                        unsavory

                        Wow.  So am I to understand that my entire application can run on a single connection instance?  Even if I have several different queues, topics, producers and consumers all running at the same time from hundreds of concurrent threads?

                        • 9. Re: System resources with cached consumers.
                          clebert.suconic

                          Yes... and I'm not inventing anything here.. read the JMS API javadoc:

                           

                           

                          http://download-llnw.oracle.com/docs/cd/E17477_01/javaee/5/api/javax/jms/Connection.html

                           

                          I'm referring to this part:

                           

                          "Because the creation of a connection involves setting up authentication and communication, a connection is a relatively heavyweight object. Most clients will do all their messaging with a single connection. Other more advanced applications may use several connections. The JMS API does not architect a reason for using multiple connections; however, there may be operational reasons for doing so."

                           

                           

                           

                          You should have multiple sessions though. Each Session should be tied to a different thread. So, each Consumer gets its own Session.

                          • 10. Re: System resources with cached consumers.
                            clebert.suconic
                            "..at the same time from hundreds of concurrent threads..."

                             

                            As long as each thread has its own Session.

                             

                             

                            You may reuse sessions if you like...  as long as you only use the session in a single place at any one point.
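
                            A small sketch of the "one Session per thread" rule, written against plain Java so it stands alone. PerThreadSession is a hypothetical helper; with JMS, the opener passed to it would be expected to call createSession(false, Session.AUTO_ACKNOWLEDGE) on the one shared Connection and wrap the checked JMSException.

```java
import java.util.function.Supplier;

// Hypothetical holder: each thread lazily gets its own session-like object
// from a single shared opener, and never sees another thread's instance.
final class PerThreadSession<S> {
    private final ThreadLocal<S> sessions;

    PerThreadSession(Supplier<S> opener) {
        // withInitial calls the opener the first time each thread asks for a session.
        this.sessions = ThreadLocal.withInitial(opener);
    }

    // Returns the calling thread's session, opening it on first use.
    S current() {
        return sessions.get();
    }
}
```

                            With pooled request threads the sessions live as long as the threads do, so they still need to be closed when the application shuts down.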

                            • 11. Re: System resources with cached consumers.
                              unsavory

                              Thanks Clebert!  This answers my question perfectly.  I've refactored my consumers/producers to all share a common connection.  It's too early to tell whether this actually solves the growing number of open files on our system.

                               

                              For me, the confusing part of the spec is:

                               

                              "Most clients will do all their messaging with a single connection."

                               

                              To me, client would equal producer or consumer.  But I guess in the JMS world a client would equal a single JVM.

                              • 12. Re: System resources with cached consumers.
                                clebert.suconic

                                If you were using JmsXA (the resource adapter / JCA) it would be different. With JCA you can't open more than one session per Connection. JCA would do the caching for you.

                                 

                                 

                                Also, a word of warning, since you're caching consumers: each consumer might have a read-ahead buffer, so a message could arrive at a consumer that you're not currently handling. But if you have your application working, I bet you tested it, discovered this already, and have figured out consumerWindowSize = 0 on the connectionFactory.
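
                                To make the warning concrete, here is a toy model of read-ahead on a queue. It is not HornetQ's implementation; BufferedConsumer, its fields and the in-memory deque standing in for the server are all hypothetical. It only shows how messages can sit in the buffer of a consumer nobody is polling, and how a window of 0 avoids that.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy consumer with a client-side read-ahead buffer of at most `window` messages.
final class BufferedConsumer {
    private final Deque<String> serverQueue;              // shared by all consumers
    private final Deque<String> localBuffer = new ArrayDeque<>();
    private final int window;

    BufferedConsumer(Deque<String> serverQueue, int window) {
        this.serverQueue = serverQueue;
        this.window = window;
    }

    // Models the server pushing messages ahead of time into this consumer's buffer.
    void prefetch() {
        while (localBuffer.size() < window && !serverQueue.isEmpty()) {
            localBuffer.add(serverQueue.poll());
        }
    }

    // Non-blocking receive: with window 0 it asks the server directly, otherwise it
    // tops up the buffer and reads from it. Returns null when nothing is available.
    String receiveNoWait() {
        if (window == 0) {
            return serverQueue.poll();
        }
        prefetch();
        return localBuffer.poll();
    }

    int buffered() {
        return localBuffer.size();
    }
}
```

                                With a window of 2 and three queued messages, an idle consumer that prefetched first holds two of them, and the active consumer only ever sees the third. With a window of 0 nothing is buffered and the active consumer receives everything.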

                                • 13. Re: System resources with cached consumers.
                                  unsavory

                                  Hi Clebert, thanks for the warning.  One question though.  Since I'm using a topic instead of a queue for my cached consumers, this is a non-issue right?  Wouldn't all messages be delivered to each consumer?  My consumers are using a selector to retrieve only the data they are interested in.

                                  • 14. Re: System resources with cached consumers.
                                    clebert.suconic

                                    The messages will be delivered to all the subscriptions (or queues). A message will only leave memory once it has been consumed from all the subscriptions.

                                     

                                    If each consumer has a selector, the message will be delivered only to the subscriptions whose selector it matches. In that case it may not be an issue.

                                     

                                    So, be careful not to build up memory usage on the server.
