ExceptionListener not called when JBoss goes down
natemc Mar 19, 2004 6:34 PM Hello!
I have a process communicating with JBoss via JMS, and I would like it to recover gracefully when JBoss is rebooted. I thought that if I registered an ExceptionListener with my QueueConnection, it would be invoked when JBoss goes down, but I haven't observed that. I've enclosed an example program to demonstrate what I'm trying to do; its output is
Starting JBoss...JBoss is up
Sending message...enqueued...received
Shutting down JBoss...JBoss is down
Starting JBoss...JBoss is up
Sending message...enqueued...
[Then it hangs.] Any pointers as to where I'm going wrong are much appreciated.
Thanks!
Nate
import java.io.*; import java.lang.IllegalStateException; import java.util.*; import javax.jms.*; import javax.naming.*; public final class JBossRebootExperiment2 { public static void main(final String[] args) { Listener listener = null; try { Process jboss = startJBoss(); try { final Queue queue = (Queue) getJBossContext().lookup(QUEUE_NAME); final QueueConnection connection = createQueueConnection(); final QueueSession session = connection.createQueueSession(true, 0); listener = new Listener(session); try { session.createReceiver(queue).setMessageListener(listener); connection.setExceptionListener(listener); connection.start(); sendMessage(listener); try { shutdownJBoss(jboss); } finally { jboss = null; } try { Thread.sleep(5000); } catch (final InterruptedException e) { } jboss = startJBoss(); sendMessage(listener); } finally { safeClose(connection); System.out.println("Messages successfully transmitted: " + listener.messagesReceived); System.out.println("Exceptions received: " + listener.exceptionsReceived); } } finally { if (jboss != null) { shutdownJBoss(jboss); } } } catch (final Exception e) { e.printStackTrace(); } } private static final class Listener implements MessageListener, ExceptionListener { public Listener(final QueueSession session) { this.session = session; } public synchronized void onMessage(final Message message) { try { session.commit(); ++messagesReceived; notifyAll(); } catch (final JMSException e) { e.printStackTrace(); } } public synchronized void onException(final JMSException e) { ++exceptionsReceived; notifyAll(); } public volatile int messagesReceived; public volatile int exceptionsReceived; private final QueueSession session; } private static QueueConnection createQueueConnection() throws JMSException, NamingException { final QueueConnectionFactory qcf = (QueueConnectionFactory) getJBossContext().lookup(CONNECTION_FACTORY_NAME); return qcf.createQueueConnection(); } private static void sendMessage(final Listener listener) throws 
InterruptedException, JMSException, NamingException { System.out.print("Sending message..."); final int oldMessagesReceived = listener.messagesReceived; sendMsg(); System.out.print("enqueued..."); synchronized (listener) { while (oldMessagesReceived == listener.messagesReceived) listener.wait(); } System.out.println("received"); } private static void sendMsg() throws JMSException, NamingException { final Queue queue = (Queue) getJBossContext().lookup(QUEUE_NAME); final QueueConnection connection = createQueueConnection(); try { final QueueSession session = connection.createQueueSession(true, 0); final QueueSender sender = session.createSender(queue); sender.setDeliveryMode(DeliveryMode.PERSISTENT); sender.send(session.createTextMessage("foo")); session.commit(); } finally { safeClose(connection); } } private static void safeClose(final QueueConnection qc) { try { qc.close(); } catch (final JMSException e) { System.out.print("QueueConnection.close failed: "); e.printStackTrace(); } } private static void safeClose(final Reader reader) { try { reader.close(); } catch (final IOException e) { System.out.print("Reader.close failed: "); e.printStackTrace(); } } private static Context getJBossContext() throws NamingException { final Hashtable ht = new Hashtable(); ht.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); ht.put(Context.PROVIDER_URL, "jnp://localhost"); ht.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); return new InitialContext(ht); } private static Process startJBoss() throws IOException { System.out.print("Starting JBoss..."); final Process jboss = Runtime.getRuntime().exec(JBOSS_DIR + "/bin/run.sh"); final BufferedReader reader = new BufferedReader (new InputStreamReader(jboss.getInputStream())); try { String line = reader.readLine(); while (line.indexOf("JBoss") == -1 || line.indexOf("MicroKernel") == -1 || line.indexOf("Started") == -1) { line = reader.readLine(); } System.out.println("JBoss is up"); } 
finally { safeClose(reader); } return jboss; } private static void shutdownJBoss(final Process jboss) throws IOException { System.out.print("Shutting down JBoss..."); try { // Give JBoss 15 seconds to go peacefully Runtime.getRuntime().exec(JBOSS_DIR + "/bin/shutdown.sh -S"); final Interrupter interrupter = new Interrupter(Thread.currentThread(), 15000); jboss.waitFor(); interrupter.cancel(); } catch (final InterruptedException e) { killJBoss(); } System.out.println("JBoss is down"); } private static void killJBoss() throws IOException { for (int i = 0; i < 2; ++i) { final Process psef = Runtime.getRuntime().exec("ps -ef"); final BufferedReader reader = new BufferedReader (new InputStreamReader(psef.getInputStream())); try { String line; while ((line = reader.readLine()) != null) { if (line.indexOf("org.jboss.Main") >= 0) { Runtime.getRuntime().exec("kill -9 " + line.substring(9, 14)); } } } finally { safeClose(reader); } } } private static final class Interrupter implements Runnable { public Interrupter(final Thread thread2interrupt, final long millis2wait) { if (thread2interrupt == null) { throw new NullPointerException("thread2interrupt must be non-null"); } else if (millis2wait < 0) { throw new IllegalArgumentException("millis2wait must be >= 0"); } this.thread2interrupt = thread2interrupt; this.millis2wait = millis2wait; interrupterThread = new Thread(this); interrupterThread.start(); } public void cancel() { interrupterThread.interrupt(); } public void run() { if (Thread.currentThread() != interrupterThread) { throw new IllegalStateException("Wrong thread"); } try { Thread.sleep(millis2wait); thread2interrupt.interrupt(); } catch (final InterruptedException e) { } } private final Thread thread2interrupt; private final Thread interrupterThread; private final long millis2wait; } private static final String QUEUE_NAME = "queue/A"; private static final String CONNECTION_FACTORY_NAME = "ConnectionFactory"; private static final String JBOSS_DIR = "/opt/jboss"; }