0 Replies Latest reply on Jul 7, 2011 10:07 PM by jis2154

    JBoss 5.1 - Waiting MDBs prevent message delivery

    jis2154

      Summary:

      I'm using Message-Driven Beans (MDBs) that listen to a JMS queue or topic. These beans can place their threads in a waiting state for a period of time during execution of their onMessage methods. If a certain number of them are waiting simultaneously (between 30 and 50, usually around 45), no further JMS messages sent to the relevant queue or topic are delivered until at least one of the waiting MDB instances times out. After the timeouts occur, the beans resume and their onMessage methods exit. The messages that were stuck in the queue are then delivered.

       

      This doesn't seem to be an MDB pool or queue size issue. I've configured the MDB pool's maximum size and the topic/queue message buffer limit to be much, much larger than the number at which the system gets stuck.

       

      Detailed Explanation:

      I believe the way these MDBs are being used is unconventional, and may introduce scaling problems. I haven't identified any reason it should fail in the way it does, however.

       

      These Beans are essentially using JMS messages to make multiple parallel recursive calls to themselves (calls to other instances of the same Bean, made by placing a message on the same queue/topic the current message came from). The Beans wait until they have received results from all of these recursive calls (the results being received by a listener on another JMS topic). The Beans then perform additional sequential work based on the recursive calls' results before returning their own results.

       

      Here's a highly-abstracted version of what's happening in the onMessage methods:

       

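      import java.util.ArrayList;
      import java.util.Collection;
      import java.util.List;
      import java.util.UUID;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.TimeoutException;
      import javax.ejb.ActivationConfigProperty;
      import javax.ejb.MessageDriven;
      import javax.jms.Message;
      import javax.jms.MessageListener;

      @MessageDriven(name = "MyBeanMDB", activationConfig = {
                       @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                       @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/queue.MyBeanTaskQueue"),
                       @ActivationConfigProperty(propertyName = "providerAdapterJNDI", propertyValue = "java:/DefaultJMSProvider") })
      public class MyBean implements MessageListener {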
      
           public void onMessage(Message message) {
                //The message specifies a complex data analysis task that is to be performed.
      
                //Part of this task needs to be performed sequentially. 
                //Zero or more results will be generated by performing this work.
                Collection<ImmediateWorkResult> immediateWorkResults = doImmediateWork(message);     
                
                //Create a list to hold the results of any child tasks that must be performed.
                List<MyBeanResult> childTaskResultList = new ArrayList<MyBeanResult>(immediateWorkResults.size());
      
                if (immediateWorkResults.size() > 0) {
      
                     //Create a latch that will be used to determine when all additional work is complete.
                     CountDownLatch childTaskCompletionLatch = new CountDownLatch(immediateWorkResults.size());
                
                     //The same work that was done on the original message needs to be done on each result.
                     //These tasks should be done in parallel.
                     for (ImmediateWorkResult childTask : immediateWorkResults) {
      
                          //Get a unique id for the JMS message.
                          UUID newTaskID = UUID.randomUUID();
                     
                          //Add a listener to the result JMS topic.
                          //When a result for the task uniquely identified by "newTaskID" is received,
                          //the result will be added to childTaskResultList and childTaskCompletionLatch will be decremented. 
                          addListenerToMyBeanResultTopic(newTaskID, childTaskCompletionLatch, childTaskResultList);
                     
                          //Send a JMS message describing the work to be done for the child task.
                          //This message should be received and handled by a different instance of MyBean.
                          sendJMSTaskMessageToMyBeanTaskQueue(newTaskID, childTask);
                     }
                     
                     try {
      
                          //Place the current thread in a waiting state until either all child task results are received or
                          //the timeout is reached.
                          //These tasks should only take a few seconds given the current test data set, but a very long timeout
                          //is currently being used in order to ensure this is not the cause of the problem.                    
                          if (!childTaskCompletionLatch.await(600000, TimeUnit.MILLISECONDS)) {
                               
                               //Publish a result indicating that this task has failed due to the timeout being exceeded, then return.                
                               MyBeanResult failureResult = createNewFailureResult(new TimeoutException());
                               sendJMSResultMessageToMyBeanResultTopic(failureResult);
                               return;
                          }
                     } catch ( InterruptedException ex ) {
                        
                          //Publish a result indicating that this task has failed due to the interruption, then return.
                          MyBeanResult failureResult = createNewFailureResult(ex);
                          sendJMSResultMessageToMyBeanResultTopic(failureResult);
                          return;
                     }
                }
      
                //Perform the remaining work based on the results of the work originally performed by this instance,
                //and the results of any child tasks.
                MyBeanResult completeResult = performFinalWorkOnAnyChildTaskResults(immediateWorkResults, childTaskResultList);
      
                //Publish the results before returning.
                sendJMSResultMessageToMyBeanResultTopic(completeResult);
           }
      }
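
      For comparison, the same wait-for-children pattern can be reproduced outside the container. The following is only a minimal sketch, not JBoss code: it assumes the messages are delivered on a bounded set of threads, which has not been confirmed for JBoss 5.1, and it substitutes a plain fixed-size thread pool for the JMS delivery threads. The class name, method name and pool sizes are all hypothetical. Each "parent" task submits one "child" task to the same pool and waits on a latch, as onMessage does above. It needs Java 8 or later for the lambdas.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchStarvationSketch {

    /**
     * Runs `parents` tasks on a pool of `poolSize` threads. Each parent submits
     * one child task to the SAME pool and waits for it on a latch.
     * Returns how many parents gave up because the wait timed out.
     */
    public static int countTimedOutParents(int poolSize, int parents, long timeoutMillis) {
        if (parents > poolSize) {
            throw new IllegalArgumentException("parents must not exceed poolSize");
        }
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch allParentsRunning = new CountDownLatch(parents);
        CountDownLatch allParentsDone = new CountDownLatch(parents);
        AtomicInteger timedOut = new AtomicInteger();

        for (int i = 0; i < parents; i++) {
            pool.execute(() -> {
                try {
                    // Wait until every parent holds a thread before any child is submitted.
                    allParentsRunning.countDown();
                    allParentsRunning.await();

                    // The child only counts down the latch; it needs a free pool thread to run.
                    CountDownLatch childDone = new CountDownLatch(1);
                    pool.execute(childDone::countDown);

                    if (!childDone.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                        timedOut.incrementAndGet();
                    }

                    // Keep this thread until every parent has stopped waiting,
                    // so the returned count does not depend on thread timing.
                    allParentsDone.countDown();
                    allParentsDone.await();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        try {
            allParentsDone.await();
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }
        return timedOut.get();
    }

    public static void main(String[] args) {
        // Every pool thread is held by a waiting parent, so no child can start.
        System.out.println("pool=4, parents=4: timed out = " + countTimedOutParents(4, 4, 300));
        // Spare threads are available, so the children run and the parents finish.
        System.out.println("pool=8, parents=4: timed out = " + countTimedOutParents(8, 4, 5000));
    }
}
```

      With four threads and four waiting parents, all four waits should time out, and the queued children only run afterwards. With eight threads, none should time out. If the container behaves like this pool, the stall described above would start once the number of waiting onMessage calls reaches the number of delivery threads, regardless of the MDB pool size or the queue's buffer limit.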
      

       

       

      Can anyone explain this problem and suggest a way to correct it?