5 Replies Latest reply on Jun 21, 2011 12:23 PM by grgptch

    core api, removeMessages

    grgptch

      I am experiencing a weird problem when deleting a message from a queue using the core API. I want to delete an arbitrary message from the queue by providing its message ID, but the first message in the queue (the head) always gets deleted instead. I use a management queue/session, and the code snippet looks like this:

       

      messageId - message id of the message read from the queue

      address - "queue.test"

       

      ClientMessage message = session.createMessage(false);

      String resource = ResourceNames.CORE_QUEUE + address;

      ManagementHelper.putOperationInvocation(message, resource, "removeMessage", messageId);

      ClientMessage reply = requestor.request(message, 2000);

      if (reply != null) {

      ..... etc

       

      I have tested "removeMessages" and "messageCount" in the same fashion, and both work as expected. Can someone please point out what may be wrong?

       

      Thanks.

        • 1. Re: core api, removeMessages
          grgptch

          I was just looking at the JMS management example provided, ManagementExample.java, and I saw that a message is removed in the following fashion without having been read first:

           

          JMSManagementHelper.putOperationInvocation(m,

                                                              "jms.queue.exampleQueue",

                                                              "removeMessage",

                                                              message.getJMSMessageID());

           

           

          message.getJMSMessageID() in the example returns null, and I guess the first message gets deleted anyway. Is it a design decision to always delete messages from the head, or should it be possible to delete any arbitrary message given its ID?

           

          Perhaps I should use QueueControl directly? Can someone please shed some light?

           

          Many Thanks.

          g.

          • 2. Re: core api, removeMessages
            clebert.suconic

            I don't see that behaviour happening. Can you provide a test that is runnable?

            • 3. Re: core api, removeMessages
              grgptch

              Hi Clebert, thanks for responding. Here is the simplified code. messageCount works nicely, and the message removal reports success. However, when I traverse the queue afterwards, message #3 is still there, while the message at the head of the queue has been deleted. <management-address>hornetq.management</management-address> is defined, as well as <permission type="manage" roles="guest"/>.

               

              public class DeleteMessageTest {

               

                        public static void main(String[] args) {

               

                                  TransportConfiguration transportConfig = null;

                                  ServerLocator serverLocator = null;

                                  ClientSessionFactory factory = null;

               

                                  try {

                   String host = "192.168.1.67";

                   String port = "5445";

                   //String queueName = "test.queue";

                   String queueName = "fiji.queue";

               

                   Map<String, Object> params = new HashMap<String, Object>();

                   params.put(TransportConstants.HOST_PROP_NAME, host);

                   params.put(TransportConstants.PORT_PROP_NAME, port);

                   transportConfig = new TransportConfiguration(

                                                                                NettyConnectorFactory.class.getName(), params);

                               

                   serverLocator = HornetQClient.createServerLocatorWithoutHA(transportConfig);

                   serverLocator.setBlockOnDurableSend(true);

                   factory = serverLocator.createSessionFactory();

               

               

                   // create session/producer

                   ClientSession session = factory.createSession();

                   session.start();

                   ClientProducer producer = session.createProducer(queueName);

               

                   // send a few messages

                   ClientMessage message = session.createMessage(true);

                   for (int i = 1; i <= 5; i++) {

                      message.getBodyBuffer().writeString("Message" + i);

                      producer.send(message);

                      message.getBodyBuffer().clear();

                   }

                   session.close();

               

                   // Create new session/consumer

                   ClientSession sess = factory.createSession();

                   sess.start();

                   ClientConsumer consumer = sess.createConsumer(queueName);

                   ClientMessage recv;

                   int num = 0;

                   long messageId = 0;

                   while ((recv = consumer.receive(3000)) != null) {

                      System.out.println(recv.getMessageID() + " " + recv.getBodyBuffer().readString());

                      num++;

                      if (num == 3)

                         messageId = recv.getMessageID();

                   }

                   sess.close();

               

                   // create management session

                   String resource = ResourceNames.CORE_QUEUE + queueName;

                   ClientSession mngtSess = factory.createSession();

                   mngtSess.start();

                   ClientRequestor requestor = new ClientRequestor(mngtSess, "hornetq.management");

               

                   // get message count

                   ClientMessage m = mngtSess.createMessage(false);

                   ManagementHelper.putAttribute(m, resource, "messageCount");

                   ClientMessage reply = requestor.request(m, 3000);

                   if (ManagementHelper.hasOperationSucceeded(reply)) {

                       int count = (Integer) ManagementHelper.getResult(reply);

                       System.out.println("msg count = " + count);

                   }

                               

                   // delete message

                   ClientMessage m1 = mngtSess.createMessage(false);

                   ManagementHelper.putOperationInvocation(m1, resource, "removeMessage", messageId);

                   reply = requestor.request(m1, 3000);

                   if (reply != null) {

                      if (ManagementHelper.hasOperationSucceeded(reply)) {

                         System.out.println("deleted messageId = " + messageId);

                      }

                   }

                   mngtSess.close();

               

                   // clean up

                   factory.close();

                   serverLocator.close();

                                  }

                                  catch (Exception e) {

                                     // report failures instead of silently swallowing them

                                     e.printStackTrace();

                                  }

                        }

              }

              • 4. Re: core api, removeMessages
                clebert.suconic

                - First: I said *runnable* test ;-), so I had to spend my time on writing one.

                 

                - Second, your test is invalid. You are holding messages in delivering state on the consumer. You will not be able to delete messages that are in delivering state. The ACK is probably happening through your regular consumer.

                 

                 

                removeMessage only acks/deletes the message whose ID you pass as a parameter.

                 

                 

                If you want to verify this, get our codebase, look for QueueControlTest, and add the following test there:

                 

                   public void testRemoveMessage2() throws Exception

                   {

                      SimpleString address = RandomUtil.randomSimpleString();

                      SimpleString queue = RandomUtil.randomSimpleString();

                 

                      session.createQueue(address, queue, null, false);

                      ClientProducer producer = session.createProducer(address);

                 

                      // send 100 messages on queue

                     

                      for (int i = 0 ; i < 100; i++)

                      {

                        

                         ClientMessage msg = session.createMessage(false);

                         msg.putIntProperty("count", i);

                         producer.send(msg);

                      }

                     

                      ClientConsumer cons = session.createConsumer(queue);

                      session.start();

                      LinkedList<ClientMessage> msgs = new LinkedList<ClientMessage>();

                      for (int i = 0; i < 50; i++)

                      {

                         ClientMessage msg = cons.receive(1000);

                         msgs.add(msg);

                      }

                 

                      QueueControl queueControl = createManagementControl(address, queue);

                      Assert.assertEquals(100, queueControl.getMessageCount());

                 

                      // the message IDs are set on the server

                      Map<String, Object>[] messages = queueControl.listMessages(null);

                      Assert.assertEquals(50, messages.length);

                      assertEquals(50, ((Integer)messages[0].get("count")).intValue());

                      long messageID = (Long)messages[0].get("messageID");

                 

                      // delete the first listed message (the first one not in delivering state)

                      boolean deleted = queueControl.removeMessage(messageID);

                      Assert.assertTrue(deleted);

                      Assert.assertEquals(99, queueControl.getMessageCount());

                     

                      cons.close();

                 

                      // check that the remaining 99 messages can be consumed from the queue

                      ManagementTestBase.consumeMessages(99, session, queue);

                 

                      session.deleteQueue(queue);

                   }
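
                For readers without the HornetQ codebase at hand, the counts asserted in the test above can be reproduced with a small toy model. This is only a sketch in plain Java, not HornetQ code: ToyQueue, its methods, and the starting message ID of 1000 are made up for illustration. It assumes, based on the explanation and the test's assertions, that the message count includes messages in delivering state, while listing and removal by ID only see messages still waiting in the queue.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a server-side queue. Hypothetical; NOT the HornetQ implementation. */
class ToyQueue {
    // Messages still waiting in the queue: message ID -> "count" property, in arrival order.
    private final Map<Long, Integer> pending = new LinkedHashMap<>();
    // Messages handed to a consumer but not yet acknowledged (delivering state).
    private final Map<Long, Integer> delivering = new LinkedHashMap<>();
    // Arbitrary starting ID, chosen only for this illustration.
    private long nextId = 1000;

    /** Adds a message carrying the given "count" property and returns its ID. */
    long send(int count) {
        long id = nextId++;
        pending.put(id, count);
        return id;
    }

    /** Moves the head message into delivering state; returns its ID, or -1 if nothing is pending. */
    long receive() {
        Iterator<Map.Entry<Long, Integer>> it = pending.entrySet().iterator();
        if (!it.hasNext()) {
            return -1;
        }
        Map.Entry<Long, Integer> head = it.next();
        long id = head.getKey();
        delivering.put(id, head.getValue());
        it.remove();
        return id;
    }

    /** Assumption: the count covers both pending and delivering messages. */
    int getMessageCount() {
        return pending.size() + delivering.size();
    }

    /** Assumption: listing only shows messages that are not in delivering state. */
    List<Long> listMessageIds() {
        return new ArrayList<>(pending.keySet());
    }

    /** Returns the "count" property of a pending message, or null if it is not pending. */
    Integer countPropertyOf(long id) {
        return pending.get(id);
    }

    /** Assumption: removal by ID only succeeds for messages that are not in delivering state. */
    boolean removeMessage(long id) {
        return pending.remove(id) != null;
    }
}

class DeliveringStateDemo {
    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        for (int i = 0; i < 100; i++) {
            queue.send(i);
        }

        // A consumer takes 50 messages without acknowledging them.
        long firstDelivered = queue.receive();
        for (int i = 1; i < 50; i++) {
            queue.receive();
        }

        List<Long> listed = queue.listMessageIds();
        long firstListed = listed.get(0);
        System.out.println("message count      = " + queue.getMessageCount());
        System.out.println("listed messages    = " + listed.size());
        System.out.println("first listed count = " + queue.countPropertyOf(firstListed));

        // Removing a message held by the consumer fails; removing a pending one succeeds.
        System.out.println("remove delivering  = " + queue.removeMessage(firstDelivered));
        System.out.println("remove pending     = " + queue.removeMessage(firstListed));
        System.out.println("message count      = " + queue.getMessageCount());
    }
}
```

                In this model the message that was actually removed is the one whose ID was passed in, provided it is still in the queue. A message that a consumer is holding unacknowledged is not found by its ID.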

                • 5. Re: core api, removeMessages
                  grgptch

                  This was the best I could come up with at 1 am.

                   

                  Thanks, much appreciated!