3 Replies Latest reply on Oct 27, 2011 5:42 PM by Abhijith P

    Problem with durable subscribers on clustered JBoss 5.1.

    Maciej Skorupka Newbie

      Hi,

       

      I have split this question into two versions, a shorter one and an in-depth one, to make it clearer.

       

      SHORTER VERSION:

      I have a two-node JBoss 5.1 cluster running the "all" configuration. The cluster acts as a JMS server with persistence on MySQL 5.1. I have a clustered topic defined in destinations-service.xml. I believe I've done everything needed to make this work as intended, but there's a problem in a couple of scenarios.

       

      1) Assume both nodes are running. There's a topic, but no messages and no durable subscribers. Node1 stops (Ctrl+C or kill). Now we start the Java client with receive() (see the Client listing below), so a durable subscriber is created on node2. Then we stop node2 (Ctrl+C or kill) and start node1 afterwards. The durable subscriber does not show up in the admin-console of node1. If I now use the Java client to send messages, the JMS server accepts them, but since there's no durable subscriber they aren't stored. If I run the receive() version, the durable subscriber is created again, but messages sent before that point may be lost.

       

      2) Another scenario is nearly the same as the one above. After node1 comes up, neither receive() nor send() is invoked, and when node2 comes back up, the durable subscriber shows up again.

       

      3) Another scenario is like 1), but in addition to creating the durable subscriber, messages are sent to the topic. They are stored in the database because receive() wasn't invoked. Node2 stops. Node1 comes up and shows no durable subscribers or messages. When node2 comes back up, the messages and subscribers come back.

       

      It seems that this shouldn't work this way. Do you have any thoughts?

       

      Thanks in advance.

       

      IN-DEPTH VERSION:

       

      I'd like to ask what happens to JMS durable subscribers on clustered JBoss 5.1 when both nodes of a two-node cluster (let's call them node1 and node2) are killed or shut down (Ctrl+C).

       

      First of all, we have two machines, each running a JBoss 5.1 server in clustered mode (the "all" configuration). Persistence uses MySQL 5.1. We want these machines to run a JMS server and nothing else. We copied in mysql-persistence-service.xml and deleted the hsqldb-persistence-service.xml that was there. In mysql-persistence-service.xml we set these attributes for PostOffice:

       

           <attribute name="Clustered">true</attribute>

           <attribute name="FailoverOnNodeLeave">true</attribute>

       

      and this for JMSUserManager:

       

           POPULATE.TABLES.14 = INSERT INTO JBM_USER (USER_ID, PASSWD, CLIENTID) VALUES ('prm-user','password', 'prm-user')
           POPULATE.TABLES.15 = INSERT INTO JBM_USER (USER_ID, PASSWD) VALUES ('ecm-user','password')
           POPULATE.TABLES.16 = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('prm-role','prm-user')
           POPULATE.TABLES.17 = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('ecm-role','ecm-user')

       

       

      Moreover, we've copied mysql-ds.xml from SERVER_PATH/docs/examples/jca/ to SERVER_PATH/server/all/deploy/ and deleted hsqldb-ds.xml from the second path. We've changed that file like this:

       

          <jndi-name>DefaultDS</jndi-name>
          <connection-url>jdbc:mysql://192.168.107.51:3306/prm</connection-url>
          <driver-class>com.mysql.jdbc.Driver</driver-class>
          <user-name>prm</user-name>
          <password>password</password>

       

      Next, we created the topic on the cluster by adding this to SERVER_PATH/server/all/deploy/messaging/destinations-service.xml:

       

           <mbean code="org.jboss.jms.server.destination.TopicService" name="jboss.messaging.destination:service=Topic,name=ECM-PRM-Topic" xmbean-dd="xmdesc/Topic-xmbean.xml">
               <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
               <depends>jboss.messaging:service=PostOffice</depends>
               <attribute name="Clustered">true</attribute>
               <attribute name="SecurityConfig">
                   <security>
                       <role name="ecm-role" write="true" />
                       <role name="prm-role" read="true" create="true" />
                   </security>
               </attribute>
           </mbean>

       

      So now we have a clustered topic with roles and privileges. We've also added the MySQL JDBC driver to the server libraries, and we think that's all that is needed for a persistent topic that runs on a cluster and supports durable subscribers. At this point everything is set up and we can run it.

       

      We start both nodes using this command ($node-ip and $node-id are IP address and node ID of each node):

       

      ./run.sh -c all -g PartitionName -b $node-ip -Djboss.messaging.ServerPeerID=$node-id

       

      The nodes see each other, and both are capable of reading messages and accepting durable subscribers. Both get messages, store them in the database, etc. We also have client code that sends messages to and reads messages from that topic. We use J2SE 1.6.16 and the jars from SERVER_PATH/client/. The client receives through a durable subscription. Sorry for the commented-out line; we sometimes use the client for sending and sometimes for receiving. Of course this client isn't doing anything useful. It's just for test purposes.

       

       

      Client

      package test1;

       

      import java.util.Date;
      import java.util.Properties;
      import java.util.logging.Logger;

       

      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.DeliveryMode;
      import javax.jms.Destination;
      import javax.jms.MessageConsumer;
      import javax.jms.MessageProducer;
      import javax.jms.Session;
      import javax.jms.TextMessage;
      import javax.jms.Topic;
      import javax.naming.Context;
      import javax.naming.InitialContext;

       

      public class Main {
         
          public static Logger LOGGER = Logger.getLogger(Main.class.getName());
         
          public static void send(ConnectionFactory cf, Destination target) throws Exception {
              Connection connection = null;
              try {
                  connection = cf.createConnection("ecm-user", "password");
                  Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                  MessageProducer producer = session.createProducer(target);
                  connection.start();
                  LOGGER.info("Sending");
                  TextMessage message = session.createTextMessage("This is a text message " + new Date());
                  // The provider overwrites JMSDeliveryMode on send, so the mode is set on the producer instead.
                  producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                  producer.send(message);
                  LOGGER.info("Sent message: " + message.getText() + "\n" + message.toString());
              }
              finally {
                  if(connection != null) {
                      connection.close();
                  }
              }
          }

       

          public static void receive(ConnectionFactory cf, Destination target) throws Exception {
              Connection connection = null;
              try {
                  connection = cf.createConnection("prm-user", "password");
                  Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                  MessageConsumer messageConsumer = session.createDurableSubscriber((Topic)target, "prm-user");
                  connection.start();
                  LOGGER.info("Receiving");
                  TextMessage messageReceived = (TextMessage)messageConsumer.receive(10000);
                  if(messageReceived == null) {
                      LOGGER.info("Receive timeout");
                  }
                  else {
                      LOGGER.info("Received message: " + messageReceived.getText() + "\n" + messageReceived.toString());
                      messageReceived.acknowledge();
                  }
              }
              finally {
                  if(connection != null) {
                      connection.close();
                  }
              }
          }

       

          public static void main(String[] args) throws Exception {
              InitialContext initialContext = null;
              try {
                  LOGGER.info("LOOKUP");
                 
                  Properties p = new Properties();
                  p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
                  p.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
                  p.put(Context.PROVIDER_URL, "jnp://192.168.107.68:1099,192.168.107.51:1099");

       

                  // Step 1. Create an initial context to perform the JNDI lookup.
                  initialContext = new InitialContext(p);
                  // Step 2. Look up the clustered connection factory.
                  ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ClusteredConnectionFactory");
                  // Step 3. Look up the topic.
                  Destination target = (Destination)initialContext.lookup("/topic/ECM-PRM-Topic");

       

                  LOGGER.info("LOOKUP done");
                             
      //            send(cf, target);
                  receive(cf, target);

       

                  LOGGER.info("DONE");
                 
                  return;
              }
              finally {
                  if(initialContext != null) {
                      initialContext.close();
                  }
              }
          }

       

      }
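
The client above lists both nodes in PROVIDER_URL so that the JNDI lookup can still succeed when one node is down. A minimal sketch of building that environment from a list of nodes follows. The JndiEnv class and its forNodes method are hypothetical helpers, not part of the original client; the property values simply mirror the ones used in main() above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import javax.naming.Context;

// Hypothetical helper (not in the original client): builds the JNDI
// environment used in main() from a list of "host:port" entries.
final class JndiEnv {

    static Properties forNodes(List<String> hostPorts) {
        // Same shape as the original URL: jnp://host1:port,host2:port
        StringBuilder url = new StringBuilder("jnp://");
        for (int i = 0; i < hostPorts.size(); i++) {
            if (i > 0) {
                url.append(',');
            }
            url.append(hostPorts.get(i));
        }
        Properties p = new Properties();
        p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        p.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
        p.put(Context.PROVIDER_URL, url.toString());
        return p;
    }

    public static void main(String[] args) {
        Properties p = forNodes(Arrays.asList("192.168.107.68:1099", "192.168.107.51:1099"));
        System.out.println(p.getProperty(Context.PROVIDER_URL));
    }
}
```

The returned Properties object can be passed to new InitialContext(p) exactly as in the client; this only changes how the URL is assembled, not how the lookup behaves.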

       

       

      So let's go back to the main problem. With send() commented out, the code above calls receive() to read from the topic. This creates a durable subscriber on the server (we can see it in the admin-console on each node). With receive() commented out instead, the send() version sends messages, and they are stored so that receive() can read them later. So the code above seems OK. But there are scenarios that break this.

       

      1) Assume both nodes are running. There's a topic, but no messages and no durable subscribers. Node1 stops (Ctrl+C or kill). Now we start the Java client with receive(), so a durable subscriber is created on node2. Then we stop node2 (Ctrl+C or kill) and start node1 afterwards. The durable subscriber does not show up in the admin-console of node1. If I now use the Java client to send messages, the JMS server accepts them, but since there's no durable subscriber they aren't stored. If I run the receive() version, the durable subscriber is created again, but messages sent before that point may be lost.

       

      2) Another scenario is nearly the same as the one above. After node1 comes up, neither receive() nor send() is invoked, and when node2 comes back up, the durable subscriber shows up again.

       

      3) Another scenario is like 1), but in addition to creating the durable subscriber, messages are sent to the topic. They are stored in the database because receive() wasn't invoked. Node2 stops. Node1 comes up and shows no durable subscribers or messages. When node2 comes back up, the messages and subscribers come back.
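
To measure how many messages are lost in a scenario like 1), one option is to number the messages on the sending side and compare against what the durable subscriber eventually receives. The sketch below shows only the comparison step in plain Java. The LostMessageCheck class is a hypothetical helper, and it assumes the test client carries a sequence number from 1 to N on each message (for example in an int property set with setIntProperty and read with getIntProperty), which the client above does not do yet.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper (not part of the original client): reports which
// sequence numbers were sent but never delivered to the subscriber.
final class LostMessageCheck {

    // Returns the sequence numbers in 1..sentCount that are absent from
    // 'received', in ascending order.
    static List<Integer> missing(int sentCount, Collection<Integer> received) {
        Set<Integer> seen = new HashSet<Integer>(received);
        List<Integer> lost = new ArrayList<Integer>();
        for (int seq = 1; seq <= sentCount; seq++) {
            if (!seen.contains(seq)) {
                lost.add(seq);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        // Example: 5 messages were sent, but the durable subscription was
        // only re-created after the first two had already been sent.
        System.out.println(missing(5, Arrays.asList(3, 4, 5))); // prints [1, 2]
    }
}
```

An empty result would mean every numbered message reached the subscriber; anything else lists exactly the messages that were sent while no durable subscription was visible on the running node.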

       

       

      There are probably more scenarios, but I don't want this thread to be longer than it is already.

       

      Maybe we're doing something the wrong way. Maybe there's more configuration to do. Maybe this is the expected behavior. We don't know, and we'd really appreciate advice on this.

       

      Thanks in advance.