9 Replies Latest reply on Aug 12, 2009 5:59 AM by Alexander Lengson

    JBM Topic Pub-Sub

    Brandon Daye Newbie

      I would like to get some input from the JBM community relating to publish/subscribe models using JBoss Messaging.

      I have a web service that takes a request and and subscribes that entity to a particular topic(s). I am having trouble maintaining state for the various subscriptions to particular topics. It seems that the javax.jms.Session that I use to create the subscribers doesn't maintain the information. Maybe the answer lies in my misunderstanding of how JBM works but I am going to walk you through what I'm doing to accomplish my pub-sub model.

      1. ConnectionFactory looked up through jndi -> /ConnectionFactory
      2. A Connection is created from the factory.
      3. A Session is created from the Connection

      From here, I can create the subscribers and verify that they exists by publishing to them immediately. I can dynamically create new topics by using the ServerPeer and MBeans. I can't figure out how to keep all subscribers active. I have tried a simple unit test, using only 1 ConnectionFactory, Connection and Session and still, no dice. Is there a configuration file that enables persistence?

        • 1. Re: JBM Topic Pub-Sub
          Andy Taylor Master

          I'm not 100% sure what you're problem actually is but it sounds to me like you need to use durable subscribers!

          • 2. Re: JBM Topic Pub-Sub
            Brandon Daye Newbie

            I would like to as a follow on question relating to durable subscriptions. If a durable subscription is created:

            MessageConsumer consumer = session.createDurableSubscriber(Topic topic, String name);


            The whole point of using durable subscribers is so that messages can be received while the consumer is not connected. Let us say the consumer above is created in a method and the reference ceases to exist after. The consumer can still be notified of any messages published to its topic. However, how can that consumer be closed and unsubscribed? If you attempt to unsubscribe the durable subscriber:

            session.unsubscribe(String name);


            an IllegalStateException will be thrown per the JMS 1.1 spec:

            "It is erroneous for a client to delete a durable subscription while there is an active MessageConsumer or TopicSubscriber for the subscription, or while a consumed message is part of a pending transaction or has not been acknowledged in the session. "

            How then can durable subscriptions ever be deleted? I understand if you create a new one with the same name and a new topic, it will effectively delete the old one. Is this the only option?

            • 3. Re: JBM Topic Pub-Sub
              Tim Fox Master

              No, that's not how JMS durable subscriptions work. There are quite a few good JMS tutorials out there which should go through this.

              In short:


              MessageConsumer consumer = session.createDurableSubscriber(myTopic, "foo"); //Creates the durable subscription and returns a consumer on it
              
              consumer.close() //Closes the consumer but the subscription is still active and receiving messages - you can close down the whole client at this point.. then.. some time later:
              
              consumer = session.createDurableSubscriber(myTopic, "foo"); //Does not create a new durable sub since it already exists - but just returns a consuimer on it.
              
              consumer.close() // closes the consumer - the subscription is still alive
              
              session.unsubscribe("foo") // that's how you delete it - notice you've already closed the consumer/
              


              HTH







              • 4. Re: JBM Topic Pub-Sub
                Brandon Daye Newbie

                Please forgive me for posting what may be trivial questions but I have not found the answers to any questions that I have on the web.

                Using the above approach, the consumer that takes active control of the subscription would have to poll to retrieve the messages published while it was offline ( consumer.receive() )? When disconnected, the MessageListener's asynchronous nature is eliminated.

                I am trying to accomplish something pretty simply but I have yet to find a simple solution. I would like to create message consumers for various topics. The message consumers will be created upon an HTTP request (the socket will be closed and will not wait). So, a particular request could generate a couple of message consumers. What is the correct way to manage them, durable non-durable? If non-durable, store the message consumers in memory?

                As you have noticed, I am a JMS newbie and am not aware of existing patterns. If you could provide me with external links or examples, that would be very helpful.

                • 5. Re: JBM Topic Pub-Sub
                  Yong Hao Gao Master


                  MessageConsumer.setMessageListener() will always there for you to receive async messages.

                  Durable means JBM server will keep the messages for a durable subscription until the messages are received or expired. Please see the JMS spec for details.

                  • 6. Re: JBM Topic Pub-Sub
                    Andy Taylor Master

                    I think the problem you have is you're creating state from a http request. The http request/response paradigm is stateless and shouldn't be spawning any threads etc. you can add objects to a http session and access them from within a servlet but these are tied to the session. Maybe you need a re think for your application.

                    • 7. Re: JBM Topic Pub-Sub
                      Poonam Agarwal Newbie

                      Hi,

                      I am trying to accomplish something similar to bdaye4.
                      I have multiple clients to whom I want to publish updates from my server.
                      So, whenever my server has an update it publishes to a Topic XYZ.
                      What I want to achive is :
                      Whenever a client is started, I want it to subscribe to the Topic XYZ.
                      Whenever a client is stopped, I want it to unsubscibe from the topic XYZ.

                      Option I : Using Durable Subscribers:

                      When a client is created:
                      I use TopicSession to create Durable subscribers so that I can specify the subscriptionID with which I want to create the subscriber.

                      When a client is stopped:
                      I create a TopicSession on the ConnnectionFactory.
                      I will need to call TopicSession.unsubscribe(subscriptionID), but this gives me IllegalStateException saying that the subscriber is still active.
                      Can somebody let me know, if I am missing something(It's a Friday).

                      Option II : Non-Durable
                      Alternatively, if I use non-durable subscription,
                      how do create non-durable subscribers with a subscriptionID, so that I can remove those subscribers by specifying a subscriptionID.

                      Any help is appreciated.

                      • 8. Re: JBM Topic Pub-Sub
                        Alexander Lengson Newbie

                         

                        "bdaye42" wrote:

                        I can dynamically create new topics by using the ServerPeer and MBeans.

                        Could you please share the code which injects or lookups ServerPeer please?

                        I can't inject it
                        @EJB(beanName = "jboss.messaging:service=ServerPeer")
                         ServerPeer serverPeer;

                        due to "Caused by: javax.management.MalformedObjectNameException: Invalid character ':' in value part of property"

                        Neither lookup it
                         MBeanServer server = MBeanServerLocator.locate();
                         serverPeer = (ServerPeer) MBeanProxyExt.create(
                         ServerPeer.class,
                         "jboss.messaging:service=ServerPeer",server);

                        because of "org.jboss.jms.server.ServerPeer is not an interface"

                        How to obtain reference which would allow to make:
                         serverPeer.deployQueue("q01", "q01");
                        


                        Thanks.

                        • 9. Re: JBM Topic Pub-Sub
                          Alexander Lengson Newbie

                           

                          "sanches" wrote:

                          Could you please share the code which injects or lookups ServerPeer please?

                          Solution is described at
                          http://www.jboss.org/index.html?module=bb&op=viewtopic&t=159697