System resources with cached consumers.
unsavory Jun 30, 2010 3:18 PM
I am using HornetQ and caching a large number (200-300) of consumers on a topic that uses a local Netty NIO connection factory.
Every time a web request comes in (potentially hundreds per second), we retrieve the cached consumer for that client and consume all the messages in the topic for the client.
In the beginning the server would only stay up for about an hour before failing with what looked like a race condition, which was later diagnosed as the Linux user running out of open file handles. The default limit of 1024 was being exhausted within the first hour.
But the question is: why is HornetQ opening so many files and seemingly not releasing them? The number of open files keeps growing, with no end in sight, while this queue is in use. Could someone please confirm that I am using these consumers properly?
Code Below:
public abstract class AbstractConsumer { private final Logger log = Logger.getLogger(AbstractConsumer.class); protected Destination destination; protected String instanceId = java.util.UUID.randomUUID().toString(); protected Connection connection; protected Session session; protected MessageConsumer consumer; protected boolean autoAcknowledge; public AbstractConsumer() {} public AbstractConsumer(ConnectionFactory connectionFactory, Destination destination, String selector) { this.destination = destination; try { log.info("MESSAGECONSUMER initializing: " + this); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = selector == null ? session.createConsumer(destination) : session.createConsumer(destination, selector); log.info("MESSAGECONSUMER created: " + this); } catch (JMSException e) { log.fatal("Fatal exception caught attempting to initialize MessageProducer: " + this, e); } } public AbstractConsumer(ConnectionFactory connectionFactory, Destination destination) { this(connectionFactory, destination, null); } @Override public String toString() { return new ToStringCreator(this) .append("Destination", destination) .append("Instance", instanceId) .toString(); } public synchronized Message consume(Long waitPeriodMillis) { log.debug("MESSAGECONSUMER consume started: " + this); Message message = null; try { connection.start(); message = waitPeriodMillis == null ? 
consumer.receiveNoWait() : consumer.receive(waitPeriodMillis); if (message == null) { log.debug("MESSAGECONSUMER no message to consume: " + this); } else { log.debug("MESSAGECONSUMER found a message to consume: " + this); try { message = onMessage(message); } catch (Exception e) { log.error("Error caught in AbstractMessageConsumer attempting to process message: " + this, e); } } connection.stop(); } catch (JMSException e) { log.fatal("Exception caught in MessageConsumer while attempting to consume: " + this, e); } log.debug("MESSAGECONSUMER consume finished: " + this); return message; } public synchronized Message consume() { return consume(null); } public synchronized List<Message> consumeAll(Long waitPeriodMillis) { log.debug("MESSAGECONSUMER consume all started: " + this); List<Message> messages = new ArrayList<Message>(); try { connection.start(); while(true) { Message message = waitPeriodMillis == null ? consumer.receiveNoWait() : consumer.receive(waitPeriodMillis); if (message == null) { log.debug("MESSAGECONSUMER no more messages to consume: " + this); break; } log.debug("MESSAGECONSUMER found a message to consume: " + this); try { message = onMessage(message); } catch (Exception e) { log.error("Error caught in AbstractMessageConsumer attempting to process message: " + this, e); } messages.add(message); } connection.stop(); } catch (JMSException e) { log.fatal("Fatal Exception caught in MessageConsumer while attempting to consume: " + this, e); } log.debug("MESSAGECONSUMER consume all finished: " + this); return messages; } public synchronized List<Message> consumeAll() { return consumeAll(null); } protected abstract Message onMessage(Message message); @PreDestroy public synchronized void destroy() { log.info("Destroy called on MessageConsumer: " + this); try { if (connection != null) { log.info("Closing ClientConnection: " + this); connection.close(); log.info("ClientConnection closed: " + this); } } catch (JMSException e) { log.warn("Exception caught 
attempting to shut down MessageConsumer: " + this, e); } log.info("MessageConsumer destroyed: " + this); } }