2 Replies Latest reply on Mar 28, 2011 9:54 AM by Bob Muller

    Only one subscriber sees a topic message in a clustered topic setup

    Bob Muller Novice

      I have created a straightforward Topic setup on a two-node JBoss 5.1 cluster (using the all server with Messaging defaults). Here's the topic configuration I added in destinations-service.xml on all nodes:

       

         <mbean
            code="org.jboss.jms.server.destination.TopicService"
            name="jboss.messaging.destination:service=Topic,name=PoesysCacheDelete"
            xmbean-dd="xmdesc/Topic-xmbean.xml">
            <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
            <depends>jboss.messaging:service=PostOffice</depends>
            <attribute name="Clustered">true</attribute>
         </mbean>
      

       

      Our web app has a simple pojo cache (a Java Map) that holds data objects queried from a database. Each node has a separate singleton cache. I am using messaging to have the cache on each node remove an object on demand. So the behavior I want is:

       

      1. The user updates an object.
      2. The database transaction happens.
      3. The data access code sends a message to the topic to remove the object from the cache (the payload is the key in the Map).
      4. The registered listeners (MessageListener implementations) in the instance of the web app running on each node receive the published topic message and respond by removing the cached object as requested.

       

      This works fine as long as I have only one node; I'm logging all this and I see the message sent and received and the cache deleted.

       

      However, when I have two nodes, everything stops working. Looking at the logs, it appears that when a node receives the message and acts on it, the other node(s) don't ever see that message. Since I want the action to happen on all nodes, this means the cache operation(s) don't happen on all the nodes. In particular, the sticky-session setup means that the originating/publishing node only sees some of the messages and may or may not remove the object from its own cache, which then causes the next page to display old data.

       

      This looks an awful lot like a race condition between the nodes. My understanding was (and seems to be verified by looking through the topics on this discussion forum) that a clustered topic means that ALL listeners get the message. I can verify that isn't true here--the instant a message is received on one node, the other node(s) don't see it.

       

      I did verify that the mbean is clustered by examining it in the jmx-console for the nodes.

       

      This is the Java code that implements MessageListener:

       

       

      /**
       * A thread-based class that listens for messages about the Poesys/DB cache.
       * 
       * @author Robert J. Muller
       */
      public class CacheMessageListener implements Runnable, MessageListener {
      
        /**
         * Logger for this class
         */
        private static final Logger logger =
          Logger.getLogger(CacheMessageListener.class);
      
        private static final String LISTENER_MSG =
          "com.poesys.db.dao.msg.listener_problem";
        private static final String DELETE_MSG =
          "com.poesys.db.dao.msg.delete_problem";
        private static final String INTERRUPT_MSG =
          "com.poesys.db.dao.msg.interrupted";
      
        /** JMS topic name for the Poesys/DB delete topic */
        public static final String DELETE_TOPIC = "topic/PoesysCacheDelete";
        /** JMS connection factory name */
        public static final String CONNECTION_FACTORY = "ClusteredConnectionFactory";
        /** JMS ObjectMessage property name for cache name property */
        public static final String CACHE_NAME_PROPERTY = "CacheName";
      
        private Connection connection;
        private Session sessionConsumer;
        private MessageConsumer consumer;
      
        /**
         * Runs the message listener.
         */
        public void run() {
          try {
            // Look up the connection factory using JNDI.
            Context initial = new InitialContext();
            ConnectionFactory cf =
              (ConnectionFactory)initial.lookup(CONNECTION_FACTORY);
      
            // Set this object to be a message listener for delete requests.
            Destination deleteTopic = (Destination)initial.lookup(DELETE_TOPIC);
            connection = cf.createConnection();
            sessionConsumer =
              connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            consumer = sessionConsumer.createConsumer(deleteTopic);
            consumer.setMessageListener(this);
            connection.start();
      
            logger.info("Cache message listener started, listening for cache removal requests");
      
            // Sleep indefinitely until interruption.
            while (!Thread.currentThread().isInterrupted()) {
              // Sleeps for 10 seconds
              Thread.sleep(10 * 1000);
            }
          } catch (InterruptedException e) {
            String message = com.poesys.db.Message.getMessage(INTERRUPT_MSG, null);
            logger.info(message);
          } catch (Exception e) {
            String message = com.poesys.db.Message.getMessage(LISTENER_MSG, null);
            logger.error(message, e);
          } finally {
            if (connection != null) {
              try {
                connection.close();
              } catch (JMSException e) {
                String message = com.poesys.db.Message.getMessage(LISTENER_MSG, null);
                logger.error(message, e);
              }
            }
          }
        }
      
        /*
         * (non-Javadoc)
         * 
         * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
         */
        @Override
        public void onMessage(Message message) {
          IPrimaryKey key = null;
          String cacheName = null;
      
          if (message == null) {
            logger.error("Cache message listener received null message");
          } else {
            try {
              logger.debug("Received cache removal request");
              // Get the message and extract the key and the cache name.
              ObjectMessage objectMessage = (ObjectMessage)message;
              if (objectMessage != null) {
                // Message key is the object payload.
                Serializable object = objectMessage.getObject();
                if (object instanceof com.poesys.ms.pk.IPrimaryKey) {
                  com.poesys.ms.pk.IPrimaryKey messageKey = 
                    (com.poesys.ms.pk.IPrimaryKey)objectMessage.getObject();
                  // Translate into database primary key.
                  key = MessageKeyFactory.getKey(messageKey);
                  // Cache name is a property.
                  cacheName = objectMessage.getStringProperty(CACHE_NAME_PROPERTY);
                  IDtoCache<?> cache = DaoManager.getCache(cacheName);
                  // Remove the object from the local cache only if it's there; if
                  // it's not there, move on since there's nothing to do.
                  if (cache != null) {
                    logger.debug("Removing key " + key.getValueList() + " from cache "
                                 + cacheName);
                    cache.removeLocally(key);
                  } else {
                    logger.debug("No cache from which to remove object");
                  }
                } else {
                  logger.error("Cache message listener received message with a payload that was not a primary key");
                }
              }
            } catch (JMSException e) {
              // log full information and ignore
              Object[] objects = { cacheName, key.getValueList() };
              String errorMsg = com.poesys.db.Message.getMessage(DELETE_MSG, objects);
              logger.error(errorMsg, e);
            } catch (RuntimeException e) {
              // log and ignore
              logger.error("Runtime exception in onMessage: ", e);
            }
          }
        }
      }
      

       

      This is the Java code that sends the message:

       

       

        @Override
        public void remove(IPrimaryKey key) {
          // Send a message to listeners asking to remove there. This will remove
          // the object from all listening caches with the cache name of this cache,
          // including THIS one.
          Connection connection = null;
          try {
            Context initial = new InitialContext();
            ConnectionFactory cf =
              (ConnectionFactory)initial.lookup(CacheMessageListener.CONNECTION_FACTORY);
            Destination deleteTopic =
              (Destination)initial.lookup(CacheMessageListener.DELETE_TOPIC);
            connection = cf.createConnection();
            Session session =
              connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(deleteTopic);
            connection.start();
            ObjectMessage om = session.createObjectMessage(key.getMessageObject());
            om.setStringProperty(CacheMessageListener.CACHE_NAME_PROPERTY,
                                 getCacheName());
            producer.send(om);
            logger.debug("Sent message to remove " + key.getValueList()
                         + " from cache " + getCacheName());
          } catch (Exception e) {
            Object[] objects = { getCacheName() };
            String message = com.poesys.db.Message.getMessage(PRODUCER_MSG, objects);
            logger.error(message, e);
          } finally {
            if (connection != null) {
              try {
                connection.close();
              } catch (JMSException e) {
                Object[] objects = { getCacheName() };
                String message =
                  com.poesys.db.Message.getMessage(PRODUCER_MSG, objects);
                logger.error(message, e);
              }
            }
          }
        }
      
        • 1. Only one subscriber sees a topic message in a clustered topic setup
          Anders Welen Apprentice

          Have you tried adding "<clustered>true</clustered" in you Topic definition?

          • 2. Re: Only one subscriber sees a topic message in a clustered topic setup
            Bob Muller Novice

            I'm not sure, looking through the docs, how to specify this. Here's the example from the examples directory:

             

            <mbean code="org.jboss.jms.server.destination.TopicService" name="jboss.messaging.destination:service=Topic,name=testDistributedTopic" xmbean-dd="xmdesc/Topic-xmbean.xml">
                <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
                <depends>jboss.messaging:service=PostOffice</depends>
                <attribute name="Clustered">true</attribute>
            </mbean>
            

             

            This specifies an <attribute> element, which is what I have in my topic destination. I see I did not put the correct topic in the original post, I'll correct that. Here's the actual one I have on both nodes, note the "Clustered" attribute.

             

             

               <mbean
                  code="org.jboss.jms.server.destination.TopicService"
                  name="jboss.messaging.destination:service=Topic,name=PoesysCacheDelete"
                  xmbean-dd="xmdesc/Topic-xmbean.xml">
                  <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
                  <depends>jboss.messaging:service=PostOffice</depends>
                  <attribute name="Clustered">true</attribute>
               </mbean>
            

             

            If this syntax isn't correct, where do you put the attribute? Or is it an element, as you have specified it? I don't see that in the docs at all.