9 Replies Latest reply on Jun 26, 2006 3:41 PM by raghum

    JMS program hang

    raghum

      Hi

      I am using jboss-4.0.4.GA with jboss-messaging-1.0.0.GA on linux/x86 platform and using JMS API. Java is Standard Edition (build 1.5.0_04-b05). The setup is run with default configuration (no changes made). The JBoss messaging server runs on a different machine from the test machine.

      I am using the topic messaging domain for communication. I describe the program scenario below, which is actually a simulation of a real application scenario -

      I have 3 threads executing in the same JVM - Producer, Relayer and Consumer. The Producer sends messages to the Relayer and the Relayer reads and resends the message to the Consumer. All the threads share the same topic but use selector to filter the Producer->Relayer and Relayer->Consumer communication. The threads use separate sessions for each of the communication path. That is, Producer has a session for its MessageProducer; Relayer has one session for its MessageConsumer and another for its MessageProducer; Consumer has its own session for its MessageConsumer (all the sessions are not transacted and use AUTO_ACKNOWLEDGEMENT). The messages sent are TextMessage type and carry a payload of 25 characters and the threads run continously. In the Relayer, once a message is received, it is created again before being relayed.

      What I am noticing is, Producer is able to sent large amount of messages (around 10000, it is the set limit now) while the Relayer is not able to send more 10 messages and the Consumer is not able to consume more than 10 messages. The Relayer hangs while doing 'publish' and the Consumer is hanging in 'receive'. Also,
      * If the Consumer is disabled, then the Relayer is able to consume all the messages without hanging.
      * If the Producer is throttled, say, at one message per sec, still the same behaviour is seen - the Relayer and Consumer hang.

      I want to know if I am using the JMS API correctly and using the JBoss setup correctly. Has anyone faced this problem before or anyone can help out?

      I can send the test code - it is small and can be tried standalone on a JBoss with Messaging setup.

      Thanks
      Raghu

        • 1. Re: JMS program hang
          timfox

          Yes, please post your code.

          • 2. Re: JMS program hang
            raghum

            Here is the code. Four lines are commented out to disable the relay operation in the MyRelayerWithNewSession class. You could enable the relay operation by removing the comments and see the difference in the behaviour.

            Thanks
            Raghu

            ================================

            import javax.jms.*;
            import javax.naming.Context;
            import javax.naming.InitialContext;
            import java.util.Date;
            
            public class TestJMS extends TestCase {
            
             private int numMessages;
             Date date;
            
             static final String nameTopicConnFactory = "java:/XAConnectionFactory";
             static final String nameTopic = "topic/testTopic";
            
             static final int CONSUMER_WAIT_TIME = 0;
             static final int RELAYER_WAIT_TIME = 0;
            
             public TestJMS(String name) {
             super(name);
             }
            
             public void setUp() {
             numMessages = 0;
             }
            
             public void tearDown() {
             }
            
             public void testJMSFastPublisherSlowConsumerWithRelay() throws Exception {
             String testName = "Publish/Subscribe with Slow Consumer and Relayer - AUTO ACK ";
             ConnectionFactory connFactory = null;
             Connection connection = null;
             Session session = null;
             Destination topic = null;
             MyRelayerWithNewSession relayThread = null;
             MyConsumerWithNewSession consumerThread = null;
             WatchdogTimer watchdog = null;
            
             MessageProducer[] producers = new MessageProducer[1];
            
             try {
             InitialContext ic = new InitialContext ();
             //System.out.println ("Created InitialContext :: " + ic);
             String payload = "This is the actual data that we send";
            
             connFactory = (ConnectionFactory) ic.lookup (nameTopicConnFactory);
             topic = (Topic)ic.lookup(nameTopic);
             connection = connFactory.createConnection();
            
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             producers[0] = session.createProducer(topic);
            
             relayThread = new MyRelayerWithNewSession(connection, topic, "text0", "text1",
             RELAYER_WAIT_TIME, false, payload);
             consumerThread = new MyConsumerWithNewSession(connection, topic, "text1",
             CONSUMER_WAIT_TIME, false, payload);
            
             connection.start();
            
             // watchdog = new WatchdogTimer(producers, consumers, 90000, connection);
            
             //System.out.println("Sender starting");
             try {
             for(int i=0; i<10000; i++) {
             ObjectMessage message = session.createObjectMessage();
             message.setStringProperty("target", "text0");
             message.setObject(payload+i);
             producers[0].send(message);
             if (i%100 == 0) {
             System.out.println("Sent " + i + "messages");
             }
             }
             } catch (Throwable t ) {
             System.out.println("Producer1 send got Error: "+t.getMessage());
             }
             System.out.println("Done with sending");
             Thread.sleep(40000);
             consumerThread.join();
             relayThread.join();
            
             } catch (Throwable t) {
             System.err.println("Error: "+t.getMessage());
             // t.printStackTrace(System.err);
             } finally {
             try {
             if (connection != null){
             connection.close();
             connection = null;
             }
             } catch (JMSException e) {
             e.printStackTrace();
             }
             }
            
            // if (watchdog.isAlive()) {
            // watchdog.interrupt();
            // }
            // watchdog.join();
            
             date = new Date();
             if (!watchdog.interrupted) {
             System.out.println(date.toString()+": "+testName + " : PASSED");
             } else {
             System.out.println(date.toString()+": "+testName + " : FAILED");
             }
             }
            
             class MyRelayerWithNewSession extends Thread {
             Connection connection = null;
             Destination topic = null;
             String localTarget = null;
             String relayTarget = null;
             int delay_ms = 0;
             boolean explicit_ack = false;
             String payload = null;
            
             Session consumeSession = null;
             Session produceSession = null;
            
             MessageConsumer consumer = null;
             MessageProducer producer = null;
            
             MyRelayerWithNewSession(Connection connection, Destination topic,
             String localTarget, String relayTarget, int delay_ms, boolean explicit_ack,
             String payload) {
             this.connection = connection;
             this.topic = topic;
             this.localTarget = localTarget;
             this.relayTarget = relayTarget;
             this.delay_ms = delay_ms;
             this.explicit_ack = explicit_ack;
             this.payload = payload;
             start();
             }
            
             public void run(){
            
             try {
             consumeSession = connection.createSession(false,
             Session.AUTO_ACKNOWLEDGE);
             produceSession = connection.createSession(false,
             Session.AUTO_ACKNOWLEDGE);
            
             consumer = consumeSession.createConsumer(topic, "target='"+localTarget+"'", false);
             producer = produceSession.createProducer(topic);
            
             Thread.sleep(delay_ms);
             System.out.println("Relayer thread waking up");
             for (int i=0; i< 10000; i ++) {
             Message receivedMessage = consumer.receive();
             if (explicit_ack) {
             receivedMessage.acknowledge();
             }
             String receivedPayload = (String) ((ObjectMessage) receivedMessage).getObject();
             // System.out.println("Relay Read at iter "+i+" : " + receivedPayload);
             // assertEquals(true, receivedPayload.equals(payload+i));
            
             // Uncomment these 4 lines to enable relaying of messages
            // ObjectMessage message = produceSession.createObjectMessage();
            // message.setStringProperty("target", relayTarget);
            // message.setObject(receivedPayload);
            // producer.send(message);
            
             if (i%100 == 0) {
             System.out.println("Relayed " + i + "messages");
             }
             }
            
             System.out.println("Done with Relayer thread");
            
             } catch (Throwable t) {
             System.err.println("Relayer Thread encountered Error: "+t.getMessage());
             // t.printStackTrace(System.err);
             }
             }
            
             }
            
             class MyConsumerWithNewSession extends Thread {
             Connection connection = null;
             Destination topic = null;
             String localTarget = null;
             int delay_ms = 0;
             boolean explicit_ack = false;
             String payload = null;
            
             Session consumeSession = null;
             MessageConsumer consumer = null;
            
             MyConsumerWithNewSession(Connection connection, Destination topic,
             String localTarget, int delay_ms, boolean explicit_ack, String payload) {
             this.connection = connection;
             this.topic = topic;
             this.localTarget = localTarget;
             this.delay_ms = delay_ms;
             this.explicit_ack = explicit_ack;
             this.payload = payload;
             start();
             }
            
             public void run(){
             try {
             consumeSession = connection.createSession(false,
             Session.AUTO_ACKNOWLEDGE);
             consumer = consumeSession.createConsumer(topic, "target='"+localTarget+"'", false);
            
             Thread.sleep(delay_ms);
             System.out.println("Receiver waking up");
             for (int i=0; i< 10000; i ++) {
             Message receivedMessage = consumer.receive();
             if (explicit_ack) {
             receivedMessage.acknowledge();
             }
             String receivedPayload = (String) ((ObjectMessage) receivedMessage).getObject();
             System.out.println("Receive Read at iter "+i+" : " + receivedPayload);
             // assertEquals(true, receivedPayload.equals(payload+i));
            
             if (i%100 == 0) {
             System.out.println("Received " + i + "messages");
             }
             }
            
             System.out.println("Done with Consumer thread");
            
             } catch (Throwable t) {
             System.err.println("Receiver Thread encountered Error: "+t.getMessage());
             // t.printStackTrace(System.err);
             }
             }
             }
            
             static class WatchdogTimer extends Thread {
            
             long waitTime = 0L;
             boolean interrupted = false;
             Connection connection = null;
             MessageProducer[] producers = null;
             MessageConsumer [] consumers = null;
            
             public WatchdogTimer (MessageProducer[] producers, MessageConsumer [] consumers,
             long waitTime, Connection connection) {
             this.producers = producers;
             this.consumers = consumers;
             this.waitTime = waitTime;
             this.connection = connection;
             start();
             }
            
             public void run () {
            
             try {
             Thread.sleep(this.waitTime);
             this.interrupted = true;
            
             System.out.println("Watchdog waking up: closing producers and consumers");
             for (int i = 0; i < this.consumers.length; i++) {
             this.consumers.close();
             }
             for (int i = 0; i < this.producers.length; i++) {
             this.producers.close();
             }
            
             connection.close();
             } catch (InterruptedException thrExc) {
             System.out.println("watchdog interrupted");
             } catch (JMSException jmsExc) {
             System.out.println("watchdog got jmsExc");
             }
            
             }
            
             }
            }
            



            • 3. Re: JMS program hang
              timfox

              Looking at your code, there is something I do not understand.

              You say that the messaging server is on one machine, and the producer, relayer and consumer are on another machine.

              However in your code, you are looking up the connection factory using "java:/XAConnectionFactory", which is a *local* jndi reference (any jndi name starting with java: is local to the JVM), so cannot be on another machine.

              Please explain your config and topology in more detail, are the clients deployed in a jboss instance or are they standalone?

              Where are you specifying your JNDI properties on the client machine, I notice you just do "new InitialContext()" with no params.

              • 4. Re: JMS program hang
                raghum

                The messaging server and the client (with its three threads) are running on two different machines. I am passing these two parameters through the 'build.properties' file on the client side.
                ======
                jms.test.java.naming.provider.url = jnp://bhatp-desktop:1099
                jms.test.java.naming.factory.initial = org.jnp.interfaces.NamingContextFactory
                =====
                And I am invoking my test program(client) from a machine *different* from 'bhatp-desktop'.

                I was not aware of the 'java:' part in the jndi reference. But the client was able to connect to the messaging server (and the hang happens in the middle of the execution). Even so, I have changed the jndi reference to '/XAConnectionFactory'. And the behavior remains the same. To reiterate - I have only one instance of messaging server in the whole setup running on 'bhatp-desktop' (killing this server results in 'connection refused' etc exception).

                I guess I am running the setup in a standalone configuration. I invoke the messaging server by running 'run.sh -c messaging' in the jboss-4.0.4.GA/bin directory. And the client is invoked on a different machine with jboss-messaging-client-scoped.jar in its classpath.

                I wish to report one more behavioral observation with respect to the given code. Even if I introduce a 'sleep(1000)' in the producer (main) thread's send loop to basically reduce its send rate, the program hangs after 10-12 iterations - the relayer and the consumer hang in 'receive' calls while the sender makes progress doing sends till 10000 iterations.

                Thanks
                Raghu

                • 5. Re: JMS program hang
                  timfox

                  Please post the actual code you are running so I can replicate this.

                  The previous code has various errors including the initialisation of the InitialContext, the wrong connection factory etc.

                  I'm not sure how I can help you unless you give me a test case I can actually run.

                  Also, please tell me if your clients are deployed in a messaging server or standalone, by standalone clients I mean a standalone jms client program.

                  • 6. Re: JMS program hang
                    timfox

                    I've just noticed you're using 1.0.0 GA, can you upgrade to the latest version?

                    • 7. Re: JMS program hang
                      timfox

                      But post your *actual* code anyway.

                      • 8. Re: JMS program hang
                        timfox

                        There was a bug in 1.0.0 related to delivery to subscriptions with selectors, which was fixed some time ago.

                        http://www.jboss.com/index.html?module=bb&op=viewtopic&t=81506

                        I think your problem could be related.

                        Actually 1.0.0 is pretty old now, there have been several bug fixes since then so best to upgrade.

                        Very soon we should be releasing a 1.0.1 GA.

                        • 9. Re: JMS program hang
                          raghum

                          Sorry for posting this late.

                          Upgrading to CR2 of jboss-messaging solved the hang problem. I am posting the working code anyway.

                          And these variables need to be set in build.properties.

                          jms.test.java.naming.provider.url = jnp://<server name>:1099
                          jms.test.java.naming.factory.initial = org.jnp.interfaces.NamingContextFactory

                          And the jboss client is a standalone program.

                          Thanks
                          Raghu

                          import java.util.Date;
                          
                          import javax.jms.Connection;
                          import javax.jms.ConnectionFactory;
                          import javax.jms.Destination;
                          import javax.jms.JMSException;
                          import javax.jms.Message;
                          import javax.jms.MessageConsumer;
                          import javax.jms.MessageProducer;
                          import javax.jms.ObjectMessage;
                          import javax.jms.Session;
                          import javax.jms.Topic;
                          import javax.naming.InitialContext;
                          
                          
                          import junit.framework.TestCase;
                          
                          public class TestJMS1 extends TestCase {
                          
                           Date date;
                          
                           static final String nameTopicConnFactory = "XAConnectionFactory";
                           static final String nameTopic = "topic/testTopic";
                          
                           static final int CONSUMER_WAIT_TIME = 60000;
                           static final int RELAYER_WAIT_TIME = 30000;
                          
                           public TestJMS1(String name) {
                           super(name);
                           }
                          
                           public void setUp() {
                           }
                          
                           public void tearDown() {
                           }
                          
                           public void testJMSFastPublisherSlowConsumerWithRelay() throws Exception {
                           String testName = "Publish/Subscribe with Slow Consumer and Relayer - AUTO ACK ";
                           ConnectionFactory connFactory = null;
                           Connection connection = null;
                           Session session = null;
                           Destination topic = null;
                           MyRelayerWithNewSession relayThread = null;
                           MyConsumerWithNewSession consumerThread = null;
                           WatchdogTimer watchdog = null;
                          
                           MessageProducer[] producers = new MessageProducer[1];
                          
                           try {
                           InitialContext ic = new InitialContext ();
                           //System.out.println ("Created InitialContext :: " + ic);
                           String payload = "The static portion ";
                          
                           connFactory = (ConnectionFactory) ic.lookup (nameTopicConnFactory);
                           topic = (Topic)ic.lookup(nameTopic);
                           connection = connFactory.createConnection();
                          
                           session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                           producers[0] = session.createProducer(topic);
                          
                           relayThread = new MyRelayerWithNewSession(connection, topic, "text0", "text1",
                           RELAYER_WAIT_TIME, false, payload);
                           consumerThread = new MyConsumerWithNewSession(connection, topic, "text1",
                           CONSUMER_WAIT_TIME, false, payload);
                          
                           connection.start();
                          
                           // watchdog = new WatchdogTimer(producers, consumers, 90000, connection);
                          
                           //System.out.println("Sender starting");
                           try {
                           // For threads to create the sessions.
                           Thread.sleep(5000);
                           for(int i=0; i<10000; i++) {
                           ObjectMessage message = session.createObjectMessage();
                           message.setStringProperty("target", "text0");
                           message.setObject(payload+i);
                           producers[0].send(message);
                           // System.out.println("Sent iter : " + i);
                           // Thread.sleep(1000);
                           if (i%100 == 0) {
                           System.out.println("Sent " + i + "messages");
                           }
                           }
                           } catch (Throwable t ) {
                           System.out.println("Producer1 send got Error: "+t.getMessage());
                           }
                          
                           System.out.println("Done with sending");
                           Thread.sleep(200000);
                           consumerThread.join();
                           relayThread.join();
                          
                           } catch (Throwable t) {
                           System.err.println("Error: "+t.getMessage());
                           // t.printStackTrace(System.err);
                           } finally {
                           try {
                           if (connection != null){
                           connection.close();
                           connection = null;
                           }
                           } catch (JMSException e) {
                           e.printStackTrace();
                           }
                           }
                          
                          // if (watchdog.isAlive()) {
                          // watchdog.interrupt();
                          // }
                          // watchdog.join();
                          
                           date = new Date();
                           if (!watchdog.interrupted) {
                           System.out.println(date.toString()+": "+testName + " : PASSED");
                           } else {
                           System.out.println(date.toString()+": "+testName + " : FAILED");
                           }
                           }
                          
                           class MyRelayerWithNewSession extends Thread {
                           Connection connection = null;
                           Destination topic = null;
                           String localTarget = null;
                           String relayTarget = null;
                           int delay_ms = 0;
                           boolean explicit_ack = false;
                           String payload = null;
                          
                           Session consumeSession = null;
                           Session produceSession = null;
                          
                           MessageConsumer consumer = null;
                           MessageProducer producer = null;
                          
                           MyRelayerWithNewSession(Connection connection, Destination topic, String localTarget,
                           String relayTarget, int delay_ms, boolean explicit_ack, String payload) {
                           this.connection = connection;
                           this.topic = topic;
                           this.localTarget = localTarget;
                           this.relayTarget = relayTarget;
                           this.delay_ms = delay_ms;
                           this.explicit_ack = explicit_ack;
                           this.payload = payload;
                           start();
                           }
                          
                           public void run(){
                          
                           try {
                           consumeSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                           produceSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                          
                           consumer = consumeSession.createConsumer(topic, "target='"+localTarget+"'", false);
                           producer = produceSession.createProducer(topic);
                          
                           Thread.sleep(delay_ms);
                           System.out.println("Relayer thread waking up");
                           for (int i=0; i< 10000; i ++) {
                           Message receivedMessage = consumer.receive();
                           if (explicit_ack) {
                           receivedMessage.acknowledge();
                           }
                           String receivedPayload = (String) ((ObjectMessage) receivedMessage).getObject();
                           // System.out.println("Relay Read at iter "+i+" : " + receivedPayload);
                           // System.out.println("Relay iter "+i+" : "+receivedPayload.substring(19));
                           // assertEquals(true, receivedPayload.equals(payload+i));
                          
                           ObjectMessage message = produceSession.createObjectMessage();
                           message.setStringProperty("target", relayTarget);
                           message.setObject(receivedPayload);
                           producer.send(message);
                          
                           if (i%100 == 0) {
                           System.out.println("Relayed " + i + "messages");
                           }
                           }
                          
                           System.out.println("Done with Relayer thread");
                          
                           } catch (Throwable t) {
                           System.err.println("Relayer Thread encountered Error: "+t.getMessage());
                           // t.printStackTrace(System.err);
                           }
                           }
                          
                           }
                          
                           class MyConsumerWithNewSession extends Thread {
                           Connection connection = null;
                           Destination topic = null;
                           String localTarget = null;
                           int delay_ms = 0;
                           boolean explicit_ack = false;
                           String payload = null;
                          
                           Session consumeSession = null;
                           MessageConsumer consumer = null;
                          
                           MyConsumerWithNewSession(Connection connection, Destination topic, String localTarget,
                           int delay_ms, boolean explicit_ack, String payload) {
                           this.connection = connection;
                           this.topic = topic;
                           this.localTarget = localTarget;
                           this.delay_ms = delay_ms;
                           this.explicit_ack = explicit_ack;
                           this.payload = payload;
                           start();
                           }
                          
                           public void run(){
                           try {
                           consumeSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                           consumer = consumeSession.createConsumer(topic, "target='"+localTarget+"'", false);
                          
                           Thread.sleep(delay_ms);
                           System.out.println("Receiver waking up");
                           for (int i=0; i< 10000; i ++) {
                           Message receivedMessage = consumer.receive();
                           if (explicit_ack) {
                           receivedMessage.acknowledge();
                           }
                           String receivedPayload = (String) ((ObjectMessage) receivedMessage).getObject();
                           // System.out.println("Receive Read at iter "+i+" : " + receivedPayload);
                           // System.out.println("Receive iter "+i+" : "+receivedPayload.substring(19));
                           // assertEquals(true, receivedPayload.equals(payload+i));
                          
                           if (i%100 == 0) {
                           System.out.println("Received " + i + "messages");
                           }
                           }
                          
                           System.out.println("Done with Consumer thread");
                          
                           } catch (Throwable t) {
                           System.err.println("Receiver Thread encountered Error: "+t.getMessage());
                           // t.printStackTrace(System.err);
                           }
                           }
                           }
                          
                           static class WatchdogTimer extends Thread {
                          
                           long waitTime = 0L;
                           boolean interrupted = false;
                           Connection connection = null;
                           MessageProducer[] producers = null;
                           MessageConsumer [] consumers = null;
                          
                           public WatchdogTimer (MessageProducer[] producers, MessageConsumer [] consumers,
                           long waitTime, Connection connection) {
                           this.producers = producers;
                           this.consumers = consumers;
                           this.waitTime = waitTime;
                           this.connection = connection;
                           start();
                           }
                          
                           public void run () {
                          
                           try {
                           Thread.sleep(this.waitTime);
                           this.interrupted = true;
                          
                           System.out.println("Watchdog waking up: closing producers and consumers");
                           for (int i = 0; i < this.consumers.length; i++) {
                           this.consumers.close();
                           }
                           for (int i = 0; i < this.producers.length; i++) {
                           this.producers.close();
                           }
                          
                           connection.close();
                           } catch (InterruptedException thrExc) {
                           System.out.println("watchdog interrupted");
                           } catch (JMSException jmsExc) {
                           System.out.println("watchdog got jmsExc");
                           }
                          
                           }
                          
                           }
                          
                          }