3 Replies Latest reply on Nov 25, 2009 10:35 AM by strzyga

    message publishing to stopped topic

      I'm using JBM 1.4.0.SP3 / JBoss 4.3.0 GA.

      I have a topic with one durable subscriber and a publisher that periodically publishes messages to the topic.
      When I stop the topic (via the stop() method in jmx-console), the publisher keeps publishing, but those messages are not persisted in the DB and are not received by the consumer.

      The problem is that TopicPublisher.publish(Message) returns without any exception when publishing to a stopped topic, so I have no idea that something is wrong and that I'm losing messages.
      Is this expected behavior?

      JBM 1.4 User's Guide chapter 7.7 says:
      "If the call to send a persistent message to a persistent destination returns successfully with no exception, then you can be sure that the message was persisted..."

        • 1. Re: message publishing to stopped topic
          gaohoward

          Do you have any test to reproduce this?
          Thanks

          • 2. Re: message publishing to stopped topic

            Thanks for the reply,
            Here's the code for two MBeans - producer and consumer:

            Producer:

            package topictest;
            
            import java.util.concurrent.atomic.AtomicLong;
            
            import javax.annotation.Resource;
            import javax.jms.JMSException;
            import javax.jms.Session;
            import javax.jms.TextMessage;
            import javax.jms.Topic;
            import javax.jms.TopicConnection;
            import javax.jms.TopicConnectionFactory;
            import javax.jms.TopicPublisher;
            import javax.jms.TopicSession;
            
            import org.apache.log4j.Logger;
            import org.jboss.annotation.ejb.Service;
            
            @Service(objectName = "test:service=Producer")
            public class TestProducer implements TestProducerMBean {

                @Resource(mappedName = "java:/JmsXA")
                private TopicConnectionFactory tConnFactory;

                private TopicSession tSession;

                private TopicConnection connection;

                @Resource(mappedName = "/topic/test-Topic")
                private Topic topic;

                private TopicPublisher publisher;

                private AtomicLong counter = new AtomicLong();

                private static final Logger logger = Logger.getLogger(TestProducer.class);

                // Publishes msgNum messages, one every 500 ms; -1 means publish indefinitely.
                @Override
                public void send(Integer msgNum) throws JMSException {
                    for (int i = 0; (i < msgNum || -1 == msgNum); i++) {
                        String messageText = String.valueOf(counter.getAndIncrement());
                        TextMessage msg = tSession.createTextMessage(messageText);
                        publisher.publish(msg);
                        logger.info("Published " + messageText);

                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag and stop publishing.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }

                @Override
                public void start() throws JMSException {
                    connection = tConnFactory.createTopicConnection();
                    tSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                    publisher = tSession.createPublisher(topic);
                }

                @Override
                public void stop() throws JMSException {
                    if (publisher != null) {
                        publisher.close();
                        tSession.close();
                        connection.close();
                    }
                }
            }
            


            Management interface for Producer:
            package topictest;
            
            import javax.jms.JMSException;
            
            import org.jboss.annotation.ejb.Management;
            
            @Management
            public interface TestProducerMBean {
                void start() throws JMSException;
                void stop() throws JMSException;
                void send(Integer msgNum) throws JMSException;
            }
            


            Consumer:
            package topictest;
            
            import javax.annotation.Resource;
            import javax.jms.JMSException;
            import javax.jms.Message;
            import javax.jms.MessageListener;
            import javax.jms.Session;
            import javax.jms.TextMessage;
            import javax.jms.Topic;
            import javax.jms.TopicConnection;
            import javax.jms.TopicConnectionFactory;
            import javax.jms.TopicSession;
            import javax.jms.TopicSubscriber;
            
            import org.apache.log4j.Logger;
            import org.jboss.annotation.ejb.Service;
            
            @Service(objectName = "test:service=Consumer")
            public class TestConsumer implements TestConsumerMBean, MessageListener {

                @Resource(mappedName = "java:/JmsXA")
                private TopicConnectionFactory tConnFactory;

                @Resource(mappedName = "/topic/test-Topic")
                private Topic topic;

                private TopicSession session;
                private TopicConnection connection;
                private TopicSubscriber subscriber;

                private static final Logger logger = Logger.getLogger(TestConsumer.class);

                @Override
                public void create() throws JMSException {
                    connection = tConnFactory.createTopicConnection();
                    connection.setClientID("test-client");
                    session = connection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
                    subscriber = session.createDurableSubscriber(topic, "test-subscriber");
                    connection.start();
                }

                @Override
                public void destroy() throws JMSException {
                    session.close();
                    // Close the connection too, otherwise it stays open with the client ID in use.
                    connection.close();
                }

                @Override
                public void start() throws JMSException {
                    startReceiving();
                }

                @Override
                public void startReceiving() throws JMSException {
                    subscriber.setMessageListener(this);
                }

                @Override
                public void stopReceiving() throws JMSException {
                    subscriber.setMessageListener(null);
                }

                @Override
                public void onMessage(Message arg0) {
                    TextMessage message = (TextMessage) arg0;
                    try {
                        logger.info("Received " + message.getText());
                    } catch (JMSException e) {
                        logger.error("Failed to read message text", e);
                    }
                }
            }
            


            Management interface for Consumer:
            package topictest;
            
            import javax.jms.JMSException;
            
            import org.jboss.annotation.ejb.Management;
            
            @Management
            public interface TestConsumerMBean {

                void create() throws JMSException;
                void destroy() throws JMSException;
                void startReceiving() throws JMSException;
                void stopReceiving() throws JMSException;
                void start() throws JMSException;
            }
            


            Topic deployment descriptor that I'm using:
            <?xml version="1.0" encoding="UTF-8"?>
            <server>
                <mbean code="org.jboss.jms.server.destination.TopicService"
                       name="test:service=Topic,name=test-Topic"
                       xmbean-dd="xmdesc/Topic-xmbean.xml">
                    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
                    <depends>jboss.messaging:service=PostOffice</depends>
                </mbean>
            </server>
            


            And my JmsXA config:
            <tx-connection-factory>
                <jndi-name>JmsXA</jndi-name>
                <xa-transaction/>
                <track-connection-by-tx/>
                <rar-name>jms-ra.rar</rar-name>
                <connection-definition>org.jboss.resource.adapter.jms.JmsConnectionFactory</connection-definition>
                <config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
                <config-property name="JmsProviderAdapterJNDI" type="java.lang.String">java:/DefaultJMSProvider</config-property>
                <config-property name="Strict" type="java.lang.Boolean">false</config-property>
                <max-pool-size>20</max-pool-size>
                <security-domain-and-application>JmsXARealm</security-domain-and-application>
            </tx-connection-factory>
            


            How to reproduce:
            1) Find Producer in jmx-console and invoke its send() operation, passing the number of messages to publish as the argument (-1 publishes indefinitely). In the JBoss log you will see something like this:
            2009-11-20 18:57:03,498 INFO [topictest.TestProducer] Published 0
            2009-11-20 18:57:03,499 INFO [topictest.TestConsumer] Received 0
            2009-11-20 18:57:04,002 INFO [topictest.TestProducer] Published 1
            2009-11-20 18:57:04,003 INFO [topictest.TestConsumer] Received 1
            2009-11-20 18:57:04,504 INFO [topictest.TestProducer] Published 2
            2009-11-20 18:57:04,505 INFO [topictest.TestConsumer] Received 2
            

            2) While the Producer is publishing, go to test-Topic and invoke its stop() operation. You will see something like this:
            2009-11-20 18:57:04,002 INFO [topictest.TestProducer] Published 1
            2009-11-20 18:57:04,003 INFO [topictest.TestConsumer] Received 1
            2009-11-20 18:57:04,504 INFO [topictest.TestProducer] Published 2
            2009-11-20 18:57:04,505 INFO [topictest.TestConsumer] Received 2
            2009-11-20 18:57:04,821 INFO [org.jboss.jms.server.destination.TopicService] Topic[/topic/test-Topic] stopped
            2009-11-20 18:57:05,005 INFO [topictest.TestProducer] Published 3
            2009-11-20 18:57:05,508 INFO [topictest.TestProducer] Published 4
            2009-11-20 18:57:06,008 INFO [topictest.TestProducer] Published 5
            

            There are no exceptions after the "Topic stopped" log entry.
            You can also stop the consumer by invoking its stopReceiving() operation and observe that, once the topic is stopped, messages are not persisted to the DB.

            Does the producer have to check somehow that the topic is stopped, and stop publishing?
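
To make the "check somehow" idea concrete, here is a minimal sketch of asking an MBeanServer for a service MBean's state using only the standard javax.management API. The class and method names (MBeanStateCheck, attributeEquals) are made up for illustration. Whether the JBM TopicService MBean actually exposes a usable state attribute is an assumption this thread does not confirm, so the attribute name and expected value are left as parameters.

```java
import javax.management.MBeanServer;
import javax.management.ObjectName;

/** Sketch: ask an MBeanServer whether an MBean attribute has an expected value. */
final class MBeanStateCheck {

    /**
     * Returns true only if the MBean is registered and the named attribute
     * equals the expected value. Any JMX failure counts as "not in that state".
     */
    static boolean attributeEquals(MBeanServer server, ObjectName name,
                                   String attribute, Object expected) {
        try {
            return server.isRegistered(name)
                    && expected.equals(server.getAttribute(name, attribute));
        } catch (Exception e) {
            // Covers InstanceNotFoundException, AttributeNotFoundException, etc.
            return false;
        }
    }
}
```

A call might look like attributeEquals(server, new ObjectName("test:service=Topic,name=test-Topic"), "StateString", "Started"), where "StateString" and "Started" are assumed values. Even if such a check works, the topic can still be stopped between the check and the publish() call, so it narrows the window rather than closing it.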
            Thanks

            • 3. Re: message publishing to stopped topic

              OK, maybe it'll help if I provide more context for what I'm doing.

              In my application, remote clients call a stateless EJB to send data to a specified topic. The EJB calls a singleton object that holds a map with entries of the form "topic name" -> "TopicPublisher object". If there is no map entry for the specified topic, a new connection, session and publisher are created and a new entry is added (this way we have only one publisher per topic). If the topic specified by the remote client does not exist, or any other error occurs, the EJB sends a false value back to the client.
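
Roughly, the singleton described above could look like the following sketch. It is not the real application code: TextPublisher and TextPublisherFactory are hypothetical stand-ins for javax.jms.TopicPublisher and for the code that creates the connection, session and publisher, used here so the example compiles without a JMS jar.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for javax.jms.TopicPublisher. */
interface TextPublisher {
    void publish(String text) throws Exception;
}

/** Hypothetical factory: would create the connection, session and publisher for one topic. */
interface TextPublisherFactory {
    TextPublisher create(String topicName) throws Exception;
}

/** Holds one lazily created publisher per topic name. */
final class PublisherRegistry {
    private final Map<String, TextPublisher> publishers = new HashMap<String, TextPublisher>();
    private final TextPublisherFactory factory;

    PublisherRegistry(TextPublisherFactory factory) {
        this.factory = factory;
    }

    /** Looks up the publisher for a topic, creating and caching it on first use. */
    private synchronized TextPublisher publisherFor(String topicName) throws Exception {
        TextPublisher publisher = publishers.get(topicName);
        if (publisher == null) {
            publisher = factory.create(topicName); // throws if the topic cannot be resolved
            publishers.put(topicName, publisher);
        }
        return publisher;
    }

    /** Returns true if publish() returned normally, false if anything threw. */
    boolean send(String topicName, String text) {
        try {
            TextPublisher publisher = publisherFor(topicName);
            synchronized (publisher) { // one caller at a time per publisher
                publisher.publish(text);
            }
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
```

Note that a true result here only means publish() did not throw, which is exactly why a silent publish to a stopped topic goes unnoticed.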

              The problem happens when JBoss is shutting down. If JBoss stops the topic before stopping the EJB, there are a few milliseconds during which the EJB sends messages to a stopped topic. In this case the EJB should send false back to the client to signal that there was an error and the message wasn't persisted. But since publisher.publish() throws no exception when publishing to a stopped topic, it can't do this.
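
One application-side way to cover that window, sketched under the assumption that some callback can run before the topic is stopped (this thread does not establish that such ordering is available), is a gate that rejects sends once it is closed. ShutdownGate and PublishAction are hypothetical names; PublishAction stands in for the publisher.publish(msg) call.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for the publisher.publish(msg) call. */
interface PublishAction {
    void run() throws Exception;
}

/** Rejects publishes after close(); close() waits for publishes already in flight. */
final class ShutdownGate {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private boolean open = true; // guarded by lock

    /** Runs the action and returns true only if the gate is open and the action did not throw. */
    boolean runIfOpen(PublishAction action) {
        lock.readLock().lock(); // many senders may hold the read lock at once
        try {
            if (!open) {
                return false;
            }
            action.run();
            return true;
        } catch (Exception e) {
            return false;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Closes the gate; blocks until in-flight runIfOpen() calls have finished. */
    void close() {
        lock.writeLock().lock();
        try {
            open = false;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

The read/write lock means close() returns only after every publish that passed the check has completed, so no send can slip through after the gate reports closed. It only helps if close() really is called before the topic stops.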

              In the test above, the topic is stopped manually by invoking its stop() method in jmx-console, and the test shows that publisher.publish() does indeed return successfully when publishing to a stopped topic.
              If this is normal for a publisher, what can I do about this situation? Should I create a new session for each message, or recover the session?

              Many thanks in advance for any assistance with this issue.