receiveNoWait doesn't work if parent is a daemon
mainbrain Jul 26, 2011 11:21 AM Hello,
I got weird timeouts and IllegalThreadStateExceptions when using another open source project in conjunction with HornetQ. A lengthy debugging session revealed an interesting problem.
If the parent ThreadGroup is a daemon thread group, then MessageConsumer.receiveNoWait() stops working when there are longer periods of time between calls to the receive method. A precondition is that System.getSecurityManager() == null.
The first call to receiveNoWait causes the HornetQThreadFactory class to create a ThreadGroup, which inherits the daemon flag from its parent. Then, once HornetQThreadFactory has processed the command/Runnable, the ThreadGroup is destroyed (destroy() is called on it) after a while. Unfortunately, subsequent calls to receiveNoWait use the same HornetQThreadFactory instance, which still uses the destroyed ThreadGroup, and that in turn causes an IllegalThreadStateException.
I managed to reproduce the problem in a unit test. Just make sure there is a queue named "queue/test" and no security manager.
@Test public void communicationShouldBePossibleAfterLongerPeriodsOfSilence() throws Exception { // setting the flag directly causes the same problem as creating a new ThreadGroup which is a daemon Thread.currentThread().getThreadGroup().setDaemon(true); final long DAEMON_THREAD_DESTROY_TIMEOUT = (long) (1000.0 * 60.0 * 1.3); final String QUEUE_NAME = "queue/test"; final String TEST_DATA = "test"; // empty queue while (getJMSMessage(JBOSS_HOST, JBOSS_PORT, QUEUE_NAME) != null) { } putJmsMessage(JBOSS_HOST, JBOSS_PORT, QUEUE_NAME, TEST_DATA); putJmsMessage(JBOSS_HOST, JBOSS_PORT, QUEUE_NAME, TEST_DATA); Thread.sleep(1000); String msg = getJMSMessage(JBOSS_HOST, JBOSS_PORT, QUEUE_NAME); assertEquals(TEST_DATA, msg); Thread.sleep(DAEMON_THREAD_DESTROY_TIMEOUT); msg = getJMSMessage(JBOSS_HOST, JBOSS_PORT, QUEUE_NAME); assertEquals(TEST_DATA, msg); } static void putJmsMessage(String host, String port, String queue, String msg) throws Exception { MessageProducer producer = null; Connection conn = null; Session session = null; try { session = null; conn = null; Hashtable<String, String> env = null; InitialContext iniCtx = null; ConnectionFactory connFactory = null; env = new Hashtable<String, String>(); env.put(Context.PROVIDER_URL, host + ":" + port); env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); // env.put("jnp.socket.Factory", "org.jnp.interfaces.TimedSocketFactory"); try { iniCtx = new InitialContext(env); } catch (NamingException e1) { throw new Exception("Bad initial context. Maybe host and port are wrong.", e1); } Destination msgDestination; // ................................lookup............................. try { Object o = iniCtx.lookup("ConnectionFactory"); connFactory = (ConnectionFactory) o; msgDestination = (Destination) iniCtx.lookup(queue); } catch (Exception e) { throw new Exception( "Error while performing lookup. 
Either JMS server is offline or lookup string(queue/topic) is not registered or misspelled.", e); } // ................................connect............................. if (conn == null) { conn = connFactory.createConnection(); } if (session == null) { session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); } session.run(); conn.start(); producer = session.createProducer(msgDestination); ObjectMessage jmsMessage = session.createObjectMessage(); jmsMessage.setObject(msg); producer.send(jmsMessage); } finally { if (producer != null) producer.close(); if (session != null) session.close(); if (conn != null) conn.close(); } } static String getJMSMessage(String host, String port, String queue) throws Exception { String msg = null; MessageConsumer receiver = null; Connection conn = null; Session session = null; try { session = null; conn = null; Hashtable<String, String> env = null; InitialContext iniCtx = null; ConnectionFactory connFactory = null; env = new Hashtable<String, String>(); env.put(Context.PROVIDER_URL, host + ":" + port); env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); // env.put("jnp.socket.Factory", "org.jnp.interfaces.TimedSocketFactory"); try { iniCtx = new InitialContext(env); } catch (NamingException e1) { throw new Exception("Bad initial context. Maybe host and port are wrong.", e1); } Destination msgDestination; // ................................lookup............................. try { Object o = iniCtx.lookup("ConnectionFactory"); connFactory = (ConnectionFactory) o; msgDestination = (Destination) iniCtx.lookup(queue); } catch (Exception e) { throw new Exception( "Error while performing lookup. Either JMS server is offline or lookup string(queue/topic) is not registered or misspelled.", e); } // ................................connect............................. 
conn = connFactory.createConnection(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); session.run(); conn.start(); receiver = session.createConsumer(msgDestination); ObjectMessage oMsg = (ObjectMessage) receiver.receiveNoWait(); if (oMsg == null) return null; msg = (String) oMsg.getObject(); connFactory = null; } finally { if (receiver != null) { receiver.close(); receiver = null; } if (session != null) { session.close(); session = null; } if (conn != null) { conn.close(); conn = null; } } return msg; }
Regards,
Yelve