20 Replies. Latest reply on Feb 22, 2007 11:13 PM by bander
      • 15. Re: Connecting to two JBoss messaging servers causes interfe
        davidrh

        Thanks Tim. Will have a look at patching those classes in on Monday and re-testing. Is there any configuration option that will make the exception listeners fire faster in the case of a failed connection?

        • 16. Re: Connecting to two JBoss messaging servers causes interfe
          davidrh

          I have made the changes suggested and patched them into my 1.0.1.GA deployment. The classes that I needed to change were:


          org.jboss.jms.client.container.ConsumerAspect
          org.jboss.jms.client.remoting.CallbackManager
          org.jboss.jms.server.endpoint.ClientDelivery
          org.jboss.jms.server.endpoint.ServerConsumerEndpoint


          I have re-tested with my original test case and it seems to work. However, if I restore the code that closes the server1 connection on an exception, server2 logs the warning:
          15:47:08,828 WARN [SimpleConnectionManager] A problem has been detected with the connection to remote client 4h39k3q-6gtwyu-eugdls00-1-eugdlu2o-9. It is possible the client has exited without closing its connection(s) or there is a network problem. All connection resources corresponding to that client process will now be removed.
          

          and my server2 producer is unable to send a message, so there is still some sort of interaction occurring. Attempting to close the connection on an error does seem to be the reasonable thing to do, so that the messaging client has a chance to clean up any loose ends.

          If you don't close the session and/or connection, the client application keeps logging the message:
          2006-11-13 15:51:00,515 WARN org.jboss.remoting.LeasePinger - Error sending lease ping to server for client invoker (session id 4h39k3q-524pp9-euge7ah3-1-euge7biu-5.
          

          every 30 seconds or so.
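
          As an illustration of the "close everything on an error" approach described above, here is a minimal sketch of a helper that closes a list of resources in order and keeps going when one of them fails. It is only a sketch under assumptions: the class name QuietCloser and the method closeAll are hypothetical, and it relies on AutoCloseable, which the JMS 1.1 interfaces do not implement (JMS 2.0 added it), so with an older API each object would need to be wrapped, for example as () -> connection.close().

```java
import java.util.ArrayList;
import java.util.List;

public class QuietCloser {

    /**
     * Closes each resource in the order given. A failure on one resource is
     * recorded and does not stop the remaining resources from being closed.
     * Returns the exceptions that were thrown, in the order they occurred.
     */
    public static List<Exception> closeAll(AutoCloseable... resources) {
        List<Exception> failures = new ArrayList<>();
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // never created, nothing to clean up
            }
            try {
                resource.close();
            } catch (Exception ex) {
                failures.add(ex);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        // Stand-ins for a producer, a session that is already dead, and a connection.
        AutoCloseable producer = () -> closed.add("producer");
        AutoCloseable session = () -> {
            throw new IllegalStateException("session already dead");
        };
        AutoCloseable connection = () -> closed.add("connection");

        List<Exception> failures = closeAll(producer, session, null, connection);
        // prints "closed: [producer, connection], failures: 1"
        System.out.println("closed: " + closed + ", failures: " + failures.size());
    }
}
```

          The point of returning the failures rather than throwing is that the caller can log them and still discard every reference, so a dead session cannot prevent the connection from being released.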

          • 17. Re: Connecting to two JBoss messaging servers causes interfe
            bander

            Can one of the developers please tell me if the change discussed here was eventually backported to the 1.x branch? e.g. is it in 1.0.1.SP4?

            It would be nice to have the ability to programmatically dispatch to more than one JBoss Messaging server.

            Thanks,
            Ben

            • 18. Re: Connecting to two JBoss messaging servers causes interfe
              bander

              After some re-testing of this issue I can confirm it is still present.

              I've developed a new test case that creates a message producer and consumer on two separate JBoss Messaging servers. By continually shutting down and restarting each JBoss Messaging server I can eventually cause the shutting down of one server to stop the message listeners on the *other* server.

              This test case also demonstrates the same reconnection issue that I've raised in this post http://www.jboss.org/index.html?module=bb&op=viewtopic&t=102233

              Our JBoss Messaging servers have different ServerPeerIDs and StoreIds as suggested elsewhere.

              To run the test case, start both JBoss Messaging servers then start the test case. A producer will be created on each server and will start dispatching messages to a queue. A message listener on the queue will acknowledge the dispatched message. Stop one of the servers for a while then restart it. The producer and consumer on that server should start up again. Quite often they do not.

              import java.util.Hashtable;
              
              import javax.jms.Connection;
              import javax.jms.ConnectionFactory;
              import javax.jms.ExceptionListener;
              import javax.jms.JMSException;
              import javax.jms.Message;
              import javax.jms.MessageConsumer;
              import javax.jms.MessageListener;
              import javax.jms.MessageProducer;
              import javax.jms.Queue;
              import javax.jms.Session;
              import javax.naming.Context;
              import javax.naming.InitialContext;
              
              import org.apache.commons.logging.Log;
              import org.apache.commons.logging.LogFactory;
              
              public class MultipleServerReconnectTest {
              
                  // Sends one message per second and rebuilds its connection after any failure.
                  class DispatcherThread extends Thread {
                      private ConnectionFactory connectionFactory;

                      private String id;

                      private boolean initialised = false;

                      private Queue queue;

                      // volatile: set by the JMS exception listener thread and by shutdown()
                      private volatile boolean recycle = false;

                      // volatile: set by the thread that calls shutdown()
                      private volatile boolean shutdown = false;

                      public DispatcherThread(ConnectionFactory connectionFactory,
                              Queue queue, String id) {
                          super();
                          this.connectionFactory = connectionFactory;
                          this.queue = queue;
                          this.id = id;
                          this.setName(id);
                      }

                      private boolean isRecycle() {
                          return recycle;
                      }

                      public void run() {
                          Connection connection = null;
                          Session session = null;
                          MessageProducer producer = null;
                          ExceptionListener exceptionListener = null;

                          while (!shutdown) {
                              // (Re)create connection, session and producer
                              if (!initialised) {
                                  try {
                                      connection = connectionFactory.createConnection();
                                      exceptionListener = new ExceptionListener() {
                                          public void onException(JMSException ex) {
                                              LOG.error("Received connection exception", ex);
                                              recycle = true;
                                          }
                                      };
                                      connection.setExceptionListener(exceptionListener);
                                      session = connection.createSession(false,
                                              Session.AUTO_ACKNOWLEDGE);
                                      producer = session.createProducer(queue);
                                      LOG.info(id + " initialised");
                                      initialised = true;
                                  } catch (JMSException ex) {
                                      LOG.error("Caught exception during initialisation", ex);
                                      recycle = true;
                                  }
                              }
                              // Tear everything down so the next iteration starts from scratch
                              if (isRecycle()) {
                                  if (producer != null) {
                                      try {
                                          producer.close();
                                      } catch (Exception ex) {
                                          LOG.error("Caught exception during producer close",
                                                  ex);
                                      }
                                  }
                                  if (session != null) {
                                      try {
                                          session.close();
                                      } catch (Exception ex) {
                                          LOG.error("Caught exception during session close",
                                                  ex);
                                      }
                                  }
                                  if (connection != null) {
                                      try {
                                          connection.close();
                                      } catch (Exception ex) {
                                          LOG.error(
                                                  "Caught exception during connection close",
                                                  ex);
                                      }
                                  }
                                  producer = null;
                                  session = null;
                                  connection = null;
                                  initialised = false;
                                  recycle = false;
                              }
                              // Dispatch one message per second while the connection is healthy
                              if (initialised && (!recycle) && (!shutdown)) {
                                  try {
                                      Thread.sleep(1000);
                                      Message message = session
                                              .createTextMessage("This is a test");
                                      producer.send(message);
                                      LOG.info(id + " dispatched message");
                                  } catch (Exception ex) {
                                      LOG.error("Caught exception during send", ex);
                                      recycle = true;
                                  }
                              }
                          }
                      }

                      public void shutdown() {
                          LOG.info(id + " is shutting down");
                          recycle = true;
                          shutdown = true;
                      }
                  }
              
                  // Keeps a message listener registered and rebuilds its connection after any failure.
                  class ListenerManagerThread extends Thread {
                      private ConnectionFactory connectionFactory;

                      private String id;

                      private boolean initialised = false;

                      private MessageListener messageListener;

                      private Queue queue;

                      // volatile: set by the JMS exception listener thread and by shutdown()
                      private volatile boolean recycle = false;

                      // volatile: set by the thread that calls shutdown()
                      private volatile boolean shutdown = false;

                      public ListenerManagerThread(ConnectionFactory connectionFactory,
                              Queue queue, MessageListener messageListener, String id) {
                          super();
                          this.connectionFactory = connectionFactory;
                          this.queue = queue;
                          this.messageListener = messageListener;
                          this.id = id;
                          this.setName(id);
                      }

                      private boolean isRecycle() {
                          return recycle;
                      }

                      public void run() {
                          Connection connection = null;
                          Session session = null;
                          MessageConsumer consumer = null;
                          ExceptionListener exceptionListener = null;

                          while (!shutdown) {
                              // (Re)create connection, session and consumer
                              if (!initialised) {
                                  try {
                                      connection = connectionFactory.createConnection();
                                      exceptionListener = new ExceptionListener() {
                                          public void onException(JMSException ex) {
                                              LOG.error("Received connection exception", ex);
                                              recycle = true;
                                          }
                                      };
                                      connection.setExceptionListener(exceptionListener);
                                      session = connection.createSession(false,
                                              Session.AUTO_ACKNOWLEDGE);
                                      consumer = session.createConsumer(queue);
                                      consumer.setMessageListener(messageListener);
                                      connection.start();
                                      LOG.info(id + " initialised");
                                      initialised = true;
                                  } catch (JMSException ex) {
                                      LOG.error("Caught exception during initialisation", ex);
                                      recycle = true;
                                  }
                              }
                              // Tear everything down so the next iteration starts from scratch
                              if (isRecycle()) {
                                  if (consumer != null) {
                                      try {
                                          consumer.setMessageListener(null);
                                          consumer.close();
                                      } catch (Exception ex) {
                                          LOG.error("Caught exception during consumer close",
                                                  ex);
                                      }
                                  }
                                  if (session != null) {
                                      try {
                                          session.close();
                                      } catch (Exception ex) {
                                          LOG.error("Caught exception during session close",
                                                  ex);
                                      }
                                  }
                                  if (connection != null) {
                                      try {
                                          connection.close();
                                      } catch (Exception ex) {
                                          LOG.error(
                                                  "Caught exception during connection close",
                                                  ex);
                                      }
                                  }
                                  consumer = null;
                                  session = null;
                                  connection = null;
                                  initialised = false;
                                  recycle = false;
                              }
                              try {
                                  Thread.sleep(1000);
                              } catch (InterruptedException ex) {
                                  LOG.error("Caught exception during sleep");
                              }
                          }
                      }

                      public void shutdown() {
                          LOG.info(id + " is shutting down");
                          recycle = true;
                          shutdown = true;
                      }
                  }
              
                  class SimpleListener implements MessageListener {

                      private String id;

                      public SimpleListener(String id) {
                          super();
                          this.id = id;
                      }

                      /**
                       * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
                       */
                      public void onMessage(Message message) {
                          LOG.info(id + " received message");
                      }

                  }
              
                  private static final Log LOG = LogFactory
                          .getLog(MultipleServerReconnectTest.class);

                  public static void main(String[] args) {
                      MultipleServerReconnectTest test = new MultipleServerReconnectTest();

                      try {
                          test.start();
                      } catch (Throwable ex) {
                          LOG.error("Caught exception in main", ex);
                      }
                  }

                  private void start() throws Exception {
                      /*
                       * If you want to run the following test case under ActiveMQ 3.2.1 then
                       * only the following properties are required:
                       *
                       * properties1.put(Context.INITIAL_CONTEXT_FACTORY,
                       * "org.activemq.jndi.ActiveMQInitialContextFactory");
                       * properties1.put(Context.PROVIDER_URL, "tcp://localhost:61616");
                       * properties2.put(Context.INITIAL_CONTEXT_FACTORY,
                       * "org.activemq.jndi.ActiveMQInitialContextFactory");
                       * properties2.put(Context.PROVIDER_URL, "tcp://localhost:61617");
                       *
                       * For ActiveMQ 4.1.0 the required context factory is
                       * org.apache.activemq.jndi.ActiveMQInitialContextFactory
                       */
                      // Setup connection 1
                      Hashtable properties1 = new Hashtable();
                      properties1.put(Context.INITIAL_CONTEXT_FACTORY,
                              "org.jnp.interfaces.NamingContextFactory");
                      properties1.put(Context.URL_PKG_PREFIXES,
                              "org.jboss.naming:org.jnp.interfaces");
                      properties1.put(Context.PROVIDER_URL, "jnp://localhost:1099");
                      properties1.put(Context.SECURITY_PRINCIPAL, "admin");
                      properties1.put(Context.SECURITY_CREDENTIALS, "admin");

                      // Setup connection 2
                      Hashtable properties2 = new Hashtable();
                      properties2.put(Context.INITIAL_CONTEXT_FACTORY,
                              "org.jnp.interfaces.NamingContextFactory");
                      properties2.put(Context.URL_PKG_PREFIXES,
                              "org.jboss.naming:org.jnp.interfaces");
                      // change the following url to point to your second jboss instance
                      properties2.put(Context.PROVIDER_URL, "jnp://otherhost:1099");
                      properties2.put(Context.SECURITY_PRINCIPAL, "admin");
                      properties2.put(Context.SECURITY_CREDENTIALS, "admin");

                      ConnectionFactory connectionFactory1 = null;
                      Queue queue1 = null;
                      Context context1 = null;

                      context1 = new InitialContext(properties1);
                      connectionFactory1 = (ConnectionFactory) context1
                              .lookup("ConnectionFactory");
                      // Make sure this queue has been configured on your jboss server
                      // (under ActiveMQ use "dynamicQueues/testQueue")
                      queue1 = (Queue) context1.lookup("/queue/tc1_q1");

                      ConnectionFactory connectionFactory2 = null;
                      Queue queue2 = null;
                      Context context2 = null;

                      context2 = new InitialContext(properties2);
                      connectionFactory2 = (ConnectionFactory) context2
                              .lookup("ConnectionFactory");
                      // Make sure this queue has been configured on your jboss server
                      // (under ActiveMQ use "dynamicQueues/testQueue")
                      queue2 = (Queue) context2.lookup("/queue/tc1_q1");

                      MessageListener listener1 = new SimpleListener("Listener.1");
                      ListenerManagerThread manager1 = new ListenerManagerThread(
                              connectionFactory1, queue1, listener1, "ListenerManager.1");
                      manager1.start();

                      DispatcherThread dispatcher1 = new DispatcherThread(connectionFactory1,
                              queue1, "Dispatcher.1");
                      dispatcher1.start();

                      MessageListener listener2 = new SimpleListener("Listener.2");
                      ListenerManagerThread manager2 = new ListenerManagerThread(
                              connectionFactory2, queue2, listener2, "ListenerManager.2");
                      manager2.start();

                      DispatcherThread dispatcher2 = new DispatcherThread(connectionFactory2,
                              queue2, "Dispatcher.2");
                      dispatcher2.start();

                      // 10 minutes
                      Thread.sleep(600000);

                      manager1.shutdown();
                      manager1.join();

                      dispatcher1.shutdown();
                      dispatcher1.join();

                      manager2.shutdown();
                      manager2.join();

                      dispatcher2.shutdown();
                      dispatcher2.join();

                      context1.close();
                      context2.close();
                  }
              }
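
              For readers who want the recycle-and-reconnect pattern from the test case without the JMS dependencies, the following is a reduced sketch of the same loop. It is not part of the original test and makes several assumptions: the class ReconnectLoop, its connector parameter (a stand-in for connectionFactory.createConnection()) and the runDemo helper are all hypothetical names, and the resource is modelled as a plain AutoCloseable. It differs from the test case in two deliberate ways: the flags are volatile because they are written from other threads, and the resource is closed in a finally block so that a shutdown cannot skip the cleanup.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class ReconnectLoop extends Thread {
    // volatile: written by other threads, read by the loop thread
    private volatile boolean recycle = false;
    private volatile boolean shutdown = false;

    // Hypothetical stand-in for connectionFactory.createConnection()
    private final Callable<AutoCloseable> connector;
    private final long retryDelayMillis;
    private final AtomicInteger connects = new AtomicInteger();

    public ReconnectLoop(Callable<AutoCloseable> connector, long retryDelayMillis) {
        this.connector = connector;
        this.retryDelayMillis = retryDelayMillis;
    }

    /** Intended to be called from an exception listener. */
    public void requestRecycle() {
        recycle = true;
    }

    public void shutdown() {
        shutdown = true;
        interrupt(); // wake the loop if it is sleeping
    }

    public int connectCount() {
        return connects.get();
    }

    @Override
    public void run() {
        AutoCloseable resource = null;
        try {
            while (!shutdown) {
                if (resource != null && recycle) {
                    closeQuietly(resource);
                    resource = null;
                }
                if (resource == null) {
                    recycle = false; // failures reported from here on belong to the new attempt
                    try {
                        resource = connector.call();
                        connects.incrementAndGet();
                    } catch (Exception ex) {
                        // connect failed: wait below, then retry
                    }
                }
                try {
                    Thread.sleep(retryDelayMillis);
                } catch (InterruptedException ex) {
                    // shutdown() interrupts the sleep; the loop condition is rechecked
                }
            }
        } finally {
            closeQuietly(resource); // always release on exit, including shutdown
        }
    }

    private static void closeQuietly(AutoCloseable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception ex) {
            // the resource is already broken; nothing more to do
        }
    }

    /** Runs one connect, one forced recycle and a shutdown; returns {connects, closes}. */
    static int[] runDemo() {
        AtomicInteger closes = new AtomicInteger();
        // Each "connection" is a closeable that only counts how often it is closed.
        ReconnectLoop loop = new ReconnectLoop(() -> closes::incrementAndGet, 10);
        loop.start();
        try {
            while (loop.connectCount() < 1) Thread.sleep(5);
            loop.requestRecycle();
            while (loop.connectCount() < 2) Thread.sleep(5);
            loop.shutdown();
            loop.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return new int[] { loop.connectCount(), closes.get() };
    }

    public static void main(String[] args) {
        int[] result = runDemo();
        System.out.println("connects=" + result[0] + ", closes=" + result[1]);
    }
}
```

              Because the loop waits retryDelayMillis after a failed connect, it does not spin while a server is down, which also keeps the logs readable when a server is stopped for a while as described in the test instructions.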
              


              • 19. Re: Connecting to two JBoss messaging servers causes interfe
                timfox

                Ben - can you add a bug report in JIRA?

                BTW 1.2 will be out very soon which will have real transparent failover and clustering.

                1.0 is really only intended as a single server installation - but it's obviously a plus if you can get it to work with more than one server.

                • 20. Re: Connecting to two JBoss messaging servers causes interfe
                  bander