3 Replies Latest reply on Aug 18, 2009 11:28 PM by mayankmit2002

    Receiving Messages from multiple nodes in a Cluster

    mayankmit2002

      In my application, a session bean is called by the client and fires an event on a topic. My remote JMS client listens to that topic and performs some operations depending on the type of event fired.
      The whole solution works fine with a single node in the cluster, but a problem occurs when the cluster has multiple nodes. The scenario is:
      1. There are multiple nodes in the cluster, NOICLT22560 and NOICLT13294, and the partition name is NOICLT22560_PARTITION.
      2. The session bean responsible for firing the event is clustered and fires the events on the topic using JmsXA.

      Because the bean is clustered, a call from the client can be executed on any node, and thus the event can be fired from any node.
      What I want is a client that can bind to the partition name, rather than to a node name, so that it receives events from any node in the cluster.
      I have tried a lot, but my client connects to just one of the nodes and listens for events from that node only.

      Here is my client code:

      
      package com.abc.jbmtest.client;
      
      
      import java.util.Properties;
      
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.ExceptionListener;
      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.MessageConsumer;
      import javax.jms.MessageListener;
      import javax.jms.ObjectMessage;
      import javax.jms.Session;
      import javax.jms.TextMessage;
      import javax.jms.Topic;
      import javax.naming.CommunicationException;
      import javax.naming.Context;
      import javax.naming.InitialContext;
      import javax.naming.NamingException;
      
      
      public class MessageListenerClient
       implements MessageListener, ExceptionListener
      {
      
      
       public MessageListenerClient()
       {
       final Properties props = new Properties();
       props.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
       props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
       props.put(Context.PROVIDER_URL, "NOICLT13274:1100,NOICLT22560:1100");
      // props.put("jnp.partitionname", "NOICLT22560_PARTITION");
      // props.put(Context.PROVIDER_URL, "jnp://NOICLT13274:1099,jnp://NOICLT22560:1099 ");
      
       try
       {
       final Context context = new InitialContext(props);
       ConnectionFactory mTopicConnectionFactory = (ConnectionFactory) context.lookup("ConnectionFactory");
       System.out.println(mTopicConnectionFactory);
       Topic mTopic = (Topic) context.lookup("topic/CMSPublic");
      
      
       Connection mTopicConnection = mTopicConnectionFactory.createConnection();
       Session mTopicSession = mTopicConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
      
       // Create subscriber
       MessageConsumer mSubscriber = mTopicSession.createConsumer(mTopic);
       mSubscriber.setMessageListener(this);
       mTopicConnection.setExceptionListener(this);
      
       mTopicConnection.start();
       }
       catch (final CommunicationException anException)
       {
       System.err.println("Server is not avaliable!!! \n Trying to reconnect the server!! " + anException.getMessage());
       }
       catch (final NamingException anException)
       {
       System.err.println("Server is not avaliable!!! \n Trying to reconnect the server!! " + anException.getMessage());
       // stopJMSEvents();
       // startJMSEvents();
       }
       catch (final JMSException anException)
       {
       System.err.println("Server is not avaliable!!! \n Trying to reconnect the server!! " + anException.getMessage());
      
       }
      
       catch (Exception finalException)
       {
       System.err.println("Exception while starting JMS event listener. You may not be able to receive event, please restart your application");
       finalException.printStackTrace();
       }
      
      
       }
      
      
      
       @Override
       public void onMessage (Message aMessage)
       {
      
       try
       {
       if (aMessage instanceof TextMessage)
       {
       TextMessage message;
       message = (TextMessage) aMessage;
       System.out.println("Message Reciieved ------ " + message.getText());
       }
       else if (aMessage instanceof ObjectMessage)
       {
       ObjectMessage message = (ObjectMessage) aMessage;
       System.out.println("Message Reciieved ------ " + message.getObject());
       }
       else
       {
       System.out.println("Message Reciieved ------ " + aMessage.getClass());
       }
       }
       catch (Exception exception)
       {
       exception.printStackTrace();
       }
      
       }
      
      
       @Override
       public void onException (JMSException anArg0)
       {
       anArg0.printStackTrace();
      
       }
      
      
       public static void main (String[] args) throws Exception
       {
       new Thread(new Runnable()
       {
       /**
       * @see java.lang.Runnable#run()
       */
       @Override
       public void run ()
       {
       new MessageListenerClient();
      
       }
       }).start();
       System.in.read();
       System.out.println("------------Exiting Client Application-------------------------");
       }
      }