Catching Exceptions on a JMS Connection and Reconnecting
The HA version of JBossMQ is based on deploying the JMS server as an HASingleton. This means that when the AS instance hosting the JMS server fails or shuts down, the other nodes in the cluster will be aware of this, and on one of them a new instance of the JMS server will be started.
This server failover process can cause problems for JMS clients, who find that their connections to the JMS server are broken. Clients can deal with this problem by registering for a callback when an exception occurs on the connection, and then doing a fresh lookup of the connection factory and the queue or topic.
I'm inside the application server, how do I install the exception listener?
You don't need to for an MDB, JBoss does it for you.
For anything else, you should be using the JMS resource adapter which also does it for you.
I'm not in the application server, how do I install the exception listener?
Following is a example application that continuous sends messages to a queue, handling any exceptions that occur.
package com.test.hajms.client; import javax.naming.InitialContext; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Connection; import javax.jms.Session; import javax.jms.MessageProducer; import javax.jms.Message; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.DeliveryMode; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; public class FailoverJMSClient { private static final Log log = LogFactory.getLog(FailoverJMSClient.class); public static final int NUM_RETRIES = 3; volatile boolean doSend = true; ConnectionFactory connectionFactory; Destination queue; Connection connection; Session session; MessageProducer producer; public static void main(String[] args) throws Exception { FailoverJMSClient jmsClient = new FailoverJMSClient(); jmsClient.setUpJMS(); jmsClient.sendMessages(); } public boolean setUpJMS() { InitialContext ic; try { ic = new InitialContext(); connectionFactory = (ConnectionFactory)ic.lookup("ConnectionFactory"); queue = (Destination)ic.lookup("queue/FailoverTestQueue"); connection = connectionFactory.createConnection(); try { log.debug("Connection created ..."); // KEY - register for exception callbacks connection.setExceptionListener(new ExceptionListenerImpl()); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); log.debug("Session created ..."); producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); log.debug("Producer created ..."); return true; } catch (Exception e) { // We failed so close the connection try { connection.close(); } catch (JMSException ignored) { // Pointless } // Rethrow the initial problem to where we will log it throw e; } finally { // And close the initial context // We don't want to wait for the garbage collector to close it // otherwise we'll have useless hanging network connections ic.close(); } } catch (Exception e) { log.error("Error setting up JMS", e); return false; } } public void sendMessages() { int cnt = 0; while(doSend) { try { Thread.sleep(100); Message m = session.createObjectMessage(new Integer(cnt++)); producer.send(m); log.trace("message " + cnt + " sent"); } catch(Exception e) { cnt--; log.error(e.getMessage()); } } } private class ExceptionListenerImpl implements ExceptionListener { public void onException(JMSException e) { for(int i = 0; i < NUM_RETRIES; i++) { log.warn("Connection has problems, trying to re-create it, attempt " + (i + 1) + " ..."); try { connection.close(); // unregisters the ExceptionListener } catch(Exception e2) { // I will get an Exception anyway, since the connection to the server is // broken, but close() frees up resources associated with the connection } boolean setupOK = setUpJMS(); if (setupOK) { log.info("Connection re-established"); return; } else { log.warn("Re-creating connection failed, retrying ..."); } } log.error("Cannot re-establish connection, giving up ..."); doSend = false; } } }
Comments