0 Replies Latest reply on Feb 21, 2007 2:39 AM by bander

    Client unable to reconnect to JBoss Messaging 1.0.1.SP4 after restart

    bander

      Hello, we're using JBoss 4.0.4GA with Messaging 1.0.1.SP4.

      I've been testing a basic failure scenario where our application is forced to reconnect to JBoss Messaging after experiencing a JBoss connectivity problem.

      In theory, when a JBoss connectivity problem is detected (via an ExceptionListener that we set on the JMS Connection), we should be able to shut down all our message producers and consumers and recreate them using a new Connection, or at least keep trying to do so until a connection is obtained.
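
      The "keep trying until a connection is obtained" idea can be sketched roughly as below. This is only an illustration, not part of the test case: ReconnectLoop and connectWithRetry are hypothetical names, and a plain Callable stands in for the call to connectionFactory.createConnection() so the sketch compiles without the JMS jars.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of JBoss Messaging or the test case below.
public class ReconnectLoop {

    /**
     * Calls the factory until it succeeds or maxAttempts is reached,
     * sleeping delayMillis between failed attempts. In a JMS client the
     * factory would wrap connectionFactory.createConnection() (assumption).
     */
    public static <T> T connectWithRetry(Callable<T> factory, int maxAttempts, long delayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return factory.call();
            } catch (InterruptedException ex) {
                // Do not retry if the thread was asked to stop.
                Thread.currentThread().interrupt();
                throw ex;
            } catch (Exception ex) {
                last = ex;
                if (attempt < maxAttempts) {
                    // Pause so a server that is still restarting is not hammered.
                    Thread.sleep(delayMillis);
                }
            }
        }
        // Every attempt failed; rethrow the most recent failure.
        throw last;
    }
}
```

      The pause between attempts matters: without it, a failed createConnection() would be retried in a tight loop while the server is still down.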

      What I'm finding is that about half of the time our producers and consumers can successfully reconnect. Frequently, however, they appear to become 'stuck' in the JBoss client when attempting to tidy up after the problem with the Connection is detected. By tidy up I mean we attempt to close our producers/consumers, sessions and finally the connections themselves, before starting over and recreating everything (i.e. the connection, session etc).
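
      One possible client-side workaround for a close() call that never returns is to run the tidy-up on a separate thread and give up after a timeout. The sketch below assumes that abandoning a hung close is acceptable. BoundedClose and closeWithTimeout are hypothetical names, and the Runnable stands in for something like consumer.close() or connection.close(). It does not address the underlying hang; a thread blocked in a socket read typically ignores interruption, so the abandoned thread may linger as a daemon.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of JBoss Messaging or the test case below.
public class BoundedClose {

    /**
     * Runs closeAction on a daemon thread and waits at most timeoutMillis.
     * Returns true if the action completed normally, false if it threw
     * or did not finish in time.
     */
    public static boolean closeWithTimeout(Runnable closeAction, long timeoutMillis)
            throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-close");
            t.setDaemon(true); // a hung close must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(closeAction);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException ex) {
            // Best effort only: a blocked socket read may not react to the interrupt.
            future.cancel(true);
            return false;
        } catch (ExecutionException ex) {
            // The close action itself threw; treat the resource as gone.
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

      With something like this, the recovery loop could move on to creating a new Connection even when the old one cannot be closed cleanly, at the cost of possibly leaking the old socket and thread.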

      I've written a test case to demonstrate this. What the test case does is spawn two threads. One thread is used for dispatching messages. This thread creates its own connection, session and message producer. The other thread is used for message listening. This thread also creates its own connection, session and message consumer (with message listener). The dispatcher sends messages to the same queue the listener has been configured on.

      Before running the test case, configure the queue name to be used.
      Note that the test case has a dependency on log4j and commons logging so you'll need those jars handy.
      Start JBoss.
      When you run the test case, the dispatcher will start sending messages which will be received by the listener.

      You should see the following output being repeated over and over:

      17:39:36,187 INFO @Dispatcher.1 [MultipleServerTest] Dispatcher.1 dispatched message
      17:39:36,187 INFO @Thread-12 [MultipleServerTest] Listener.1 received message
      17:39:37,203 INFO @Dispatcher.1 [MultipleServerTest] Dispatcher.1 dispatched message
      17:39:37,203 INFO @Thread-12 [MultipleServerTest] Listener.1 received message
      


      Stop and restart your JBoss server. What *should* happen each time is that the dispatcher and listener recover and start sending/receiving messages again. What I'm frequently seeing is that the dispatcher, the listener, or sometimes both do not recover. You may need to stop and start the server several times before the behaviour is observed.

      The listener/dispatcher threads seem to be getting stuck here:

      Thread [ListenerManager.1] (Suspended)
       SocketInputStream.socketRead0(FileDescriptor, byte[], int, int, int) line: not available [native method]
       SocketInputStream.read(byte[], int, int) line: 129
       BufferedInputStream.fill() line: 183
       BufferedInputStream.read() line: 201
       DataInputStream(FilterInputStream).read() line: 66
       JBossObjectInputStream.read() line: 193
       SocketClientInvoker(MicroSocketClientInvoker).readVersion(InputStream) line: 902
       SocketClientInvoker(MicroSocketClientInvoker).transport(String, Object, Map, Marshaller, UnMarshaller) line: 552
       SocketClientInvoker(MicroRemoteClientInvoker).invoke(InvocationRequest) line: 122
       Client.invoke(Object, Map, InvokerLocator) line: 1414
       Client.invoke(Object, Map) line: 511
       ClientConsumerDelegate(DelegateSupport).invoke(Invocation) line: 111
       ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
       ConsumerAspect.handleClosing(Invocation) line: 108
       ConsumerAspect25.invoke(Invocation) line: not available
       ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
       ClosedInterceptor.invoke(Invocation) line: 182
       PerInstanceInterceptor.invoke(Invocation) line: 117
       ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
       ExceptionInterceptor.invoke(Invocation) line: 69
       ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
       ClientLogInterceptor.invoke(Invocation) line: 107
       ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
       ClientConsumerDelegate.closing() line: not available
       JBossMessageConsumer.close() line: 96
       MultipleServerTest$ListenerManagerThread.run() line: 190
      


      and here:

      Thread [Dispatcher.1] (Suspended)
       SocketInputStream.socketRead0(FileDescriptor, byte[], int, int, int) line: not available [native method]
       SocketInputStream.read(byte[], int, int) line: 129
       BufferedInputStream.fill() line: 183
       BufferedInputStream.read() line: 201
       DataInputStream(FilterInputStream).read() line: 66
       JBossObjectInputStream.read() line: 193
       SocketClientInvoker(MicroSocketClientInvoker).readVersion(InputStream) line: 902
       SocketClientInvoker(MicroSocketClientInvoker).transport(String, Object, Map, Marshaller, UnMarshaller) line: 552
       SocketClientInvoker(MicroRemoteClientInvoker).invoke(InvocationRequest) line: 122
       Client.invoke(Object, Map, InvokerLocator) line: 1414
       Client.invoke(Object, Map) line: 511
       ClientConnectionDelegate(DelegateSupport).invoke(Invocation) line: 111
       ClientConnectionDelegate$closing_4945873952494833124.invokeNext() line: not available
       ClosedInterceptor.invoke(Invocation) line: 182
       PerInstanceInterceptor.invoke(Invocation) line: 117
       ClientConnectionDelegate$closing_4945873952494833124.invokeNext() line: not available
       ExceptionInterceptor.invoke(Invocation) line: 69
       ClientConnectionDelegate$closing_4945873952494833124.invokeNext() line: not available
       ClientLogInterceptor.invoke(Invocation) line: 107
       ClientConnectionDelegate$closing_4945873952494833124.invokeNext() line: not available
       ClientConnectionDelegate.closing() line: not available
       JBossConnection.close() line: 131
       MultipleServerTest$DispatcherThread.run() line: 99
      


      I also see errors in the JBoss log like:

      jvm 1 | 14:58:07,906 ERROR [ServerThread] SocketServerInvoker[0.0.0.0:4489].invoke() call failed: Object with oid: -2147483646 was not found in the Dispatcher
      jvm 1 | 14:58:07,906 ERROR [ServerThread] SocketServerInvoker[0.0.0.0:4489].invoke() call failed: Object with oid: -2147483642 was not found in the Dispatcher
      jvm 1 | 14:58:07,921 ERROR [STDERR] java.lang.ClassCircularityError: org/jboss/messaging/core/plugin/IdBlock
      jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.jms.server.remoting.JMSWireFormat.write(JMSWireFormat.java:342)
      jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.versionedWrite(ServerThread.java:778)
      jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.processInvocation(ServerThread.java:585)
      jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.dorun(ServerThread.java:363)
      jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.run(ServerThread.java:159)
      jvm 1 | 14:58:07,921 ERROR [STDERR] java.lang.ClassCircularityError: org/jboss/messaging/core/plugin/IdBlock
      jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.jms.server.remoting.JMSWireFormat.write(JMSWireFormat.java:342)
      jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.versionedWrite(ServerThread.java:778)
      jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.processInvocation(ServerThread.java:585)
      jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.dorun(ServerThread.java:363)
      jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.run(ServerThread.java:159)
      


      Anyway, here is my test case:

      import java.util.Hashtable;
      
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.ExceptionListener;
      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.MessageConsumer;
      import javax.jms.MessageListener;
      import javax.jms.MessageProducer;
      import javax.jms.Queue;
      import javax.jms.Session;
      import javax.naming.Context;
      import javax.naming.InitialContext;
      
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      
      public class RecoverTest {
      
       class DispatcherThread extends Thread {
       private ConnectionFactory connectionFactory;
      
       private String id;
      
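       private boolean initialised = false;
      
       private Queue queue;
      
       // Set from other threads (ExceptionListener callback, shutdown()),
       // so these flags must be volatile to be visible in run()
       private volatile boolean recycle = false;
      
       private volatile boolean shutdown = false;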
      
       public DispatcherThread(ConnectionFactory connectionFactory,//
       Queue queue, String id) {
       super();
       this.connectionFactory = connectionFactory;
       this.queue = queue;
       this.id = id;
       this.setName(id);
       }
      
       private boolean isRecycle() {
       return recycle;
       }
      
       public void run() {
       Connection connection = null;
       Session session = null;
       MessageProducer producer = null;
       ExceptionListener exceptionListener = null;
      
       while (!shutdown) {
       if (!initialised) {
       try {
       connection = connectionFactory.createConnection();
       exceptionListener = new ExceptionListener() {
       public void onException(JMSException ex) {
       LOG.error("Received connection exception", ex);
       recycle = true;
       }
       };
       connection.setExceptionListener(exceptionListener);
       session = connection.createSession(false,
       Session.AUTO_ACKNOWLEDGE);
       producer = session.createProducer(queue);
       LOG.info(id + " initialised");
       initialised = true;
       } catch (JMSException ex) {
       LOG.error("Caught exception during initialisation", ex);
       recycle = true;
       }
       }
       if (isRecycle()) {
       if (producer != null) {
       try {
       producer.close();
       } catch (Exception ex) {
       LOG.error("Caught exception during producer close",
       ex);
       }
       }
       if (session != null) {
       try {
       session.close();
       } catch (Exception ex) {
       LOG.error("Caught exception during session close",
       ex);
       }
       }
       if (connection != null) {
       try {
       connection.close();
       } catch (Exception ex) {
       LOG.error(
       "Caught exception during connection close",
       ex);
       }
       }
       producer = null;
       session = null;
       connection = null;
       initialised = false;
       recycle = false;
       }
       if (initialised && (!recycle) && (!shutdown)) {
       try {
       Thread.sleep(1000);
       Message message = session
       .createTextMessage("This is a test");
       producer.send(message);
       LOG.info(id + " dispatched message");
       } catch (Exception ex) {
       LOG.error("Caught exception during send", ex);
       recycle = true;
       }
       }
       }
       }
      
       public void shutdown() {
       LOG.info(id + " is shutting down");
       recycle = true;
       shutdown = true;
       }
       }
      
       class ListenerManagerThread extends Thread implements ExceptionListener {
       private ConnectionFactory connectionFactory;
      
       private String id;
      
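       private boolean initialised = false;
      
       private MessageListener messageListener;
      
       private Queue queue;
      
       // Set from other threads (onException callback, shutdown()),
       // so these flags must be volatile to be visible in run()
       private volatile boolean recycle = false;
      
       private volatile boolean shutdown = false;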
      
       public ListenerManagerThread(ConnectionFactory connectionFactory,
       Queue queue, MessageListener messageListener, String id) {
       super();
       this.connectionFactory = connectionFactory;
       this.queue = queue;
       this.messageListener = messageListener;
       this.id = id;
       this.setName(id);
       }
      
       private boolean isRecycle() {
       return recycle;
       }
      
       public void onException(JMSException ex) {
       LOG.error("Received connection exception", ex);
       recycle = true;
       }
      
       public void run() {
       Connection connection = null;
       Session session = null;
       MessageConsumer consumer = null;
      
       while (!shutdown) {
       if (!initialised) {
       try {
       connection = connectionFactory.createConnection();
       connection.setExceptionListener(this);
       session = connection.createSession(false,
       Session.AUTO_ACKNOWLEDGE);
       consumer = session.createConsumer(queue);
       consumer.setMessageListener(messageListener);
       connection.start();
       LOG.info(id + " initialised");
       initialised = true;
       } catch (JMSException ex) {
       LOG.error("Caught exception during initialisation", ex);
       recycle = true;
       }
       }
       if (isRecycle()) {
       if (consumer != null) {
       try {
       consumer.setMessageListener(null);
       consumer.close();
       } catch (Exception ex) {
       LOG.error("Caught exception during consumer close",
       ex);
       }
       }
       if (session != null) {
       try {
       session.close();
       } catch (Exception ex) {
       LOG.error("Caught exception during session close",
       ex);
       }
       }
       if (connection != null) {
       try {
       connection.close();
       } catch (Exception ex) {
       LOG.error(
       "Caught exception during connection close",
       ex);
       }
       }
       consumer = null;
       session = null;
       connection = null;
       initialised = false;
       recycle = false;
       }
       try {
       Thread.sleep(1000);
       } catch (InterruptedException ex) {
       LOG.error("Caught exception during sleep", ex);
       }
       }
       }
      
       public void shutdown() {
       LOG.info(id + " is shutting down");
       recycle = true;
       shutdown = true;
       }
       }
      
       class SimpleListener implements MessageListener {
      
       private String id;
      
       public SimpleListener(String id) {
       super();
       this.id = id;
       }
      
       /**
       * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
       */
       public void onMessage(Message message) {
       LOG.info(id + " received message");
       }
      
       }
      
       private static final Log LOG = LogFactory.getLog(RecoverTest.class);
      
       public static void main(String[] args) {
       RecoverTest test = new RecoverTest();
      
       try {
       test.start();
       } catch (Throwable ex) {
       LOG.error("Caught exception in main", ex);
       }
       }
      
       private void start() throws Exception {
       // Setup connection 1
       Hashtable<String, String> properties1 = new Hashtable<String, String>();
       properties1.put(Context.INITIAL_CONTEXT_FACTORY,
       "org.jnp.interfaces.NamingContextFactory");
       properties1.put(Context.URL_PKG_PREFIXES,
       "org.jboss.naming:org.jnp.interfaces");
       properties1.put(Context.PROVIDER_URL, "jnp://localhost:1099");
       properties1.put(Context.SECURITY_PRINCIPAL, "admin");
       properties1.put(Context.SECURITY_CREDENTIALS, "admin");
      
       ConnectionFactory connectionFactory1 = null;
       Queue queue1 = null;
       Context context1 = null;
      
       context1 = new InitialContext(properties1);
       connectionFactory1 = (ConnectionFactory) context1
       .lookup("ConnectionFactory");
       queue1 = (Queue) context1.lookup("/queue/testQueue");
      
       MessageListener listener1 = new SimpleListener("Listener.1");
       ListenerManagerThread manager1 = new ListenerManagerThread(
       connectionFactory1, queue1, listener1, "ListenerManager.1");
       manager1.start();
      
       DispatcherThread dispatcher1 = new DispatcherThread(connectionFactory1,
       queue1, "Dispatcher.1");
       dispatcher1.start();
      
       Thread.sleep(600000);
      
       manager1.shutdown();
       manager1.join();
       dispatcher1.shutdown();
       dispatcher1.join();
       context1.close();
       }
      }