1 2 Previous Next 17 Replies Latest reply on Aug 5, 2008 4:22 AM by timfox

    All messages are not persisted in Database.

    sajankn

      Versions:
      JBoss AS : 4.2.2.GA
      JBM : 1.4.0.SP3
      Java : 1.5.0_13
      OS : Mac OSX Server 10.5.2
      DB : Oracle 10g

      Setup:
      I'm doing the stress test on JBM. The following is the setup:

      AS with JBM - 2 (Both are up and running).
      Publisher - 5 (Each Publishing 1000 messages simultaneously). Runs in an independent JVM.
      Subscriber - 1. Runs in an independent JVM. (Its a Durable Subscriber)

      Scenario:
      I run the Publisher. Total 5000 messages are published by the subscriber. But only 4000 are persisted to Database. So when I run the Subscriber only 400 messages are consumed.

      This happens only during the first run. For the subsequent runs, all the messages are persisted and can be consumed by the subscriber.

      This scenario is repeatable.

      How do I make sure that I do not lose any messages even during the first run.

        • 1. Re: All messages are not persisted in Database.
          ataylor

          Are the messages persistent, if they are they will all appear in the database, if not only paged messages will be persisted.

          Make sure there are no warnings on the server?

          • 2. Re: All messages are not persisted in Database.
            sajankn

            The messages are persisted in the database (Oracle). In the oracle-persistence-service.xml I've turned the UsingBatchUpdates to false.

            <attribute name="UsingBatchUpdates">false</attribute>


            There are no warnings or errors on the server.

            All the 5 publishers are different instance of the same class and they run simultaneously and publish 1000 messages each.

            Except for 1 publisher, all other messages (4000) are persisted to database and is consumed by the subscriber.

            This happens only during the first run of a fresh setup (AS, JBM, DB, Subscriber, Publisher all are running for the first time). In all the subsequent runs no messages are lost.

            Publisher Code:
            String destinationName = "/topic/testTopic";
            InitialContext ic = new InitialContext();
            TopicConnectionFactory cf = (TopicConnectionFactory)ic.lookup("/ClusteredConnectionFactory");
            Topic topic = (Topic)ic.lookup(destinationName);
            TopicConnection connection = cf.createTopicConnection();
            TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            TopicPublisher publisher = session.createPublisher(topic);
            connection.start();
            String msgStr = "Hello World";
            TextMessage tm = session.createTextMessage(msgStr);
            publisher.setTimeToLive(604800000);
            publisher.publish(tm);
            


            Subscriber Code:
            String destinationName = "/topic/testTopic";
            InitialContext ic = new InitialContext();
            TopicConnectionFactory cf = (TopicConnectionFactory)ic.lookup("/ClusteredConnectionFactory");
            Topic topic = (Topic)ic.lookup(destinationName);
            TopicConnection connection = cf.createTopicConnection("admin", "admin");
            connection.setClientID("MyClientID");
            TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            session.createDurableSubscriber(topic, "Alpha");
            TopicSubscriber subscriber = session.createDurableSubscriber(topic, "Alpha");
            connection.start();
            


            In my actual code I've done all the exception handling.

            • 3. Re: All messages are not persisted in Database.
              timfox

              Probably you aren't creating your durable subscriber until after some of the messages have been sent, so clearly those messages will be unavailable to consume.

              This would also explain why it only happens on the first run, since on subsequent runs the durable subscriber already exists.

              • 4. Re: All messages are not persisted in Database.
                sajankn

                The durable subscribers are created before the messages are sent. First I create the durable subscriber and then only start the publisher.

                There are 5 publishers which run concurrently. Most of the time I lose the all messages published by only 1 Publisher. But there are instances where I've lost all messages of more than 1 Publisher. In one instance I lost all messages of 3 Publishers.

                Is there any workaround for this.

                • 5. Re: All messages are not persisted in Database.
                  ataylor

                  it's unlikely that this is actually an issue, its more likely configuration. If you provide a test case demonstrating the problem i'll take a look.

                  • 6. Re: All messages are not persisted in Database.
                    sajankn

                    The following is the set up and steps I did to get this issue.

                    Setup:

                    JBoss/JBM Servers - 2 Nos. S1 & S2.
                    Both are fresh servers in which the JBoss MQ is replaced with JBM and all the necessary configuration changes made as per the documentation.

                    Publisher - 5 Nos. P1, P2, P3, P4 & P5.
                    All the publishers are the instance of the same class. All are publishing on testTopic. Each Publisher publishes about 1000 Messages.

                    Subscriber - 1 Nos. Sub1.
                    Durable Subscribing to testTopic.

                    Persistence - Oracle DB.

                    Steps:
                    1. Start both the Servers (S1 & S2).
                    2. Start the Subscriber.
                    3. Check the JBM_POSTOFFICE table. The QueueName "MyClientID.Alpha" is tied to the COND "topic.testTopic".
                    4. Stop the Subscriber.
                    5. Run all the 5 Publishers together till they publish all the 1000 messages.
                    6. Let the publishers complete the publishing.
                    7. Check the JBM_MSG table. There are only 4000 messages. Ideally there should be 5000 messages.
                    8. Run the subscriber.
                    9. Subscriber consumes 4000 messages.
                    10. Let the Subscriber consume all the messages.
                    11. Check the JBM_MSG table. There are 0 messages.

                    I lose all the messages of 1 publisher.

                    • 7. Re: All messages are not persisted in Database.
                      timfox

                      Please post a simple Java test that we can run that demonstrates the problem.

                      • 8. Re: All messages are not persisted in Database.
                        sajankn

                        I'm not able to attach the files, hence putting the code as such.

                        Publisher Code:

                        import javax.jms.Connection;
                        import javax.jms.ConnectionFactory;
                        import javax.jms.DeliveryMode;
                        import javax.jms.JMSException;
                        import javax.jms.MessageProducer;
                        import javax.jms.Session;
                        import javax.jms.TextMessage;
                        import javax.jms.Topic;
                        import javax.jms.TopicConnection;
                        import javax.jms.TopicConnectionFactory;
                        import javax.jms.TopicPublisher;
                        import javax.jms.TopicSession;
                        
                        import javax.naming.InitialContext;
                        import javax.naming.NamingException;
                        
                        public class MyPublisher
                        {
                         public static void main(String[] args)
                         {
                         try
                         {
                         String destinationName = "/topic/testTopic";
                         InitialContext ic = new InitialContext();
                         TopicConnectionFactory cf = (TopicConnectionFactory)ic.lookup("/ClusteredConnectionFactory");
                         Topic topic = (Topic)ic.lookup(destinationName);
                         TopicConnection connection = cf.createTopicConnection();
                         TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                         TopicPublisher publisher = session.createPublisher(topic);
                         connection.start();
                         String msgStr = "Hello World";
                         for (int i = 0; i< 1000; i++)
                         {
                         TextMessage tm = session.createTextMessage(msgStr + i);
                         publisher.publish(tm);
                         }
                         }
                         catch(JMSException jmse)
                         {
                         jmse.printStackTrace();
                         }
                         catch(NamingException ne)
                         {
                         ne.printStackTrace();
                         }
                         catch(Exception e)
                         {
                         e.printStackTrace();
                         }
                         }
                        }
                        


                        Run.sh for Publisher:
                        java -Dcom.sun.management.jmxremote.port=12346 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.naming.factory.initial=org.jboss.naming.NamingContextFactory -Djava.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces -Djava.naming.provider.url=jnp://MY_SERVER:PORT_NO -DespStress-Xms256m -Xmx512m -Dsun.rmi.dgc.client.gcInterval=360000 -Dsun.rmi.dgc.server.gcInterval=360000 -Xloggc:gc.log -classpath ${CLIENT_CLASSPATH} MyPublisher
                        


                        Subscriber Code:
                        import javax.jms.JMSException;
                        import javax.jms.TextMessage;
                        import javax.jms.Topic;
                        import javax.jms.TopicConnection;
                        import javax.jms.TopicConnectionFactory;
                        import javax.jms.TopicSession;
                        import javax.jms.TopicSubscriber;
                        import javax.naming.InitialContext;
                        import javax.naming.NamingException;
                        
                        public class MySubscriber
                        {
                         public static void main(String[] args)
                         {
                         try
                         {
                         String destinationName = "/topic/testTopic";
                         InitialContext ic = new InitialContext();
                         TopicConnectionFactory cf = (TopicConnectionFactory)ic.lookup("/ClusteredConnectionFactory");
                         Topic topic = (Topic)ic.lookup(destinationName);
                         TopicConnection connection = cf.createTopicConnection("admin", "admin");
                         connection.setClientID("MyClientID");
                         TopicSession session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                         TopicSubscriber subscriber = session.createDurableSubscriber(topic, "MyName");
                         connection.start();
                         while (true)
                         {
                         TextMessage message = (TextMessage)subscriber.receive(5000);
                         if (message != null)
                         {
                         String msgStr = message.getText();
                         System.out.println(msgStr);
                         }
                         message = null;
                         }
                         }
                         catch(JMSException jmse)
                         {
                         jmse.printStackTrace();
                         }
                         catch(NamingException ne)
                         {
                         ne.printStackTrace();
                         }
                         catch(Exception e)
                         {
                         e.printStackTrace();
                         }
                         }
                        }
                        


                        Run.sh for Subscriber:
                        java -Dcom.sun.management.jmxremote.port=12346 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.naming.factory.initial=org.jboss.naming.NamingContextFactory -Djava.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces -Djava.naming.provider.url=jnp://MY_SERVER:PORT_NO -DespStress-Xms256m -Xmx512m -Dsun.rmi.dgc.client.gcInterval=360000 -Dsun.rmi.dgc.server.gcInterval=360000 -Xloggc:gc.log -classpath ${CLIENT_CLASSPATH} MySubscriber
                        


                        Currently I'm doing a work-around for the messages not persisted in DB. For every new topic, I publish a few test messages and subscribe them. After the first time, the messages are always persisted, hence I dont lose any message.

                        Also the Subscriber sometime do not consume messages even though they are present in the database and require restart (some time more than once) of the subscriber.

                        • 9. Re: All messages are not persisted in Database.
                          timfox

                          Ok, you're almost there.

                          You have two programs now. You just need to combine them into a single test that we can run to show your problem.

                          • 10. Re: All messages are not persisted in Database.
                            clebert.suconic

                             

                            "sajankn" wrote:
                            I'm not able to attach the files, hence putting the code as such.




                            I don't see you calling setDeliveryMode(DeliveryMode.PERSISTENT) anywhere in your code.

                            http://java.sun.com/javaee/5/docs/api/javax/jms/MessageProducer.html#setDeliveryMode(int)

                            As you didn't call setDeliveryMode(DeliveryMode.PERSISTENT) those message aren't even supposed to be persisted. Some persistence probably happened because of a collateral effect of paging where we send messages to the DB when you have too many messages on the queues.

                            • 11. Re: All messages are not persisted in Database.
                              sajankn

                              As per the documentation of JMS, the default value is Persistent.

                              "Delivery mode is set to PERSISTENT by default."

                              Do I explicitly set the value to setDeliveryMode(DeliveryMode.PERSISTENT).

                              Currently the messages are persisted into the Oracle DB.

                              • 12. Re: All messages are not persisted in Database.
                                timfox

                                Yes, persistent is the default. But going back to my original request, please provide a simple test program which demonstrates the problem.

                                • 13. Re: All messages are not persisted in Database.
                                  timfox

                                  If the problem is real, you should be able to easily provide a simple test case to run that demonstrates it.

                                  • 14. Re: All messages are not persisted in Database.
                                    sajankn

                                    I tried to merge both subscriber and publisher code into a single file, but I'm not able to reproduce the same situation for this single file. It can be reproduced only if both run in separate jvm and in a clustered mode.

                                    import javax.jms.JMSException;
                                    import javax.jms.TextMessage;
                                    import javax.jms.Topic;
                                    import javax.jms.TopicConnection;
                                    import javax.jms.TopicConnectionFactory;
                                    import javax.jms.TopicPublisher;
                                    import javax.jms.TopicSession;
                                    import javax.jms.TopicSubscriber;
                                    import javax.naming.InitialContext;
                                    import javax.naming.NamingException;
                                    
                                    public class MyTest
                                    {
                                     private String destinationName;
                                     private TopicConnectionFactory cf;
                                     private Topic topic;
                                     private Topic [] topicArr;
                                     private TopicConnection connection;
                                     private TopicSession session;
                                     private TopicSession sessionPub;
                                     private TopicSubscriber subscriber;
                                     private TopicPublisher [] publisher;
                                     private int publisherCount = 5;
                                    
                                     public static void main(String[] args)
                                     {
                                     MyTest obj = new MyTest();
                                     obj.init();
                                     obj.subscribe();
                                     obj.publish();
                                     //obj.cleanup();
                                     }
                                    
                                     public void init()
                                     {
                                     try
                                     {
                                     destinationName = "/topic/testTopic";
                                     InitialContext ic = new InitialContext();
                                     cf = (TopicConnectionFactory)ic.lookup("/ClusteredConnectionFactory");
                                     topic = (Topic)ic.lookup(destinationName);
                                     connection = cf.createTopicConnection("admin", "admin");
                                     connection.setClientID("MyClientID");
                                     session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                                     sessionPub = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
                                     subscriber = session.createDurableSubscriber(topic, "MyName");
                                     publisher = new TopicPublisher[publisherCount];
                                     topicArr = new Topic[publisherCount];
                                     for (int j=0; j<publisherCount; j++)
                                     {
                                     topicArr[j] = (Topic)ic.lookup(destinationName);
                                     publisher[j] = sessionPub.createPublisher(topicArr[j]);
                                     }
                                     ic = null;
                                     connection.start();
                                     }
                                     catch(JMSException jmse)
                                     {
                                     jmse.printStackTrace();
                                     }
                                     catch(NamingException ne)
                                     {
                                     ne.printStackTrace();
                                     }
                                     catch(Exception e)
                                     {
                                     e.printStackTrace();
                                     }
                                     }
                                    
                                     private class SubscriberMessages extends Thread
                                     {
                                     public void run()
                                     {
                                     try
                                     {
                                     while (true)
                                     {
                                     TextMessage message = (TextMessage)subscriber.receive(5000);
                                     if (message != null)
                                     {
                                     String msgStr = message.getText();
                                     System.out.println(msgStr);
                                     }
                                     message = null;
                                     }
                                     }
                                     catch(JMSException jmse)
                                     {
                                     jmse.printStackTrace();
                                     }
                                     catch(Exception e)
                                     {
                                     e.printStackTrace();
                                     }
                                     }
                                     }
                                    
                                     public void subscribe()
                                     {
                                     SubscriberMessages sObj = new SubscriberMessages();
                                     sObj.start();
                                     }
                                    
                                     private class PublishMessages extends Thread
                                     {
                                     private int pubID = 0;
                                     public PublishMessages(int id)
                                     {
                                     pubID = id;
                                     }
                                     public void run()
                                     {
                                     try
                                     {
                                     String msgStr = "Hello World";
                                     for (int i = 0; i< 1000; i++)
                                     {
                                     TextMessage tm = session.createTextMessage(msgStr + i);
                                     publisher[pubID].publish(tm);
                                     }
                                     }
                                     catch(JMSException jmse)
                                     {
                                     jmse.printStackTrace();
                                     }
                                     catch(Exception e)
                                     {
                                     e.printStackTrace();
                                     }
                                     }
                                     }
                                    
                                     public void publish()
                                     {
                                     for (int i=0; i<publisherCount; i++)
                                     {
                                     PublishMessages pObj = new PublishMessages(i);
                                     pObj.start();
                                     }
                                    
                                     }
                                    
                                     public void cleanup()
                                     {
                                     try
                                     {
                                     try
                                     {
                                     if (subscriber != null)
                                     subscriber.close();
                                     }
                                     catch(Exception e)
                                     {
                                     }
                                     try
                                     {
                                     for (int j=0; j<publisherCount; j++)
                                     {
                                     if (publisher[j] != null)
                                     publisher[j].close();
                                     }
                                     }
                                     catch(Exception e)
                                     {
                                     }
                                     try
                                     {
                                     if (connection != null)
                                     connection.stop();
                                     }
                                     catch(Exception e)
                                     {
                                     }
                                     try
                                     {
                                     if (session != null)
                                     session.close();
                                     }
                                     catch(Exception e)
                                     {
                                     }
                                     try
                                     {
                                     if (connection != null)
                                     connection.close();
                                     }
                                     catch(Exception e)
                                     {
                                     }
                                     }
                                    
                                     catch(Exception e)
                                     {
                                     e.printStackTrace();
                                     }
                                     }
                                    }
                                    


                                    1 2 Previous Next