0 Replies Latest reply on Apr 9, 2007 2:07 PM by mainakb

    Multiple messages are getting lost on each queueReceiver.receive() call

    mainakb

      I am using JBoss Messaging 1.0.1.GA with JBoss 4.0.5 GA, and I am not using a transacted session. I am trying to fetch messages from a pre-configured queue in CLIENT_ACKNOWLEDGE mode. The queue already held a large number of messages, say 40000. Each time I call queueReceiver.receive(), multiple messages are lost.

      That is, even before any explicit acknowledgement, the message count in the JBoss queue decreases by 80/100.

      Please have a look at the following snippet:

      /**
       * Stand-alone JMS consumer that repeatedly receives messages from the queue
       * bound at {@code queue/XMLQueue} through a non-transacted
       * {@code CLIENT_ACKNOWLEDGE} session.
       *
       * <p>Messages are received but never acknowledged by this class; it exists to
       * observe how the server-side queue depth changes on each receive.
       *
       * <p>NOTE: {@code queueConnectionFactory}, {@code jndiContext} and
       * {@code queueNameQueueMap} are referenced below but are not declared in this
       * snippet; they must be provided by the surrounding code.
       */
      public class Consumer {

          /** JNDI name of the queue this consumer reads from. */
          private static final String QUEUE_NAME = "queue/XMLQueue";

          /** Number of receive attempts performed by {@link #main(String[])}. */
          private static final int RECEIVE_ATTEMPTS = 114;

          /** How long a single receive waits for a message, in milliseconds. */
          private static final long RECEIVE_TIMEOUT_MS = 1000;

          /**
           * Opens a connection, session and receiver for {@link #QUEUE_NAME}, performs
           * {@link #RECEIVE_ATTEMPTS} receives, then closes the connection.
           *
           * @param args ignored
           */
          public static void main(String[] args) {
              QueueConnection qConnection = null;
              try {
                  qConnection = createConnection();
                  QueueSession queueSession =
                          RequestSender.getQueueSessionClient(qConnection, QUEUE_NAME);
                  Queue queue = createQueue(QUEUE_NAME);
                  QueueReceiver queueReceiver = queueSession.createReceiver(queue);

                  for (int i = 0; i < RECEIVE_ATTEMPTS; i++) {
                      // The returned message is intentionally neither used nor acknowledged.
                      getNewMessage(qConnection, queueReceiver);
                  }
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  if (qConnection != null) {
                      try {
                          // Closing a JMS connection also closes the sessions and
                          // receivers created from it, so one close() releases all three.
                          qConnection.close();
                      } catch (Exception jMSException) {
                          jMSException.printStackTrace();
                      }
                  }
              }
          }

          /**
           * Starts the connection, waits up to {@link #RECEIVE_TIMEOUT_MS} for one
           * message, then stops the connection again.
           *
           * @param qConnection   connection that owns {@code queueReceiver}
           * @param queueReceiver receiver to read from
           * @return the received message if it is an {@link ObjectMessage}; {@code null}
           *         if nothing arrived in time, the message was of another type, or an
           *         exception was thrown (the exception is printed, not propagated)
           */
          public static ObjectMessage getNewMessage(QueueConnection qConnection, QueueReceiver queueReceiver) {
              ObjectMessage message = null;
              try {
                  System.out.println("Just Before Receive -" + queueReceiver);
                  qConnection.start();
                  Message msg = queueReceiver.receive(RECEIVE_TIMEOUT_MS);
                  // Observation: at this point the server-side queue depth was seen to drop
                  // by more than one message per receive (checked by adding a delay here
                  // and monitoring the queue depth).
                  // NOTE(review): presumably the provider prefetches a batch of messages
                  // into a client-side buffer once the connection is started, and returns
                  // unacknowledged ones to the queue when the session/connection closes --
                  // confirm against the JBoss Messaging documentation.
                  System.out.println("Just After Received -" + msg);
                  qConnection.stop();
                  if (msg instanceof ObjectMessage) {
                      message = (ObjectMessage) msg;
                  }
              } catch (Exception exception) {
                  // JMSException and any other failure are handled identically.
                  exception.printStackTrace();
              }
              return message;
          }

          /**
           * Creates a new queue connection from {@code queueConnectionFactory}.
           *
           * @return the new connection, or {@code null} if creation failed (the
           *         {@link JMSException} is printed, not propagated)
           */
          public static QueueConnection createConnection() {
              QueueConnection queueConnection = null;
              try {
                  queueConnection = queueConnectionFactory.createQueueConnection();
              } catch (JMSException jMSException) {
                  jMSException.printStackTrace();
              }
              return queueConnection;
          }

          /**
           * Creates a non-transacted {@code CLIENT_ACKNOWLEDGE} session.
           *
           * <p>If {@code qConnection} is {@code null}, a fresh connection is obtained from
           * {@link #createConnection()}. The caller has no handle on that connection and
           * therefore cannot close it; pass a non-null connection whenever possible.
           *
           * @param qConnection connection to create the session on; may be {@code null}
           * @param prefix      not used by this method
           * @return the new session, or {@code null} if no connection could be obtained
           * @throws JMSException if the provider fails to create the session
           */
          public static QueueSession getQueueSessionClient(QueueConnection qConnection, String prefix) throws JMSException {
              if (qConnection == null) {
                  qConnection = createConnection();
              }
              if (qConnection == null) {
                  return null;
              }
              return qConnection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
          }

          /**
           * Resolves a queue by its JNDI name, caching the result in
           * {@code queueNameQueueMap} so each name is looked up at most once.
           *
           * @param queueName JNDI name of the queue
           * @return the queue bound under {@code queueName}
           * @throws Exception if the JNDI lookup fails
           */
          public static Queue createQueue(String queueName) throws Exception {
              if (queueNameQueueMap.containsKey(queueName)) {
                  return queueNameQueueMap.get(queueName);
              }
              Queue queueLookedUp = (Queue) jndiContext.lookup(queueName);
              queueNameQueueMap.put(queueName, queueLookedUp);
              return queueLookedUp;
          }
      }