1.2.0.CR1 transparent node failover does not always work
bander Feb 23, 2007 1:49 AM
I've been giving 1.2.0.CR1 a bit of a spin (JBoss 4.0.4, Win XP) and have found that failover breaks after a couple of iterations of stopping/starting servers.
I have created two nodes on my local machine according to the clustering guide (using a non-clustered queue). That is, Node 0 is started via 'run -c messaging-node0' and Node 1 is started via 'run -c messaging-node1'.
After starting both nodes I start my test case and then selectively stop and restart each node. Failover usually occurs the first time a node is stopped but subsequent stops/starts will eventually put a stop to the message producer and message listener.
I've attached a test case if anyone wants to have a play with it in their environment. If you're seeing the dispatcher dispatching messages and the message listener receiving them, then the test is passing - if either stops and then fails to restart, then something has gone wrong.
It may be a configuration issue on my side - I'm pretty much using the default installation settings.
import java.util.Hashtable; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class ReconnectTest { class DispatcherThread extends Thread { private ConnectionFactory connectionFactory; private String id; private boolean initialised = false; private Queue queue; private boolean recycle = false; private boolean shutdown = false; public DispatcherThread(ConnectionFactory connectionFactory, Queue queue, String id) { super(); this.connectionFactory = connectionFactory; this.queue = queue; this.id = id; this.setName(id); } private boolean isRecycle() { return recycle; } public void run() { Connection connection = null; Session session = null; MessageProducer producer = null; ExceptionListener exceptionListener = null; while (!shutdown) { if (!initialised) { try { connection = connectionFactory.createConnection(); exceptionListener = new ExceptionListener() { public void onException(JMSException ex) { LOG.error("Received connection exception", ex); recycle = true; } }; connection.setExceptionListener(exceptionListener); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(queue); LOG.info(id + " initialised"); initialised = true; } catch (JMSException ex) { LOG.error("Caught exception during initialisation", ex); recycle = true; } } if (isRecycle()) { JMSHelper.close(producer); JMSHelper.close(session); JMSHelper.close(connection); initialised = false; recycle = false; } if (initialised && (!recycle) && (!shutdown)) { try { Thread.sleep(1000); Message message = session .createTextMessage("This is a test"); 
producer.send(message); LOG.info(id + " dispatched message"); } catch (Exception ex) { LOG.error("Caught exception during send", ex); recycle = true; } } } } public void shutdown() { LOG.info(id + " is shutting down"); recycle = true; shutdown = true; } } static class JMSHelper { public static void close(Connection connection) { if (connection != null) { try { connection.close(); } catch (Exception ex) { LOG.error("Caught exception when closing connection", ex); } connection = null; } } public static void close(MessageConsumer consumer) { if (consumer != null) { try { consumer.close(); } catch (Exception ex) { LOG.error("Caught exception when closing consumer", ex); } } consumer = null; } public static void close(MessageProducer producer) { if (producer != null) { try { producer.close(); } catch (Exception ex) { LOG.error("Caught exception when closing producer", ex); } } producer = null; } public static void close(Session session) { if (session != null) { try { session.close(); } catch (Exception ex) { LOG.error("Caught exception when closing session", ex); } } session = null; } } class ListenerManagerThread extends Thread { private ConnectionFactory connectionFactory; private String id; private boolean initialised = false; private MessageListener messageListener; private Queue queue; private boolean recycle = false; private boolean shutdown = false; public ListenerManagerThread(ConnectionFactory connectionFactory, Queue queue, MessageListener messageListener, String id) { super(); this.connectionFactory = connectionFactory; this.queue = queue; this.messageListener = messageListener; this.id = id; this.setName(id); } private boolean isRecycle() { return recycle; } public void run() { Connection connection = null; Session session = null; MessageConsumer consumer = null; ExceptionListener exceptionListener = null; while (!shutdown) { if (!initialised) { try { connection = connectionFactory.createConnection(); exceptionListener = new ExceptionListener() { public void 
onException(JMSException ex) { LOG.error("Received connection exception", ex); recycle = true; } }; connection.setExceptionListener(exceptionListener); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); consumer = session.createConsumer(queue); consumer.setMessageListener(messageListener); connection.start(); LOG.info(id + " initialised"); initialised = true; } catch (JMSException ex) { LOG.error("Caught exception during initialisation", ex); recycle = true; } } if (isRecycle()) { JMSHelper.close(consumer); JMSHelper.close(session); JMSHelper.close(connection); initialised = false; recycle = false; } try { Thread.sleep(1000); } catch (InterruptedException ex) { LOG.error("Caught exception during sleep"); } } } public void shutdown() { LOG.info(id + " is shutting down"); recycle = true; shutdown = true; } } class SimpleListener implements MessageListener { private String id; public SimpleListener(String id) { super(); this.id = id; } public void onMessage(Message message) { LOG.info(id + " received message"); } } private static final Log LOG = LogFactory.getLog(ReconnectTest.class); public static void main(String[] args) { ReconnectTest test = new ReconnectTest(); try { test.start(); } catch (Throwable ex) { LOG.error("Caught exception in main", ex); } } private void start() throws Exception { // Setup connection 1 Hashtable properties1 = new Hashtable(); properties1.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); properties1.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces"); properties1.put(Context.PROVIDER_URL, "jnp://localhost:1099"); properties1.put(Context.SECURITY_PRINCIPAL, "admin"); properties1.put(Context.SECURITY_CREDENTIALS, "admin"); ConnectionFactory connectionFactory1 = null; Queue queue1 = null; Context context1 = null; context1 = new InitialContext(properties1); connectionFactory1 = (ConnectionFactory) context1 .lookup("ConnectionFactory"); queue1 = (Queue) 
context1.lookup("/queue/testQueue"); MessageListener listener1 = new SimpleListener("Listener.1"); ListenerManagerThread manager1 = new ListenerManagerThread( connectionFactory1, queue1, listener1, "ListenerManager.1"); manager1.start(); DispatcherThread dispatcher1 = new DispatcherThread(connectionFactory1, queue1, "Dispatcher.1"); dispatcher1.start(); Thread.sleep(Long.MAX_VALUE); manager1.shutdown(); manager1.join(); dispatcher1.shutdown(); dispatcher1.join(); context1.close(); } }