7 Replies Latest reply on Mar 3, 2007 2:25 PM by Ovidiu Feodorov

    1.2.0.CR1 transparent node failover does not always work

    Ben Anderson Newbie

      I've been giving 1.2.0.CR1 a bit of a spin (JBoss 4.0.4, Win XP) and have found that failover breaks after a couple of iterations of stopping/starting servers.

      I have created two nodes on my local machine according to the clustering guide (using a non-clustered queue); i.e. Node 0 is started via 'run -c messaging-node0' and Node 1 is started via 'run -c messaging-node1'.

      After starting both nodes I start my test case and then selectively stop and restart each node. Failover usually occurs the first time a node is stopped but subsequent stops/starts will eventually put a stop to the message producer and message listener.

      I've attached a test case if anyone wants to have a play with it in their environment. If you're seeing the dispatcher dispatching and the message listener receiving messages, then the test is passing; if either stops and then fails to restart, then something has gone wrong.

      It may be a configuration issue on my side - I'm pretty much using the default installation settings.

      import java.util.Hashtable;
      
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.ExceptionListener;
      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.MessageConsumer;
      import javax.jms.MessageListener;
      import javax.jms.MessageProducer;
      import javax.jms.Queue;
      import javax.jms.Session;
      import javax.naming.Context;
      import javax.naming.InitialContext;
      
      import org.apache.commons.logging.Log;
      import org.apache.commons.logging.LogFactory;
      
      public class ReconnectTest {
      
       /**
        * Thread that sends a text message to the queue about once a second and
        * rebuilds its connection, session and producer whenever a failure is
        * reported, either by a failed send or through the connection's
        * {@link ExceptionListener}.
        */
       class DispatcherThread extends Thread {
           private final ConnectionFactory connectionFactory;

           private final String id;

           /** True while the JMS objects are usable; only touched by run(). */
           private boolean initialised = false;

           private final Queue queue;

           /**
            * Request to tear down the JMS objects. Volatile because it is set
            * from the exception listener callback and from shutdown(), both of
            * which may run on a thread other than this one.
            */
           private volatile boolean recycle = false;

           /** Request to leave the run loop; volatile for the same reason. */
           private volatile boolean shutdown = false;

           /**
            * @param connectionFactory factory used to (re)create the connection
            * @param queue destination the messages are sent to
            * @param id label used as the thread name and in log output
            */
           public DispatcherThread(ConnectionFactory connectionFactory,
                   Queue queue, String id) {
               super();
               this.connectionFactory = connectionFactory;
               this.queue = queue;
               this.id = id;
               this.setName(id);
           }

           private boolean isRecycle() {
               return recycle;
           }

           /**
            * Loops until shutdown is requested: (re)initialises the JMS objects
            * when needed, closes them when a recycle has been requested, and
            * otherwise sends one message per second. The JMS objects are always
            * closed before the method returns.
            */
           public void run() {
               Connection connection = null;
               Session session = null;
               MessageProducer producer = null;
               boolean interrupted = false;

               try {
                   while (!shutdown) {
                       boolean initFailed = false;

                       if (!initialised) {
                           try {
                               connection = connectionFactory.createConnection();
                               connection.setExceptionListener(new ExceptionListener() {
                                   public void onException(JMSException ex) {
                                       LOG.error("Received connection exception", ex);
                                       recycle = true;
                                   }
                               });
                               session = connection.createSession(false,
                                       Session.AUTO_ACKNOWLEDGE);
                               producer = session.createProducer(queue);
                               LOG.info(id + " initialised");
                               initialised = true;
                           } catch (JMSException ex) {
                               LOG.error("Caught exception during initialisation", ex);
                               recycle = true;
                               initFailed = true;
                           }
                       }

                       if (isRecycle()) {
                           JMSHelper.close(producer);
                           JMSHelper.close(session);
                           JMSHelper.close(connection);
                           // Drop the stale references so they are not closed a
                           // second time by a later recycle or by the finally block.
                           producer = null;
                           session = null;
                           connection = null;
                           initialised = false;
                           recycle = false;
                       }

                       try {
                           if (initialised && (!recycle) && (!shutdown)) {
                               Thread.sleep(1000);
                               Message message = session
                                       .createTextMessage("This is a test");
                               producer.send(message);
                               LOG.info(id + " dispatched message");
                           } else if (initFailed && (!shutdown)) {
                               // Back off before the next attempt instead of
                               // retrying a failing initialisation in a tight loop.
                               Thread.sleep(1000);
                           }
                       } catch (InterruptedException ex) {
                           // Treat an interrupt as a request to stop; the flag is
                           // restored once the JMS objects have been closed.
                           LOG.info(id + " interrupted, shutting down");
                           interrupted = true;
                           shutdown = true;
                       } catch (Exception ex) {
                           LOG.error("Caught exception during send", ex);
                           recycle = true;
                       }
                   }
               } finally {
                   // The loop can exit without passing through the recycle branch,
                   // so release whatever is still open here.
                   JMSHelper.close(producer);
                   JMSHelper.close(session);
                   JMSHelper.close(connection);
                   initialised = false;
                   if (interrupted) {
                       Thread.currentThread().interrupt();
                   }
               }
           }

           /**
            * Asks the thread to close its JMS objects and stop. Returns
            * immediately; use join() to wait for the thread to finish.
            */
           public void shutdown() {
               LOG.info(id + " is shutting down");
               recycle = true;
               shutdown = true;
           }
       }
      
       /**
        * Null-safe, best-effort close helpers for JMS objects. A failure while
        * closing is logged and not rethrown.
        *
        * Java passes references by value, so these methods cannot clear the
        * caller's variable; callers that want to forget a closed object must
        * null their own reference.
        */
       static class JMSHelper {
           /** Closes the connection if it is not null, logging any failure. */
           public static void close(Connection connection) {
               if (connection == null) {
                   return;
               }
               try {
                   connection.close();
               } catch (Exception ex) {
                   LOG.error("Caught exception when closing connection", ex);
               }
           }

           /** Closes the consumer if it is not null, logging any failure. */
           public static void close(MessageConsumer consumer) {
               if (consumer == null) {
                   return;
               }
               try {
                   consumer.close();
               } catch (Exception ex) {
                   LOG.error("Caught exception when closing consumer", ex);
               }
           }

           /** Closes the producer if it is not null, logging any failure. */
           public static void close(MessageProducer producer) {
               if (producer == null) {
                   return;
               }
               try {
                   producer.close();
               } catch (Exception ex) {
                   LOG.error("Caught exception when closing producer", ex);
               }
           }

           /** Closes the session if it is not null, logging any failure. */
           public static void close(Session session) {
               if (session == null) {
                   return;
               }
               try {
                   session.close();
               } catch (Exception ex) {
                   LOG.error("Caught exception when closing session", ex);
               }
           }
       }
      
       class ListenerManagerThread extends Thread {
       private ConnectionFactory connectionFactory;
      
       private String id;
      
       private boolean initialised = false;
      
       private MessageListener messageListener;
      
       private Queue queue;
      
       private boolean recycle = false;
      
       private boolean shutdown = false;
      
       public ListenerManagerThread(ConnectionFactory connectionFactory,
       Queue queue, MessageListener messageListener, String id) {
       super();
       this.connectionFactory = connectionFactory;
       this.queue = queue;
       this.messageListener = messageListener;
       this.id = id;
       this.setName(id);
       }
      
       private boolean isRecycle() {
       return recycle;
       }
      
       public void run() {
       Connection connection = null;
       Session session = null;
       MessageConsumer consumer = null;
       ExceptionListener exceptionListener = null;
      
       while (!shutdown) {
       if (!initialised) {
       try {
       connection = connectionFactory.createConnection();
       exceptionListener = new ExceptionListener() {
       public void onException(JMSException ex) {
       LOG.error("Received connection exception", ex);
       recycle = true;
       }
       };
       connection.setExceptionListener(exceptionListener);
       session = connection.createSession(false,
       Session.AUTO_ACKNOWLEDGE);
       consumer = session.createConsumer(queue);
       consumer.setMessageListener(messageListener);
       connection.start();
       LOG.info(id + " initialised");
       initialised = true;
       } catch (JMSException ex) {
       LOG.error("Caught exception during initialisation", ex);
       recycle = true;
       }
       }
       if (isRecycle()) {
       JMSHelper.close(consumer);
       JMSHelper.close(session);
       JMSHelper.close(connection);
       initialised = false;
       recycle = false;
       }
       try {
       Thread.sleep(1000);
       } catch (InterruptedException ex) {
       LOG.error("Caught exception during sleep");
       }
       }
       }
      
       public void shutdown() {
       LOG.info(id + " is shutting down");
       recycle = true;
       shutdown = true;
       }
       }
      
       /**
        * Message listener that only logs the arrival of each message, prefixed
        * with the label it was created with.
        */
       class SimpleListener implements MessageListener {

           /** Label placed in front of every log line written by this listener. */
           private final String listenerId;

           /**
            * @param id label used in the log output
            */
           public SimpleListener(String id) {
               listenerId = id;
           }

           /** Logs that a message was received; the message content is not read. */
           public void onMessage(Message message) {
               final String logLine = listenerId + " received message";
               LOG.info(logLine);
           }
       }
      
       /** Logger used by the nested classes and the methods of this test. */
       private static final Log LOG = LogFactory.getLog(ReconnectTest.class);
      
       /**
        * Entry point: creates the test and runs it, logging anything that
        * escapes from it instead of letting it propagate.
        *
        * @param args command-line arguments; not used
        */
       public static void main(String[] args) {
           final ReconnectTest reconnectTest = new ReconnectTest();

           try {
               reconnectTest.start();
           } catch (Throwable failure) {
               LOG.error("Caught exception in main", failure);
           }
       }
      
       /**
        * Looks up the connection factory and the test queue through JNDI at
        * jnp://localhost:1099, starts one listener manager thread and one
        * dispatcher thread, and then blocks until the calling thread is
        * interrupted.
        *
        * The worker threads are asked to stop and the naming context is closed
        * on every exit path, including a failed lookup or an interrupt.
        *
        * @throws Exception if the JNDI setup or a lookup fails, or if the
        *         calling thread is interrupted while waiting
        */
       private void start() throws Exception {
           // Setup connection 1
           Hashtable properties1 = new Hashtable();
           properties1.put(Context.INITIAL_CONTEXT_FACTORY,
                   "org.jnp.interfaces.NamingContextFactory");
           properties1.put(Context.URL_PKG_PREFIXES,
                   "org.jboss.naming:org.jnp.interfaces");
           properties1.put(Context.PROVIDER_URL, "jnp://localhost:1099");
           properties1.put(Context.SECURITY_PRINCIPAL, "admin");
           properties1.put(Context.SECURITY_CREDENTIALS, "admin");

           Context context1 = new InitialContext(properties1);
           ListenerManagerThread manager1 = null;
           DispatcherThread dispatcher1 = null;

           try {
               ConnectionFactory connectionFactory1 = (ConnectionFactory) context1
                       .lookup("ConnectionFactory");
               Queue queue1 = (Queue) context1.lookup("/queue/testQueue");

               MessageListener listener1 = new SimpleListener("Listener.1");
               manager1 = new ListenerManagerThread(
                       connectionFactory1, queue1, listener1, "ListenerManager.1");
               manager1.start();

               dispatcher1 = new DispatcherThread(connectionFactory1,
                       queue1, "Dispatcher.1");
               dispatcher1.start();

               // Effectively "run until interrupted or killed".
               Thread.sleep(Long.MAX_VALUE);
           } finally {
               try {
                   // Signal both workers before waiting, so an interrupted join
                   // cannot leave the second one running unsignalled.
                   if (manager1 != null) {
                       manager1.shutdown();
                   }
                   if (dispatcher1 != null) {
                       dispatcher1.shutdown();
                   }
                   if (manager1 != null) {
                       manager1.join();
                   }
                   if (dispatcher1 != null) {
                       dispatcher1.join();
                   }
               } finally {
                   context1.close();
               }
           }
       }
      }