ExceptionListener not called when JBoss goes down
natemc Mar 19, 2004 6:34 PM
Hello!
I have a process communicating with JBoss via JMS, and I would like it to recover gracefully when JBoss is rebooted. I thought that if I registered an ExceptionListener with my QueueConnection, it would be invoked when JBoss goes down, but I haven't observed that. I've enclosed an example program to demonstrate what I'm trying to do; its output is
Starting JBoss...JBoss is up
Sending message...enqueued...received
Shutting down JBoss...JBoss is down
Starting JBoss...JBoss is up
Sending message...enqueued...
[Then it hangs.] Any pointers as to where I'm going wrong are much appreciated.
Thanks!
Nate
import java.io.*;
import java.lang.IllegalStateException;
import java.util.*;
import javax.jms.*;
import javax.naming.*;
public final class JBossRebootExperiment2 {
public static void main(final String[] args) {
Listener listener = null;
try {
Process jboss = startJBoss();
try {
final Queue queue = (Queue) getJBossContext().lookup(QUEUE_NAME);
final QueueConnection connection = createQueueConnection();
final QueueSession session = connection.createQueueSession(true, 0);
listener = new Listener(session);
try {
session.createReceiver(queue).setMessageListener(listener);
connection.setExceptionListener(listener);
connection.start();
sendMessage(listener);
try {
shutdownJBoss(jboss);
}
finally {
jboss = null;
}
try {
Thread.sleep(5000);
}
catch (final InterruptedException e) {
}
jboss = startJBoss();
sendMessage(listener);
}
finally {
safeClose(connection);
System.out.println("Messages successfully transmitted: " + listener.messagesReceived);
System.out.println("Exceptions received: " + listener.exceptionsReceived);
}
}
finally {
if (jboss != null) {
shutdownJBoss(jboss);
}
}
}
catch (final Exception e) {
e.printStackTrace();
}
}
private static final class Listener implements MessageListener, ExceptionListener {
public Listener(final QueueSession session) {
this.session = session;
}
public synchronized void onMessage(final Message message) {
try {
session.commit();
++messagesReceived;
notifyAll();
}
catch (final JMSException e) {
e.printStackTrace();
}
}
public synchronized void onException(final JMSException e) {
++exceptionsReceived;
notifyAll();
}
public volatile int messagesReceived;
public volatile int exceptionsReceived;
private final QueueSession session;
}
private static QueueConnection createQueueConnection()
throws JMSException, NamingException {
final QueueConnectionFactory qcf = (QueueConnectionFactory)
getJBossContext().lookup(CONNECTION_FACTORY_NAME);
return qcf.createQueueConnection();
}
private static void sendMessage(final Listener listener)
throws InterruptedException, JMSException, NamingException {
System.out.print("Sending message...");
final int oldMessagesReceived = listener.messagesReceived;
sendMsg();
System.out.print("enqueued...");
synchronized (listener) {
while (oldMessagesReceived == listener.messagesReceived) listener.wait();
}
System.out.println("received");
}
private static void sendMsg() throws JMSException, NamingException {
final Queue queue = (Queue) getJBossContext().lookup(QUEUE_NAME);
final QueueConnection connection = createQueueConnection();
try {
final QueueSession session = connection.createQueueSession(true, 0);
final QueueSender sender = session.createSender(queue);
sender.setDeliveryMode(DeliveryMode.PERSISTENT);
sender.send(session.createTextMessage("foo"));
session.commit();
}
finally {
safeClose(connection);
}
}
private static void safeClose(final QueueConnection qc) {
try {
qc.close();
}
catch (final JMSException e) {
System.out.print("QueueConnection.close failed: ");
e.printStackTrace();
}
}
private static void safeClose(final Reader reader) {
try {
reader.close();
}
catch (final IOException e) {
System.out.print("Reader.close failed: ");
e.printStackTrace();
}
}
private static Context getJBossContext() throws NamingException {
final Hashtable ht = new Hashtable();
ht.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
ht.put(Context.PROVIDER_URL, "jnp://localhost");
ht.put(Context.URL_PKG_PREFIXES,
"org.jboss.naming:org.jnp.interfaces");
return new InitialContext(ht);
}
private static Process startJBoss() throws IOException {
System.out.print("Starting JBoss...");
final Process jboss = Runtime.getRuntime().exec(JBOSS_DIR + "/bin/run.sh");
final BufferedReader reader = new BufferedReader
(new InputStreamReader(jboss.getInputStream()));
try {
String line = reader.readLine();
while (line.indexOf("JBoss") == -1 || line.indexOf("MicroKernel") == -1 ||
line.indexOf("Started") == -1) {
line = reader.readLine();
}
System.out.println("JBoss is up");
}
finally {
safeClose(reader);
}
return jboss;
}
private static void shutdownJBoss(final Process jboss) throws IOException {
System.out.print("Shutting down JBoss...");
try {
// Give JBoss 15 seconds to go peacefully
Runtime.getRuntime().exec(JBOSS_DIR + "/bin/shutdown.sh -S");
final Interrupter interrupter =
new Interrupter(Thread.currentThread(), 15000);
jboss.waitFor();
interrupter.cancel();
}
catch (final InterruptedException e) {
killJBoss();
}
System.out.println("JBoss is down");
}
private static void killJBoss() throws IOException {
for (int i = 0; i < 2; ++i) {
final Process psef = Runtime.getRuntime().exec("ps -ef");
final BufferedReader reader = new BufferedReader
(new InputStreamReader(psef.getInputStream()));
try {
String line;
while ((line = reader.readLine()) != null) {
if (line.indexOf("org.jboss.Main") >= 0) {
Runtime.getRuntime().exec("kill -9 " + line.substring(9, 14));
}
}
}
finally {
safeClose(reader);
}
}
}
private static final class Interrupter implements Runnable {
public Interrupter(final Thread thread2interrupt, final long millis2wait) {
if (thread2interrupt == null) {
throw new NullPointerException("thread2interrupt must be non-null");
}
else if (millis2wait < 0) {
throw new IllegalArgumentException("millis2wait must be >= 0");
}
this.thread2interrupt = thread2interrupt;
this.millis2wait = millis2wait;
interrupterThread = new Thread(this);
interrupterThread.start();
}
public void cancel() {
interrupterThread.interrupt();
}
public void run() {
if (Thread.currentThread() != interrupterThread) {
throw new IllegalStateException("Wrong thread");
}
try {
Thread.sleep(millis2wait);
thread2interrupt.interrupt();
}
catch (final InterruptedException e) {
}
}
private final Thread thread2interrupt;
private final Thread interrupterThread;
private final long millis2wait;
}
private static final String QUEUE_NAME = "queue/A";
private static final String CONNECTION_FACTORY_NAME = "ConnectionFactory";
private static final String JBOSS_DIR = "/opt/jboss";
}