3 Replies Latest reply on Mar 19, 2004 8:43 PM by adrian.brock

    ExceptionListener not called when JBoss goes down

    natemc

      Hello!

      I have a process communicating with JBoss via JMS, and I would like it to recover gracefully when JBoss is rebooted. I thought that if I registered an ExceptionListener with my QueueConnection that it would be invoked when JBoss goes down, but I haven't observed that. I've enclosed an example program to demonstrate what I'm trying to do; its output is

      Starting JBoss...JBoss is up
      Sending message...enqueued...received
      Shutting down JBoss...JBoss is down
      Starting JBoss...JBoss is up
      Sending message...enqueued...

      [Then it hangs.] Any pointers as to where I'm going wrong are much appreciated.

      Thanks!
      Nate


      import java.io.*;
      import java.lang.IllegalStateException;
      import java.util.*;
      import javax.jms.*;
      import javax.naming.*;
      
      public final class JBossRebootExperiment2 {
       public static void main(final String[] args) {
       Listener listener = null;
       try {
       Process jboss = startJBoss();
       try {
       final Queue queue = (Queue) getJBossContext().lookup(QUEUE_NAME);
       final QueueConnection connection = createQueueConnection();
       final QueueSession session = connection.createQueueSession(true, 0);
       listener = new Listener(session);
       try {
       session.createReceiver(queue).setMessageListener(listener);
       connection.setExceptionListener(listener);
       connection.start();
       sendMessage(listener);
      
       try {
       shutdownJBoss(jboss);
       }
       finally {
       jboss = null;
       }
      
       try {
       Thread.sleep(5000);
       }
       catch (final InterruptedException e) {
       }
      
       jboss = startJBoss();
       sendMessage(listener);
       }
       finally {
       safeClose(connection);
       System.out.println("Messages successfully transmitted: " + listener.messagesReceived);
       System.out.println("Exceptions received: " + listener.exceptionsReceived);
       }
       }
       finally {
       if (jboss != null) {
       shutdownJBoss(jboss);
       }
       }
       }
       catch (final Exception e) {
       e.printStackTrace();
       }
       }
      
       private static final class Listener implements MessageListener, ExceptionListener {
       public Listener(final QueueSession session) {
       this.session = session;
       }
      
       public synchronized void onMessage(final Message message) {
       try {
       session.commit();
       ++messagesReceived;
       notifyAll();
       }
       catch (final JMSException e) {
       e.printStackTrace();
       }
       }
      
       public synchronized void onException(final JMSException e) {
       ++exceptionsReceived;
       notifyAll();
       }
      
       public volatile int messagesReceived;
       public volatile int exceptionsReceived;
       private final QueueSession session;
       }
      
       private static QueueConnection createQueueConnection()
       throws JMSException, NamingException {
       final QueueConnectionFactory qcf = (QueueConnectionFactory)
       getJBossContext().lookup(CONNECTION_FACTORY_NAME);
       return qcf.createQueueConnection();
       }
      
       private static void sendMessage(final Listener listener)
       throws InterruptedException, JMSException, NamingException {
       System.out.print("Sending message...");
       final int oldMessagesReceived = listener.messagesReceived;
       sendMsg();
       System.out.print("enqueued...");
       synchronized (listener) {
       while (oldMessagesReceived == listener.messagesReceived) listener.wait();
       }
       System.out.println("received");
       }
      
       private static void sendMsg() throws JMSException, NamingException {
       final Queue queue = (Queue) getJBossContext().lookup(QUEUE_NAME);
       final QueueConnection connection = createQueueConnection();
       try {
       final QueueSession session = connection.createQueueSession(true, 0);
       final QueueSender sender = session.createSender(queue);
       sender.setDeliveryMode(DeliveryMode.PERSISTENT);
       sender.send(session.createTextMessage("foo"));
       session.commit();
       }
       finally {
       safeClose(connection);
       }
       }
      
       private static void safeClose(final QueueConnection qc) {
       try {
       qc.close();
       }
       catch (final JMSException e) {
       System.out.print("QueueConnection.close failed: ");
       e.printStackTrace();
       }
       }
      
       private static void safeClose(final Reader reader) {
       try {
       reader.close();
       }
       catch (final IOException e) {
       System.out.print("Reader.close failed: ");
       e.printStackTrace();
       }
       }
      
       private static Context getJBossContext() throws NamingException {
       final Hashtable ht = new Hashtable();
       ht.put(Context.INITIAL_CONTEXT_FACTORY,
       "org.jnp.interfaces.NamingContextFactory");
       ht.put(Context.PROVIDER_URL, "jnp://localhost");
       ht.put(Context.URL_PKG_PREFIXES,
       "org.jboss.naming:org.jnp.interfaces");
       return new InitialContext(ht);
       }
      
       private static Process startJBoss() throws IOException {
       System.out.print("Starting JBoss...");
       final Process jboss = Runtime.getRuntime().exec(JBOSS_DIR + "/bin/run.sh");
       final BufferedReader reader = new BufferedReader
       (new InputStreamReader(jboss.getInputStream()));
       try {
       String line = reader.readLine();
       while (line.indexOf("JBoss") == -1 || line.indexOf("MicroKernel") == -1 ||
       line.indexOf("Started") == -1) {
       line = reader.readLine();
       }
       System.out.println("JBoss is up");
       }
       finally {
       safeClose(reader);
       }
      
       return jboss;
       }
      
       private static void shutdownJBoss(final Process jboss) throws IOException {
       System.out.print("Shutting down JBoss...");
       try {
       // Give JBoss 15 seconds to go peacefully
       Runtime.getRuntime().exec(JBOSS_DIR + "/bin/shutdown.sh -S");
       final Interrupter interrupter =
       new Interrupter(Thread.currentThread(), 15000);
       jboss.waitFor();
       interrupter.cancel();
       }
       catch (final InterruptedException e) {
       killJBoss();
       }
       System.out.println("JBoss is down");
       }
      
       private static void killJBoss() throws IOException {
       for (int i = 0; i < 2; ++i) {
       final Process psef = Runtime.getRuntime().exec("ps -ef");
       final BufferedReader reader = new BufferedReader
       (new InputStreamReader(psef.getInputStream()));
       try {
       String line;
       while ((line = reader.readLine()) != null) {
       if (line.indexOf("org.jboss.Main") >= 0) {
       Runtime.getRuntime().exec("kill -9 " + line.substring(9, 14));
       }
       }
       }
       finally {
       safeClose(reader);
       }
       }
       }
      
       private static final class Interrupter implements Runnable {
       public Interrupter(final Thread thread2interrupt, final long millis2wait) {
       if (thread2interrupt == null) {
       throw new NullPointerException("thread2interrupt must be non-null");
       }
       else if (millis2wait < 0) {
       throw new IllegalArgumentException("millis2wait must be >= 0");
       }
      
       this.thread2interrupt = thread2interrupt;
       this.millis2wait = millis2wait;
       interrupterThread = new Thread(this);
       interrupterThread.start();
       }
      
       public void cancel() {
       interrupterThread.interrupt();
       }
      
       public void run() {
       if (Thread.currentThread() != interrupterThread) {
       throw new IllegalStateException("Wrong thread");
       }
      
       try {
       Thread.sleep(millis2wait);
       thread2interrupt.interrupt();
       }
       catch (final InterruptedException e) {
       }
       }
      
       private final Thread thread2interrupt;
       private final Thread interrupterThread;
       private final long millis2wait;
       }
      
      
      
       private static final String QUEUE_NAME = "queue/A";
       private static final String CONNECTION_FACTORY_NAME = "ConnectionFactory";
       private static final String JBOSS_DIR = "/opt/jboss";
      }
      


        • 1. Re: ExceptionListener not called when JBoss goes down

          Your connection is broken, so your message listener is never going to receive another
          message.
          You need to reconnect to the server after the onException()
          Try adding e.printStackTrace() to your onException() method so you can
          see what is happening.

          Regards,
          Adrian

          • 2. Re: ExceptionListener not called when JBoss goes down
            natemc

            Oops.

            I added e.printStackTrace in onException. It is never called (well, I only waited about 1/2 hour).

            And often the second sendMessage can't connect to queue/A and I get the following output:

            Starting JBoss...JBoss is up
            Sending message...enqueued...received
            Shutting down JBoss...JBoss is down
            Starting JBoss...JBoss is up
            org.jboss.mq.SpyJMSException: Cannot unsubscribe to this destination; - nested throwable: (java.io.EOFException)
            at org.jboss.mq.Connection.removeConsumer(Connection.java:1208)
            at org.jboss.mq.SpySession.removeConsumer(SpySession.java:733)
            at org.jboss.mq.SpyMessageConsumer.close(SpyMessageConsumer.java:417)
            at org.jboss.mq.SpySession.close(SpySession.java:347)
            at org.jboss.mq.Connection.close(Connection.java:468)
            at JBossRebootExperiment2.safeClose(JBossRebootExperiment2.java:121)
            at JBossRebootExperiment2.main(JBossRebootExperiment2.java:42)
            Caused by: java.io.EOFException
            at java.io.ObjectInputStream$BlockDataInputStream.readByte(ObjectInputStream.java:2606)
            at java.io.ObjectInputStream.readByte(ObjectInputStream.java:845)
            at org.jboss.mq.il.oil.OILServerIL.waitAnswer(OILServerIL.java:601)
            at org.jboss.mq.il.oil.OILServerIL.unsubscribe(OILServerIL.java:494)
            at org.jboss.mq.Connection.removeConsumer(Connection.java:1198)
            ... 6 more
            Sending message...QueueConnection.close failed: Messages successfully transmitted: 1
            Exceptions received: 0
            Shutting down JBoss...JBoss is down
            javax.naming.CommunicationException. Root exception is java.rmi.NoSuchObjectException: no such object in table
            at sun.rmi.transport.StreamRemoteCall.exceptionReceivedFromServer(StreamRemoteCall.java:247)
            at sun.rmi.transport.StreamRemoteCall.executeCall(StreamRemoteCall.java:223)
            at sun.rmi.server.UnicastRef.invoke(UnicastRef.java:133)
            at org.jnp.server.NamingServer_Stub.lookup(Unknown Source)
            at org.jnp.interfaces.NamingContext.lookup(NamingContext.java:492)
            at org.jnp.interfaces.NamingContext.lookup(NamingContext.java:471)
            at javax.naming.InitialContext.lookup(InitialContext.java:347)
            at JBossRebootExperiment2.sendMsg(JBossRebootExperiment2.java:105)
            at JBossRebootExperiment2.sendMessage(JBossRebootExperiment2.java:96)
            at JBossRebootExperiment2.main(JBossRebootExperiment2.java:39)
            Process terminated with exit code 0

            • 3. Re: ExceptionListener not called when JBoss goes down

              It woun't get called if you close the connection.

              at org.jboss.mq.Connection.close(Connection.java:468)
              at JBossRebootExperiment2.safeClose(JBossRebootExperiment2.java:121)
              at JBossRebootExperiment2.main(JBossRebootExperiment2.java:42)

              Regards,
              Adrian