4 Replies Latest reply on Jul 29, 2011 5:15 AM by viniciuscarvalho

    Connection design question

    viniciuscarvalho

      Hello there! I've kicked off a Camel component for HornetQ. I would love to have a proper JMS Camel endpoint without bringing the junk (spring-jms) along.

       

      So, one of the things I'm considering is saving resources by caching connections instead of re-creating them every time. I'm planning to have a pool of Connections and reuse them for my consumers, with each consumer having its own session. A connection may have many sessions associated with it, so I'm hoping that synchronizing access to one socket beats opening multiple sockets.

       

      My main concern is when starting the connection:

       

      Let's say:

       

       

      Connection conn = pool.getConnection(); // gets connection ref A
      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
      MessageConsumer consumer = session.createConsumer(myDestination);
      consumer.setMessageListener(myListener);
      conn.start();
      .
      .
      // another thread
      Connection conn = pool.getConnection(); // gets connection ref A, which is already started
      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
      MessageConsumer consumer = session.createConsumer(myDestination);
      // Here is where I'm afraid. I read that if your connection is already started and you create
      // a consumer, you may lose some messages before you register your message listener.
      

       

       

      So basically my question is: can I lose messages with this approach? Any comments on the design would be more than welcome, as I've just started the project. Since this will be a HornetQ-only component, I don't mind using the core client classes instead of the JMS wrappers if I gain something from it.

       

      Best regards

        • 1. Re: Connection design question
          clebert.suconic

          To answer that, I would need to understand how you put it back into the pool.

           

           

          An issue you may have with caching consumers is that they may be buffering messages and not delivering them. It will all depend on how you're doing it.

           

          The best option would be to have a consumer started and delivering to Camel, but I don't understand how that works.

          • 2. Re: Connection design question
            leosbitto

            Vinicius Carvalho wrote:

             

            Hello there! I've kicked off a Camel component for HornetQ. I would love to have a proper JMS Camel endpoint without bringing the junk (spring-jms) along.

             

            So, one of the things I'm considering is saving resources, and for that, caching connections instead of re-creating them every time. I'm planning to have a pool of Connections and reuse them for my consumers, but each consumer would have its own session.

             

            I have used the default Camel JMS endpoints (based on org.springframework.jms.listener.DefaultMessageListenerContainer) successfully. With some effort they even work fine with XA transactions (database updates, etc.). I would not call it junk. Anyway, feel free to rewrite it yourself, but remember that the corner cases (JMS provider not available in various situations, etc.) are not easy. Additionally, I would suggest caching not only the Connection but also the Sessions, each with one MessageConsumer, because MessageConsumers have a pool of messages attached to them, and I think that creating one just to receive a single message would be very inefficient (i.e. you would get low throughput).
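
            The suggestion to keep one long-lived Session and MessageConsumer per destination could be sketched roughly as below. This is only an illustration: `ConsumerCache`, `get`, `evict`, and the `opener` function are hypothetical names, and the handle type is left generic so the sketch runs without a JMS provider. Keep in mind that a JMS Session is a single-threaded context, so a cached handle should not be used by several threads at once.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Caches one long-lived handle (for example a Session plus its MessageConsumer) per destination. */
class ConsumerCache<H> {
    private final Map<String, H> byDestination = new ConcurrentHashMap<>();
    // Hypothetical hook: in a real component this would open a session and a consumer.
    private final Function<String, H> opener;

    ConsumerCache(Function<String, H> opener) {
        this.opener = opener;
    }

    /** Returns the cached handle for the destination, opening it on first use only. */
    H get(String destination) {
        return byDestination.computeIfAbsent(destination, opener);
    }

    /** Removes the handle so the caller can close it, e.g. after a connection failure. */
    H evict(String destination) {
        return byDestination.remove(destination);
    }
}
```

            With this shape, repeated calls to `get("queue:foo")` reuse the same handle, and `evict` hands it back to the caller for closing.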

            • 3. Re: Connection design question
              viniciuscarvalho

              Hi Clebert, thanks for the reply. My connection cache is pretty simple; I decided not to use any external library. All I do is hand out cached connections whenever a new session is requested:

               

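              // Fields not shown in this post: connections (a BlockingQueue<Connection>),
              // connectionCount (an AtomicInteger), POOL_SIZE, logger, and newConnection().
              public Session createSession(boolean transacted, int mode) throws JMSException {
                  Connection c = null;
                  try {
                      // Lazily create the first connection.
                      synchronized (connectionCount) {
                          if (connectionCount.get() == 0) {
                              connections.put(newConnection());
                          }
                      }
                      c = connections.poll();
                      if (c == null) {
                          synchronized (connectionCount) {
                              if (connectionCount.get() < POOL_SIZE) {
                                  c = newConnection();
                              }
                          }
                          if (c == null) {
                              // Wait outside the lock so other threads are not blocked for 30 seconds.
                              logger.trace("No more connections available, waiting 30 seconds for an available connection");
                              c = connections.poll(30000, TimeUnit.MILLISECONDS);
                          }
                      } else {
                          logger.trace("Borrowing connection[" + ((HornetQConnection) c).getUID() + "] from the pool");
                      }
                      if (c == null) {
                          // Fail explicitly instead of hitting a NullPointerException below.
                          throw new JMSException("Timed out waiting for a pooled connection");
                      }
                      return c.createSession(transacted, mode);
                  } catch (InterruptedException e) {
                      Thread.currentThread().interrupt(); // preserve the interrupt status
                      throw new JMSException("Interrupted while waiting for a pooled connection");
                  } finally {
                      if (c != null) {
                          logger.trace("returning connection[" + ((HornetQConnection) c).getUID() + "] to the connection pool");
                          try {
                              connections.put(c);
                          } catch (InterruptedException e) {
                              Thread.currentThread().interrupt();
                          }
                      }
                  }
              }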
              
              

               

              I've decided to use a queue, so the most recently used connection always goes to the end of the pool. One thing I'd like to improve is to keep track of how many sessions are open per connection and force the creation of new connections when that count reaches some threshold (I don't know if I'm overreacting here).
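
              One possible shape for that per-connection session count, as a minimal sketch rather than a finished pool. `CountingPool`, `acquire`, `release`, and the `Supplier` factory are hypothetical names, and the connection type is left generic so the sketch runs without a JMS provider. The threshold here is soft: once the pool has reached its maximum size, the least-loaded connection is reused even if it is already at the threshold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hands out the least-loaded pooled connection, creating new ones up to maxConnections. */
class CountingPool<C> {
    /** Pairs a pooled connection with the number of sessions currently open on it. */
    private static final class Entry<C> {
        final C connection;
        int openSessions;
        Entry(C connection) { this.connection = connection; }
    }

    private final List<Entry<C>> entries = new ArrayList<>();
    private final Supplier<C> factory;   // hypothetical: wraps the real connection creation
    private final int maxConnections;
    private final int sessionThreshold;

    CountingPool(Supplier<C> factory, int maxConnections, int sessionThreshold) {
        if (maxConnections < 1) {
            throw new IllegalArgumentException("maxConnections must be at least 1");
        }
        this.factory = factory;
        this.maxConnections = maxConnections;
        this.sessionThreshold = sessionThreshold;
    }

    /** Returns the connection to open the next session on and counts that session. */
    synchronized C acquire() {
        Entry<C> best = null;
        for (Entry<C> e : entries) {
            if (best == null || e.openSessions < best.openSessions) {
                best = e;
            }
        }
        // Grow the pool when it is empty or every connection has reached the threshold.
        boolean saturated = best == null || best.openSessions >= sessionThreshold;
        if (saturated && entries.size() < maxConnections) {
            best = new Entry<>(factory.get());
            entries.add(best);
        }
        best.openSessions++;
        return best.connection;
    }

    /** Call when a session that was opened on the given connection has been closed. */
    synchronized void release(C connection) {
        for (Entry<C> e : entries) {
            if (e.connection == connection && e.openSessions > 0) {
                e.openSessions--;
                return;
            }
        }
    }

    synchronized int size() {
        return entries.size();
    }
}
```

              The caller would open the session on the connection returned by `acquire()` and call `release()` when that session is closed, so the counts stay accurate.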

               

              I've also decided not to cache consumers (at least not until I have a reason to), so in Camel each endpoint creates a consumer or a producer, and whenever I have uri=hornet:queue:foo, a new consumer for that queue is created. I might be wrong, but I don't see a common scenario where you would have many consumers for the same endpoint. Well, again, I'm just starting, so it may be a good improvement later.

               

              Thanks for the help

              • 4. Re: Connection design question
                viniciuscarvalho

                Hi Leos, I'm "caching" consumers in the sense that I do not recreate them for every message. I want to cache the sessions as well, but I will leave that for version 0.2, once I get the whole component working with only connection pooling.

                I love Spring, but the whole spring-jms approach is a bad, bad pattern to me. Producers are OK, but the consumers... It's killing our performance here, and that's why we decided to move to another approach. I know I'll be damned when I try to work with transactions, but I'm trying to keep my mind relaxed for now, until I get to the point of actually synchronizing XA and Camel transactions.

                 

                Thanks for your comments