1 Reply Latest reply on Nov 24, 2004 12:51 PM by Adrian Brock

    Repeatable deadlock when closing TopicSubscriber

    Max Ross Newbie

      The code pasted below contains a JMS test that exposes a race condition in JBoss 3.2.6 that leads to a repeatable deadlock. I've stripped this test down as far as possible to let the important pieces stand out, so yes, I know, I'm not cleaning things up properly.

      To understand what is happening, you'll also want to look at SpyMessageConsumer.java and SpySession.java.

      In the test case I establish 2 topic subscriptions and 2 topic publishers. I publish messages to both topics one right after the other, so that they are delivered at almost the exact same time. The onMessage handler attempts to close both subscriptions as soon as it receives a message (ignore the fact that we would never want to do this on every message). It succeeds in closing the first subscription, but hangs attempting to close the second subscription.

      Here's why it hangs:

      First message is delivered to the SpyMessageConsumer (M1).

      M1 SpyMessageConsumer line 678: obtainDeliveryLock() is invoked on the session member (instance of SpySession). The lock is immediately granted because this is the first message received (no contention for the lock).

      M1 SpySession line 1088: inDelivery == false, so inDelivery is immediately set to true and the method returns.

      Second message is delivered to the SpyMessageConsumer (M2).

      M2 SpyMessageConsumer line 678: obtainDeliveryLock() is invoked on the session member (instance of SpySession). The lock is granted as soon as M1 leaves the synchronized block within the obtainDeliveryLock method.

      M2 SpySession line 1092: inDelivery == true, so the while loop starts and wait() is invoked on the deliveryLock. M2 is now waiting for notification on the deliveryLock.

      M1 SpyMessageConsumer line 682: onMessage() is invoked on the listener (instance of our JMSMain class).

      M1 JMSMain line 61: close() is invoked on the first topic subscription (instance of SpyMessageConsumer).

      M1 SpyMessageConsumer line 551: The current thread is equal to the listener thread for this subscription, so the if condition fails. Method exits normally. Subscription closed successfully.

      M1 JMSMain line 61: close() is invoked on the second topic subscription (instance of SpyMessageConsumer).

      M1 SpyMessageConsumer line 551: The current thread is not equal to the listener thread for this subscription, so the if condition succeeds.

      M1 SpyMessageConsumer line 557: join() is invoked on the listener thread with no arguments, meaning wait indefinitely for the listener thread to exit.

      Once we reach this state, M2 is waiting for notification on the deliveryLock, but M1, which is still in delivery (inDelivery == true), can't release the delivery lock until M2 exits, because M1 is blocked in join(). Deadlock. The JVM doesn't report the deadlock because neither thread is blocked trying to enter a monitor that the other one holds. Rather, M2 is waiting for a notification on the deliveryLock, and M1 is unable to provide that notification because it is waiting for M2 to die.
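
      The hang described above can be reproduced without JBoss. The following is a minimal sketch, not the JBossMQ source: DeliveryLockSketch, releaseDeliveryLock(), reproduce() and the thread names are hypothetical stand-ins, and only the inDelivery flag guarded by wait() on a deliveryLock object follows the description of SpySession.obtainDeliveryLock(). It uses join() with a timeout so that the demonstration terminates; with the no-argument join() from the walkthrough, listener-1 would never return.

```java
import java.util.concurrent.CountDownLatch;

// Simplified stand-in for the session's delivery lock (an assumption, not JBossMQ code).
final class DeliveryLockSketch {
    private final Object deliveryLock = new Object();
    private boolean inDelivery = false;

    // Blocks until no other listener thread is delivering, then claims delivery.
    void obtainDeliveryLock() throws InterruptedException {
        synchronized (deliveryLock) {
            while (inDelivery) {
                deliveryLock.wait();
            }
            inDelivery = true;
        }
    }

    // Hypothetical counterpart: ends delivery and wakes any waiting listener threads.
    void releaseDeliveryLock() {
        synchronized (deliveryLock) {
            inDelivery = false;
            deliveryLock.notifyAll();
        }
    }

    // Returns true if listener-2 was still alive after listener-1 joined it
    // for joinMillis (must be > 0; join(0) would wait forever).
    static boolean reproduce(long joinMillis) throws InterruptedException {
        DeliveryLockSketch session = new DeliveryLockSketch();
        CountDownLatch m1InDelivery = new CountDownLatch(1);
        boolean[] stuck = new boolean[1];

        Thread m2 = new Thread(() -> {
            try {
                m1InDelivery.await();
                session.obtainDeliveryLock();   // parks in wait(): M1 is in delivery
                session.releaseDeliveryLock();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "listener-2");

        Thread m1 = new Thread(() -> {
            try {
                session.obtainDeliveryLock();
                m1InDelivery.countDown();
                // Stand-in for onMessage() closing the other subscription,
                // which joins that subscription's listener thread.
                m2.join(joinMillis);
                stuck[0] = m2.isAlive();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                session.releaseDeliveryLock();  // only reached because the join timed out
            }
        }, "listener-1");

        m2.start();   // started first so that m1 never joins an unstarted thread
        m1.start();
        m1.join();
        m2.join();
        return stuck[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("listener-2 still waiting when the join timed out: " + reproduce(200L));
    }
}
```

      In this sketch listener-2 cannot finish until listener-1 releases the delivery lock, and listener-1 does not release it until its join returns, which is the same circular wait as in the walkthrough.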

      Now, before I post the thread dump and the test code, I want to ask a question. The delivery lock mechanism was added to "Simulate the single thread per session to avoid concurrent delivery to a message listener, as required by the spec." (taken from the CVS commit comment). In my scenario, I'm closing a subscription from within the onMessage method of a listener, and that subscription is not the subscription under which the message I'm processing was delivered. I've already recognized that there is a much cleaner way to get the behavior for which I'm looking, but does my code actually wander into territory explicitly deemed undefined or illegal by the spec? Either way, it would certainly be nice if JBoss could do something other than deadlock if a programmer is foolish enough to tie him or herself in this knot.
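
      One possible shape for a cleaner approach is to hand the close calls to a separate thread, so that the listener thread can return from onMessage() and end its delivery. This is only a sketch under stated assumptions: DeferredCloseSketch is a made-up class, AutoCloseable stands in for TopicSubscriber so the example compiles without the JMS API, it relies on java.util.concurrent (Java 5 or later), and it has not been verified against JBoss 3.2.6.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical listener that never closes subscriptions on the delivery thread.
final class DeferredCloseSketch {
    private final ExecutorService closer = Executors.newSingleThreadExecutor();
    private final AtomicBoolean closeRequested = new AtomicBoolean(false);
    private final List<AutoCloseable> subscriptions;

    DeferredCloseSketch(List<AutoCloseable> subscriptions) {
        this.subscriptions = new ArrayList<>(subscriptions);
    }

    // Stand-in for MessageListener.onMessage(): schedules the close once and returns.
    void onMessage(String text) {
        if (closeRequested.compareAndSet(false, true)) {
            closer.execute(this::closeAll);
        }
    }

    // Runs on the closer thread, not on a listener thread.
    private void closeAll() {
        for (AutoCloseable subscription : subscriptions) {
            try {
                subscription.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Waits for the scheduled close to finish; returns false on timeout.
    boolean awaitClosed(long timeoutMillis) throws InterruptedException {
        closer.shutdown();
        return closer.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Simulates two deliveries and returns how many subscriptions were closed.
    static int demo() throws InterruptedException {
        AtomicInteger closed = new AtomicInteger();
        List<AutoCloseable> subs = new ArrayList<>();
        subs.add(() -> closed.incrementAndGet());
        subs.add(() -> closed.incrementAndGet());

        DeferredCloseSketch listener = new DeferredCloseSketch(subs);
        listener.onMessage("meaningless0");
        listener.onMessage("meaningless1");   // second delivery schedules nothing new
        listener.awaitClosed(5000L);
        return closed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("subscriptions closed: " + demo());
    }
}
```

      The closer thread may still block in close() while a listener thread finishes its delivery, but it is not itself holding the delivery lock, so the circular wait from the walkthrough should not arise.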

      Thanks in advance,
      max

      Thread Dump:

      "MessageListenerThread - AuditSystemMonitorAlarmTopic" prio=5 tid=0x18560498 nid=0x2574 in Object.wait() [18f4f000..18f4fd88]
      at java.lang.Object.wait(Native Method)
      - waiting on <0x105f6f90> (a java.lang.Object)
      at java.lang.Object.wait(Object.java:429)
      at org.jboss.mq.SpySession.obtainDeliveryLock(SpySession.java:1092)
      - locked <0x105f6f90> (a java.lang.Object)
      at org.jboss.mq.SpyMessageConsumer.run(SpyMessageConsumer.java:678)
      at java.lang.Thread.run(Thread.java:534)

      "MessageListenerThread - AuditSystemMonitorTopic" prio=5 tid=0x18566620 nid=0x2664 in Object.wait() [18f0f000..18f0fd88]
      at java.lang.Object.wait(Native Method)
      - waiting on <0x105f9c70> (a java.lang.Thread)
      at java.lang.Thread.join(Thread.java:1001)
      - locked <0x105f9c70> (a java.lang.Thread)
      at java.lang.Thread.join(Thread.java:1054)
      at org.jboss.mq.SpyMessageConsumer.close(SpyMessageConsumer.java:557)
      at com.testcase.JMSMain.onMessage(JMSMain.java:61)
      at org.jboss.mq.SpyMessageConsumer.run(SpyMessageConsumer.java:682)
      at java.lang.Thread.run(Thread.java:534)
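
      Both threads in the dump above are parked in Object.wait() (one directly, one through Thread.join()), which is a different state from being blocked on monitor entry. As a rough illustration using only the JDK (Java 5 or later, nothing JBoss-specific; WaitJoinHangProbe and its thread names are made up for this sketch), the snippet below builds a similar wait/join hang and asks ThreadMXBean whether it sees a monitor deadlock:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Hypothetical probe: one thread waits on a lock object, another joins it.
final class WaitJoinHangProbe {
    static String probe() throws InterruptedException {
        final Object lock = new Object();

        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (true) {
                        lock.wait();          // nobody ever notifies
                    }
                } catch (InterruptedException e) {
                    // interrupted by probe() to end the demonstration
                }
            }
        }, "waiter");

        Thread joiner = new Thread(() -> {
            try {
                waiter.join();                // no timeout: waits for waiter to die
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "joiner");

        waiter.setDaemon(true);
        joiner.setDaemon(true);
        waiter.start();
        joiner.start();

        // Poll until both threads have parked.
        while (waiter.getState() != Thread.State.WAITING
                || joiner.getState() != Thread.State.WAITING) {
            Thread.sleep(10L);
        }

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long[] deadlocked = threads.findMonitorDeadlockedThreads();
        String result = waiter.getState() + "/" + joiner.getState() + "/"
                + (deadlocked == null ? "none" : deadlocked.length + " reported");

        waiter.interrupt();                   // unwind so the probe can finish
        joiner.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(probe());
    }
}
```

      Here probe() returns "WAITING/WAITING/none": both threads are parked indefinitely, yet no monitor deadlock is reported, because findMonitorDeadlockedThreads() only looks for cycles of threads blocked while acquiring object monitors.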

      And the test case:

      package com.testcase;

      import java.util.Hashtable;

      import javax.jms.ConnectionFactory;
      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.MessageListener;
      import javax.jms.Session;
      import javax.jms.Topic;
      import javax.jms.TopicConnection;
      import javax.jms.TopicConnectionFactory;
      import javax.jms.TopicPublisher;
      import javax.jms.TopicSession;
      import javax.jms.TopicSubscriber;
      import javax.naming.Context;
      import javax.naming.InitialContext;
      import javax.naming.NamingException;

      public class JMSMain implements MessageListener
      {
          public static void main(String[] args) throws Exception
          {
              final JMSMain jmsTest = new JMSMain();
              jmsTest.proceedToDeadlock();
          }

          public void proceedToDeadlock() throws Exception
          {
              try
              {
                  establishContext();
                  establishTopicConnection();

                  for(int i = 0; i < _topicNames.length; i++) // one subscriber and one publisher per topic
                  {
                      _topicSubscribers[i] = establishSubscriber(_topicNames[i]);
                      _topicPublishers[i] = establishPublisher(_topicNames[i]);
                  }

                  for(int i = 0; i < _topicNames.length; i++) // publish back to back so deliveries overlap
                  {
                      _topicPublishers[i].publish(_topicSession.createTextMessage("meaningless" + i));
                  }
              }
              finally
              {
                  cleanupResources();
              }
          }

          public void onMessage(Message mes)
          {
              System.out.println("Received message " + mes.toString());
              try
              {
                  Thread.sleep(5000); // makes the race condition more repeatable
                  for(int i = 0; i < _topicSubscribers.length; i++)
                  {
                      System.out.println("Preparing to close subscription " + i);
                      _topicSubscribers[i].close(); // hangs on the subscription owned by the other listener thread
                      System.out.println("Subscription " + i + " closed.");
                  }
              }
              catch (Exception e)
              {
                  e.printStackTrace(); // don't hide failures while reproducing the hang
              }
          }

          private static void cleanupResources()
          {
              for(int i = 0; i < _topicPublishers.length; i++)
              {
                  try
                  {
                      // may be null if setup failed part way through
                      if (_topicPublishers[i] != null)
                      {
                          _topicPublishers[i].close();
                      }
                  }
                  catch (JMSException jmse)
                  {
                  }
              }
              try
              {
                  if (_con != null)
                  {
                      _con.close();
                  }
              }
              catch (NamingException ne)
              {
              }
          }

          private static void establishTopicConnection() throws NamingException, JMSException
          {
              ConnectionFactory fac = (ConnectionFactory)_con.lookup("UIL2ConnectionFactory");
              _topicConnection = ((TopicConnectionFactory)fac).createTopicConnection();
              _topicSession = _topicConnection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
              _topicConnection.start();
          }

          private static void establishContext() throws NamingException
          {
              Hashtable env = new Hashtable();
              env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
              env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
              env.put(Context.PROVIDER_URL, "jnp://" + jmsProvider + ":" + jmsProviderPort);
              _con = new InitialContext(env);
          }

          private TopicSubscriber establishSubscriber(String topicJndiName) throws Exception
          {
              Topic topic = (Topic)_con.lookup(topicJndiName);
              TopicSubscriber sub = _topicSession.createSubscriber(topic);
              sub.setMessageListener(this); // both subscribers share this listener instance
              return sub;
          }

          private TopicPublisher establishPublisher(String topicJndiName) throws Exception
          {
              Topic topic = (Topic)_con.lookup(topicJndiName);
              return _topicSession.createPublisher(topic);
          }

          private static Context _con = null;
          private static TopicConnection _topicConnection = null;
          private static TopicSession _topicSession = null;
          private static final String[] _topicNames = {"topic/AuditSystemMonitorTopic", "topic/AuditSystemMonitorAlarmTopic"};
          private static TopicSubscriber[] _topicSubscribers = new TopicSubscriber[_topicNames.length];
          private static TopicPublisher[] _topicPublishers = new TopicPublisher[_topicNames.length];
          private static final String jmsProvider = "localhost";
          private static final String jmsProviderPort = "1399";

      }