6 Replies Latest reply on Apr 1, 2004 4:50 PM by jcab

    JBoss 3.2.2, JMS Message Listener Problem

    Wojciech Koszut Newbie

      Hi,

      My system uses publish-subscribe model with durable subscribers. There are 2 topics. One of them has 20 and second 40 subscribers. Subscribers are objects implementing MessageListener Interface, and everyone has own selector. After 12-24 hours of continuous work, message stays in topic for one, random, subscriber. If new message is published for this subscriber, it is not delivered. After restarting the subscriber, new messages are delivered to him, but the first which has stayed in topic is not. In JMX-Console DurableMessageCount is 0 - JMS can't see this messag e. After restarting JBoss everything is ok. My question is: why the messages are not delivered to subscribers (method onMessage() is not invoked) and JBoss can't see these messages? I suspect that first message that is not delivered to subscriber because of an error in JMS engine. Nothing is loggged in server.log.

      Invocation layer: the same problem in OIL, UIL2 and JVM
      Persistence Manager: the same problem in File PM and JDBC2 PM
      Average system load: 1-20 messages per second
      Average message size: 4 KB

      Best regards:
      Wojciech Koszut

        • 1. Re: JBoss 3.2.2, JMS Message Listener Problem
          Adrian Brock Master

          There is an occasion when it will stop delivering messages to a message listener,
          this is usually caused by an unhandled error trying to receive messages.

          This normally happens when JBoss doesn't understand what the error means,
          e.g. some corruption caused by the network transport or a request the server
          did not understand and threw back to the client.

          Unless you have log4j enabled in the client you wouldn't see the warning:

          catch (JMSException e)
          {
          log.warn("Message consumer closing due to error in listening thread.", e);
          try
          {
          close();
          }
          catch (Exception ignore)
          {
          }
          }

          It might be that an error occurred while the session was delivering the message
          to the consumer and the message got stuck in the session (i.e. it wasn't nacked as
          undeliverable). It should be nacked (negatively acknowledged) by the server
          if the client closes the connection or disconnects.

          I would need to see the warning logged above to find out what error JBossMQ
          failed to handle.

          Restarting JBoss (or redeploying the topic) will cause the subscriptions to be recovered
          from the persistent store. The "lost" message will be available again.

          Regards,
          Adrian

          • 2. Re: JBoss 3.2.2, JMS Message Listener Problem
            Wojciech Koszut Newbie

            Hi,
            thanks for the reply. However, our problem persists. For each MessageListener object, a single TopicSession instance is created. While receiving and processing messages, Listener performs proper operations on the session object (commits session with commit() or rolls back with rollback()). In case of any critical exception, it is logged and TopicSubscriber is closed. In spite of catching all exceptions- nothing is written to the logs. Below are the skeleton of clients:

            MessageListeners
            ------------------
            public class UMPMessageListener implements MessageListener {
            private transient TopicSubscriber topicSubscriber;
            private transient TopicSession topicSession;

            public void onMessage(Message message) {
            try {
            String messageID = message.getJMSMessageID();
            logger.info("[TRANSFER MESSAGE " + messageID + "]");
            // ...............
            // process message
            // e.g. RMIInvocation
            // ...............
            logger.info("[TRANSFERED " + messageID + "]");

            // confirm delivery
            topicSession.commit();

            } catch (UMPMessageRedelivery umpMessageRedelivery) {
            logger.info("Message must redelivery.");
            messageRollback(message);
            } catch (ConnectException e) {
            logger.error("ConnectionException !", e);
            messageRollback(message);
            unsubscribe();
            } catch (RemoteException e) {
            logger.error("RemoteException !", e);
            messageRollback(message);
            unsubscribe();
            } catch (JMSException e) {
            logger.error("Unknown state ?!?!?!", e);
            messageRollback(message);
            } catch (Exception e) {
            logger.error("Unknown state ?!?!?!", e);
            messageRollback(message);
            }


            }


            private void messageRollback(Message message) {
            try {
            topicSession.rollback();
            logger.debug("Rollback transaction for Message: " + message.getJMSMessageID());
            } catch (JMSException e) {
            logger.error("Error !", e);
            }
            }

            private void unsubscribe() {
            try {
            topicSession.close();
            topicSubscriber.close();
            } catch (JMSException e) {
            logger.error("Error !", e);
            }
            }
            }




            Create Subscriber and Register Listeners(in System as Singleton):
            ------------------------------------------------------------------
            public class RegisterImpl {
            private transient TopicConnectionFactory topicConnectionFactory;
            private transient TopicConnection topicConnection;

            public RegisterImpl() {
            try {
            topicConnectionFactory = (TopicConnectionFactory) EJBHomeFactory.getFactoryClient().getInitialContext().lookup("ConnectionFactory");
            topicConnection = topicConnectionFactory.createTopicConnection();
            topicConnection.setClientID("ProviderRegister");
            } catch (Exception e) {
            logger.fatal("FatalError !", e);
            throw new IllegalStateException();
            }
            }


            public void register(...) {
            String providerId = ... // unique for UMPMessageListener

            TopicSession topicSession = null;
            try {
            topicSession = topicConnection.createTopicSession(true, 0);
            } catch (JMSException e) {
            logger.error("Error !", e);
            }

            UMPMessageListener messageListener = new UMPMessageListener();

            Topic topic = null;

            try {
            topicConnection.stop();

            try {
            topic = (Topic) EJBHomeFactory.getFactory().getInitialContext().lookup(/* lookup for Topic */);
            } catch (Exception e) {
            logger.fatal("FatalError while geting the Topic !", e);
            throw new IllegalStateException();
            }

            TopicSubscriber topicSubscriber = topicSession.createDurableSubscriber(
            topic,
            topic.getTopicName() + "." + providerId,
            selector,
            false
            );

            messageListener.setTopicSession(topicSession);
            messageListener.setTopicSubscriber(topicSubscriber);
            topicSubscriber.setMessageListener(messageListener);

            topicConnection.start();
            } catch (JMSException e) {
            logger.error("Error !", e);
            }
            }
            }



            Message Publisher:
            -----------------
            public void publishMessage(Object aMessage) {
            TopicSession topicSession = null;
            TopicPublisher topicPublisher = null;
            ObjectMessage message = null;
            try {
            topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            topicPublisher = topicSession.createPublisher(topic);

            message = topicSession.createObjectMessage();
            // ......
            // ...... additional properties for message
            // ......
            message.setObject(aMessage);

            topicPublisher.publish(topic, message, DeliveryMode.PERSISTENT, 4, 0);
            } catch (JMSException e) {
            logger.fatal("FatalError !", e);
            throw new IllegalStateException();
            } finally {
            if(topicSession != null) {
            try {
            topicPublisher.close();
            topicSession.close();
            } catch (JMSException e) {
            logger.error("Error !", e);
            }
            }
            }
            logger.debug("Message Published: " + aMessage.toString());
            }


            Best regards:
            Wojciech Koszut

            • 3. Re: JBoss 3.2.2, JMS Message Listener Problem
              drgn Newbie

              Hi.

              I have similar problem with delivering messages to Client (message listener). I'm using publish/subscribe model with transactions too.

              Adrian, please help resolve this problem.

              Regards,
              Tomek

              • 4. Re: JBoss 3.2.2, JMS Message Listener Problem
                jcab Newbie

                I am also having the problem of a client no longer receiving messages.

                Running on 3.2.1 (I've also tried version 3.2.3 with the same results). After running for about an hour with 15 messages per second/ 4 consumers, one consumer no longer recieve messages. Below is the exception I get from the JBoss output at this point. I have set the setExceptionListener() on the client connection and it receives no errors. Is there a way the client can periodically check to see that it's connection is still valid?

                15:05:45,312 WARN [OILServerILService] Client request resulted in a server exception:
                javax.jms.JMSException: SELECTOR: java.lang.Long
                at org.jboss.mq.selectors.Selector.test(Selector.java:186)
                at org.jboss.mq.Subscription.accepts(Subscription.java:81)
                at org.jboss.mq.server.BasicQueue.receive(BasicQueue.java:251)
                at org.jboss.mq.server.JMSTopic.receive(JMSTopic.java:260)
                at org.jboss.mq.server.ClientConsumer.receive(ClientConsumer.java:225)
                at org.jboss.mq.server.JMSDestinationManager.receive(JMSDestinationManager.java:672)
                at org.jboss.mq.server.JMSServerInterceptorSupport.receive(JMSServerInterceptorSupport.java:225)
                at org.jboss.mq.security.ServerSecurityInterceptor.receive(ServerSecurityInterceptor.java:103)
                at org.jboss.mq.server.TracingInterceptor.receive(TracingInterceptor.java:478)
                at org.jboss.mq.server.JMSServerInvoker.receive(JMSServerInvoker.java:227)
                at org.jboss.mq.il.oil.OILServerILService$Client.run(OILServerILService.java:294)
                at java.lang.Thread.run(Thread.java:536)
                15:05:45,359 WARN [SpyMessageConsumer] Message consumer closing due to error in listening thread.
                javax.jms.JMSException: SELECTOR: java.lang.Long
                at org.jboss.mq.selectors.Selector.test(Selector.java:186)
                at org.jboss.mq.Subscription.accepts(Subscription.java:81)
                at org.jboss.mq.server.BasicQueue.receive(BasicQueue.java:251)
                at org.jboss.mq.server.JMSTopic.receive(JMSTopic.java:260)
                at org.jboss.mq.server.ClientConsumer.receive(ClientConsumer.java:225)
                at org.jboss.mq.server.JMSDestinationManager.receive(JMSDestinationManager.java:672)
                at org.jboss.mq.server.JMSServerInterceptorSupport.receive(JMSServerInterceptorSupport.java:225)
                at org.jboss.mq.security.ServerSecurityInterceptor.receive(ServerSecurityInterceptor.java:103)
                at org.jboss.mq.server.TracingInterceptor.receive(TracingInterceptor.java:478)
                at org.jboss.mq.server.JMSServerInvoker.receive(JMSServerInvoker.java:227)
                at org.jboss.mq.il.oil.OILServerILService$Client.run(OILServerILService.java:294)
                at java.lang.Thread.run(Thread.java:536)

                regards,
                Jenny

                • 5. Re: JBoss 3.2.2, JMS Message Listener Problem
                  Adrian Brock Master

                  So the selector is expecting a Long and the message contains something else,
                  or vice-versa?

                  Regards,
                  Adrian

                  • 6. Re: JBoss 3.2.2, JMS Message Listener Problem
                    jcab Newbie

                    That's what is puzzling. There is no producer or consumer using a long value. I printed out some verbose output to see what I am sending and recieving. All clients have been receiving messages consistently (and correctly) for up to an hour at 12 messages/sec before one client gets this error (seen only in JBoss).

                    I put JBoss in verbose mode and got some more output on the error.

                    Below is an example of a sent message and a received message and then the verbose output from JBoss.

                    I send the message with an Integer value to match with a Src. The JMS Selector string is something like: (Src=171 OR Src=31 OR Src=95)

                    [*** Message being sent to JMS]
                    FINE 04/01/2004 05:25:59:31 Publishing big msg: org.jboss.mq.SpyObjectMessage {
                    Header {
                    jmsDestination : TOPIC.server.fault.alarm.ALARMTOPIC
                    jmsDeliveryMode : 2
                    jmsExpiration : 1080876358968
                    jmsPriority : 4
                    jmsMessageID : ID:75-1080858358968785
                    jmsTimeStamp : 1080858358968
                    jmsCorrelationID: null
                    jmsReplyTo : null
                    jmsType : null
                    jmsRedelivered : false
                    jmsPropertiesReadWrite:true
                    msgReadOnly : false
                    producerClientId: ID:75
                    }
                    }

                    [*** Message being received]
                    FINEST 04/01/2004 05:25:58:984 Getting onMessage - org.jboss.mq.SpyObjectMessage {
                    Header {
                    jmsDestination : TOPIC.server.fault.alarm.ALARMTOPIC
                    jmsDeliveryMode : 2
                    jmsExpiration : 1080876358968
                    jmsPriority : 4
                    jmsMessageID : ID:75-1080858358968785
                    jmsTimeStamp : 1080858358968
                    jmsCorrelationID: null
                    jmsReplyTo : null
                    jmsType : null
                    jmsRedelivered : false
                    jmsPropertiesReadWrite:false
                    msgReadOnly : true
                    producerClientId: ID:75
                    }
                    }


                    [*** Verbose output from JBoss exception]:
                    17:25:59,343 DEBUG [Selector] Invalid selector:
                    java.lang.ClassCastException: java.lang.Long
                    at org.jboss.mq.selectors.Operator.computeArgument1(Operator.java:768)
                    at org.jboss.mq.selectors.Operator.equal(Operator.java:160)
                    at org.jboss.mq.selectors.Operator.apply(Operator.java:879)
                    at org.jboss.mq.selectors.Operator.computeArgument2(Operator.java:792)
                    at org.jboss.mq.selectors.Operator.or(Operator.java:266)
                    at org.jboss.mq.selectors.Operator.apply(Operator.java:885)
                    at org.jboss.mq.selectors.Operator.computeArgument1(Operator.java:748)
                    at org.jboss.mq.selectors.Operator.or(Operator.java:254)
                    [snip - same set of line numbers over and over again]
                    at org.jboss.mq.selectors.Operator.apply(Operator.java:885)
                    at org.jboss.mq.selectors.Operator.computeArgument1(Operator.java:748)
                    at org.jboss.mq.selectors.Operator.or(Operator.java:254)
                    at org.jboss.mq.selectors.Operator.apply(Operator.java:885)
                    at org.jboss.mq.selectors.Operator.computeArgument1(Operator.java:748)
                    at org.jboss.mq.selectors.Operator.or(Operator.java:254)
                    at org.jboss.mq.selectors.Operator.apply(Operator.java:885)
                    at org.jboss.mq.selectors.Operator.computeArgument1(Operator.java:748)
                    at org.jboss.mq.selectors.Operator.or(Operator.java:254)
                    at org.jboss.mq.selectors.Operator.apply(Operator.java:885)
                    at org.jboss.mq.selectors.Selector.test(Selector.java:153)
                    at org.jboss.mq.Subscription.accepts(Subscription.java:81)
                    at org.jboss.mq.server.BasicQueue.receive(BasicQueue.java:251)
                    at org.jboss.mq.server.JMSTopic.receive(JMSTopic.java:260)
                    at org.jboss.mq.server.ClientConsumer.receive(ClientConsumer.java:225)
                    at org.jboss.mq.server.JMSDestinationManager.receive(JMSDestinationManager.java:672)
                    at org.jboss.mq.server.JMSServerInterceptorSupport.receive(JMSServerInterceptorSupport.java:225)
                    at org.jboss.mq.security.ServerSecurityInterceptor.receive(ServerSecurityInterceptor.java:103)
                    at org.jboss.mq.server.TracingInterceptor.receive(TracingInterceptor.java:478)
                    at org.jboss.mq.server.JMSServerInvoker.receive(JMSServerInvoker.java:227)
                    at org.jboss.mq.il.oil.OILServerILService$Client.run(OILServerILService.java:294)
                    at java.lang.Thread.run(Thread.java:536)

                    17:25:59,343 WARN [OILServerILService] Client request resulted in a server exception:
                    javax.jms.JMSException: SELECTOR: java.lang.Long
                    at org.jboss.mq.selectors.Selector.test(Selector.java:186)
                    at org.jboss.mq.Subscription.accepts(Subscription.java:81)
                    at org.jboss.mq.server.BasicQueue.receive(BasicQueue.java:251)
                    at org.jboss.mq.server.JMSTopic.receive(JMSTopic.java:260)
                    at org.jboss.mq.server.ClientConsumer.receive(ClientConsumer.java:225)
                    at org.jboss.mq.server.JMSDestinationManager.receive(JMSDestinationManager.java:672)
                    at org.jboss.mq.server.JMSServerInterceptorSupport.receive(JMSServerInterceptorSupport.java:225)
                    at org.jboss.mq.security.ServerSecurityInterceptor.receive(ServerSecurityInterceptor.java:103)
                    at org.jboss.mq.server.TracingInterceptor.receive(TracingInterceptor.java:478)
                    at org.jboss.mq.server.JMSServerInvoker.receive(JMSServerInvoker.java:227)
                    at org.jboss.mq.il.oil.OILServerILService$Client.run(OILServerILService.java:294)
                    at java.lang.Thread.run(Thread.java:536)

                    17:25:59,500 WARN [SpyMessageConsumer] Message consumer closing due to error in listening thread.
                    javax.jms.JMSException: SELECTOR: java.lang.Long
                    at org.jboss.mq.selectors.Selector.test(Selector.java:186)
                    at org.jboss.mq.Subscription.accepts(Subscription.java:81)
                    at org.jboss.mq.server.BasicQueue.receive(BasicQueue.java:251)
                    at org.jboss.mq.server.JMSTopic.receive(JMSTopic.java:260)
                    at org.jboss.mq.server.ClientConsumer.receive(ClientConsumer.java:225)
                    at org.jboss.mq.server.JMSDestinationManager.receive(JMSDestinationManager.java:672)
                    at org.jboss.mq.server.JMSServerInterceptorSupport.receive(JMSServerInterceptorSupport.java:225)
                    at org.jboss.mq.security.ServerSecurityInterceptor.receive(ServerSecurityInterceptor.java:103)
                    at org.jboss.mq.server.TracingInterceptor.receive(TracingInterceptor.java:478)
                    at org.jboss.mq.server.JMSServerInvoker.receive(JMSServerInvoker.java:227)
                    at org.jboss.mq.il.oil.OILServerILService$Client.run(OILServerILService.java:294)
                    at java.lang.Thread.run(Thread.java:536)

                    17:25:59,515 DEBUG [SpyMessageConsumer] Message consumer closing.
                    17:25:59,562 DEBUG [Connection] Connection: removeSession(dest=TOPIC.server.fault.alarm.ALARMTOPIC)

                    Regards,
                    Jenny