2 Replies Latest reply on Sep 15, 2011 3:49 PM by frawolff

    subscriber.setMessageListener(null) / subscriber.close() questions

    frawolff

      Hi,

       

      The JMS API documentation states: "The effect of calling MessageConsumer.setMessageListener while messages are being consumed by an existing listener or the consumer is being used to consume messages synchronously is undefined." (MessageConsumer class)

       

      I understand that synchronous and asynchronous delivery modes cannot be used at the same time.

       

      Here is a code snippet that works with JBM (from a standalone Java client application) and reproduce more or less some of the code logic I am currently writing (the semaphore here is used to mimic the asynchronous processing / long-polling timeout of a asynchronous servlet):

       

      {code}

      final TopicSubscriber sub = session.createDurableSubscriber(topic, subscriptionId);

      final List<Message> messages = Collections.synchronizedList(new ArrayList<Message>());

       

      final Semaphore done = new Semaphore(1);

      done.acquire();

       

      sub.setMessageListener(new MessageListener() {

       

          @Override

          public void onMessage(Message message) {

              try {

                  sub.setMessageListener(null);

                  messages.add(message);

                  done.release();

              }

              catch (JMSException e) {

                  e.printStackTrace();

              }

          }

      });

       

      // Wait for a onMessage() call with a 20 seconds timeout.

      boolean timeout = done.tryAcquire(20, TimeUnit.SECONDS);

       

      // Reset the message listener if it has never been called.

      if (timeout)

          sub.setMessageListener(null);

       

      // Try to get all pending messages (if any).

      Message message = null;

      while ((message = sub.receiveNoWait()) != null)

          messages.add(message);

       

      sub.close();

       

      // do something with the messages...

      {code}

       

      Questions:

       

      1. Calling setMessageListener(null) in the onMessage(...) message works: the onMessage method is executed in a JBM thread and reseting the message listener in the same thread doesn't seem to a problem, even if it the JMS API documentation speaks about "undefined" behaviour in this case. Is it valid to call setMessageListener(null) here (JBM / HornetQ / JMS in general)?

       

      2. The second call (after the timeout) is more problematic: setMessageListener(null) isn't called from a JBM thread and it can possibly be executed during a concurrent execution of the onMessage(...) method, from a JBM thread. Again, is it valid to call setMessageListener(null) here (JBM / HornetQ / JMS in general)?

       

      3. The call to the close() method at the end shouldn't be problem. The documentation states: "This call blocks until a receive or message listener in progress has completed". Because it is called after the listener has been reset to null and after receiveNoWait() calls (if any), it shouldn't block at all. Am I right?

       

      I could live without the following (commented out) part of the code:

       

      {code}

      done.tryAcquire(20, TimeUnit.SECONDS);

       

      // if (timeout)

      //     sub.setMessageListener(null);

      // Message message = null;

      // while ((message = sub.receiveNoWait()) != null)

      //     messages.add(message);

       

      sub.close();

      {code}

       

      The close() call would block if a onMessage() call is executed at the same time but it has an annoying drawback: I must close the subscriber and I cannot cache it for later reuses (this is not so bad but it can cause performance issues)...

       

      Thanks for any clarification.

        • 1. Re: subscriber.setMessageListener(null) / subscriber.close() questions
          ataylor

          1. Calling setMessageListener(null) in the onMessage(...) message works: the onMessage method is executed in a JBM thread and reseting the message listener in the same thread doesn't seem to a problem, even if it the JMS API documentation speaks about "undefined" behaviour in this case. Is it a valid to call setMessageListener(null) here (JBM / HornetQ / JMS in general)?

          Its valid, however like the spec says you cant rely on the behaviour.

          2. The second call (after the timeout) is more problematic: setMessageListener(null) isn't called from a JBM thread and it can possibly be executed during a concurrent execution of the onMessage(...) method, from a JBM thread. Again, is it valid to call setMessageListener(null) here (JBM / HornetQ / JMS in general)?

          thats a no no, JMS sessions (and there consumers etc) arent thread safe so should only ever be accessed from the same thread. I think the JMS spec actually mentions this.

          3. The call to the close() method at the end shouldn't be problem. The documentation states: "This call blocks until a receive or message listener in progress has completed". Because it is called after the listener has been reset to null and after receiveNoWait() calls (if any), it shouldn't block at all. Am I right?

          correct

           

          I'm a little curious as to what your trying to acheive here, maybe you need to re think your approach?

          1 of 1 people found this helpful
          • 2. Re: subscriber.setMessageListener(null) / subscriber.close() questions
            frawolff

            Hi Andy and many thanks for your quick answer,

             

            Here is some clarification about my project: I'm the founder of the GraniteDS framework, a Flex / JavaEE integration platform. GraniteDS implements a real-time messaging module that relies on asynchronous servlets (long-polling) and JMS topics (JMS is optional, we also have a basic and custom topic / subscription manager of our own). Subscriptions to JMS topics are non-durable in the current implementation with many issues related to client-side or network failures, fail-over events in clustered environment, all resulting in possible messages loss.

             

            I'm now trying to implement a more robust JMS integration with durable subscriptions and I need to figure out what are the possible options with this kind of dual asynchronous processing: I get an HTTP request asking for new messages. If they are no pending messages at the time of the request, the servlet request starts asynchronous processing and a message listener is set on the subscriber. Two events are then possible:

             

            1. A message is received through the onMessage() method (JMS thread): the JMS asynchronous delivery must be cancelled and the message sent back to the client, by using the async HTTP context.

             

            2. An HTTP timeout occur (servlet thread): the JMS asynchronous delivery must be cancelled again and an empty response sent to the client, asking him to reconnect.

             

            More tricky, these two events can occur at the time…

             

            My previous post was an attempt to figure out what would the different options for canceling the JMS asynchronous delivery processing after the first received message. As far as I understand your answer, setMessageListener(null) isn't reliable anywhere . The only other option is to close the subscription.

             

            I just tried this (weird) code:

             

            {code}

            final TopicSubscriber sub = session.createDurableSubscriber(topic, subscriptionId);

             

            sub.setMessageListener(new MessageListener() {

             

                @Override

                public void onMessage(Message message) {

                    try {

                        // send the message…

                        sub.close();

                    }

                    catch (JMSException e) {

                        e.printStackTrace();

                    }

                }

            });

            {code}

             

            It works, surprisingly, even if it looks like a typical dead-lock according to the JMS documentation (I guess the underlying implementation is using reentrant locks). But, it could solve my problem so is it by any chance spec compliant (or at least HornetQ compliant)?

             

            Another option would be to use a new thread to send the response and to call sub.close() from this new thread:

             

            {code}

            sub.setMessageListener(new MessageListener() {

             

                @Override

                public void onMessage(Message message) {

                    try {

                           messages.add(message); // store the message in a global array…

                           // start a new thread for the response if it wasn't started already.

                    }

                    catch (Exception e) {

                        e.printStackTrace();

                    }

                }

            });

            {code}

             

            ..and in this new thread:

             

            {code}

            sub.close();

             

            // Use an async HTTP context to send the message(s) back to the client.

            {code}

             

            Here, even if there is a second call of the onMessage() method before the new thread is started, the sub.close() call will block and there is no risk to loose a message. The drawback is clearly that I would need to manage a separated thread pool, but I can do it.

             

            If I could get a clear idea about the possible options, I could certainly solve the all problem (including the concurrent timeout event issue).

             

            Many thanks for your time.