Infinite Thread generation in JBossMQ client?
geir1 Mar 25, 2004 7:39 AMIt seems like the thread pool that the JBoss MQ is using is generating an infinite number of threads. We are seeing this on processes running over an extended time period with connects and disconnects. We are experiencing that a single process will acquire up to 500 threads in 20 hours, before our OS is stopping the process and the application fails with an OutOfMemoryError. The application has about 30 "working" threads.
I have made some sample code to demonstrate the effect of reconnecting to a queue. This code never has more than 4 "working" threads.
Is there a way of configuring a maximum size of the JbossMQ thread pool? Is there any obvious ways to stop the thread pool growing? Is it possible to turn off the Thread pooling? The optimizing effect of the pool is removed by the fact that the app now runs with too many threads.
Geir H. Pettersen
Some observations:
1. The threadpool grows to about 34 if the sleep value is set to 2000; if it is set to 1000 it stabilizes on 65. If set lower it will grow more. My guess is that this differs between different systems.
2. All the "idle" threads are doing the same thing:
"Thread-5" daemon prio=5 tid=0x186bbf80 nid=0x8214 in Object.wait() [1936f000..1936fd88] at java.lang.Object.wait(Native Method) - waiting on <0x1003df90> (a EDU.oswego.cs.dl.util.concurrent.LinkedNode) at EDU.oswego.cs.dl.util.concurrent.SynchronousChannel.poll(SynchronousChannel.java:353) - locked <0x1003df90> (a EDU.oswego.cs.dl.util.concurrent.LinkedNode) at EDU.oswego.cs.dl.util.concurrent.PooledExecutor.getTask(PooledExecutor.java:707) at EDU.oswego.cs.dl.util.concurrent.PooledExecutor$Worker.run(PooledExecutor.java:731) at java.lang.Thread.run(Thread.java:534)
Example code:
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.naming.InitialContext; import javax.naming.NamingException; /** * Class for testing the possible infinite Thread generation in * JBOSS-JMS client. * * @author Geir H. Pettersen * @version $Revision$ */ public class JMSClientThreads { private QueueConnectionFactory connectionFactory; private Queue queue; private int threadCount; //----------------------------------------------------------------------------- public JMSClientThreads() throws NamingException, JMSException { init(); } //----------------------------------------------------------------------------- public void connect() throws JMSException { QueueConnection qConn; qConn = connectionFactory.createQueueConnection(); QueueSession qSession = qConn.createQueueSession(false,QueueSession.CLIENT_ACKNOWLEDGE); QueueReceiver qReceiver = qSession.createReceiver(queue); qReceiver.setMessageListener(new MessageListener() { public void onMessage(Message message) { System.out.println("Recieved message"); try { message.acknowledge(); } catch (JMSException e) { System.out.println("Acknowledge failed"); } } }); qConn.start(); try { Thread.sleep(100); } catch (InterruptedException e) { } qConn.close(); qConn = null; } //----------------------------------------------------------------------------- public void init() throws NamingException, JMSException { System.setProperty("java.naming.factory.initial","org.jnp.interfaces.NamingContextFactory"); System.setProperty("java.naming.provider.url","localhost:1199"); System.setProperty("java.naming.factory.url.pkgs","org.jboss.naming:org.jnp.interfaces"); InitialContext ic = new InitialContext(); connectionFactory = (QueueConnectionFactory) ic.lookup("UILConnectionFactory"); queue = (Queue) ic.lookup("queue/toExtQ"); ic.close(); } //----------------------------------------------------------------------------- private void debugThreads() { ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); StringBuffer threadNames = new StringBuffer(128); int threadCount = threadGroup.activeCount(); System.out.println("---"); Thread[] threads = new Thread[threadCount]; threadCount = threadGroup.enumerate(threads, true); System.out.println("Active threadcount: " + threadCount); for (int j = 0; j < threadCount; j++) { String threadName = threads[j].getName(); threadNames.append(threadName); if (j+1 < threadCount) { threadNames.append(','); } } System.out.println("Active threads: " + threadNames); System.out.println("---"); this.threadCount = threadCount; } //----------------------------------------------------------------------------- public void run() { while (true) { debugThreads(); try { connect(); } catch (JMSException e) { e.printStackTrace(); } } } //----------------------------------------------------------------------------- public static void main(String[] args) throws NamingException, JMSException { JMSClientThreads t = new JMSClientThreads(); t.run(); } }