-
1. Re: core api, removeMessages
grgptch Jun 21, 2011 12:42 AM (in response to grgptch)
I was just looking into the JMS management example, ManagementExample.java, and saw that a message is removed in the following fashion without being read first:
JMSManagementHelper.putOperationInvocation(m,
"jms.queue.exampleQueue",
"removeMessage",
message.getJMSMessageID());
In the example, message.getJMSMessageID() returns null, and I guess the first message gets deleted anyway. Is it a design decision to always delete messages from the head, or should it be possible to delete an arbitrary message given its ID?
Perhaps I should use QueueControl directly? Can someone please shed some light?
Many Thanks.
g.
-
2. Re: core api, removeMessages
clebert.suconic Jun 21, 2011 1:44 AM (in response to grgptch)
I don't see that behaviour happening. Can you provide a test that is runnable?
-
3. Re: core api, removeMessages
grgptch Jun 21, 2011 3:53 AM (in response to clebert.suconic)
Hi Clebert, thanks for responding. Here is the simplified code. messageCount works nicely, and the message removal reports success. However, when I traverse the queue afterwards, message #3 is still there, while the message at the head of the queue has been deleted. <management-address>hornetq.management</management-address> is defined, as well as <permission type="manage" roles="guest"/>.
import java.util.HashMap;
import java.util.Map;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientRequestor;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;

public class DeleteMessageTest {
    public static void main(String[] args) {
        TransportConfiguration transportConfig = null;
        ServerLocator serverLocator = null;
        ClientSessionFactory factory = null;
        try {
            String host = "192.168.1.67";
            String port = "5445";
            //String queueName = "test.queue";
            String queueName = "fiji.queue";
            Map<String, Object> params = new HashMap<String, Object>();
            params.put(TransportConstants.HOST_PROP_NAME, host);
            params.put(TransportConstants.PORT_PROP_NAME, port);
            transportConfig = new TransportConfiguration(
                    NettyConnectorFactory.class.getName(), params);
            serverLocator = HornetQClient.createServerLocatorWithoutHA(transportConfig);
            serverLocator.setBlockOnDurableSend(true);
            factory = serverLocator.createSessionFactory();

            // create session/producer
            ClientSession session = factory.createSession();
            session.start();
            ClientProducer producer = session.createProducer(queueName);

            // send a few messages, reusing one message object
            ClientMessage message = session.createMessage(true);
            for (int i = 1; i <= 5; i++) {
                message.getBodyBuffer().writeString("Message" + i);
                producer.send(message);
                message.getBodyBuffer().clear();
            }
            session.close();

            // create new session/consumer
            ClientSession sess = factory.createSession();
            sess.start();
            ClientConsumer consumer = sess.createConsumer(queueName);
            ClientMessage recv;
            int num = 0;
            long messageId = 0;
            // note: the received messages are never acknowledged here
            while ((recv = consumer.receive(3000)) != null) {
                System.out.println(recv.getMessageID() + " " + recv.getBodyBuffer().readString());
                num++;
                if (num == 3)
                    messageId = recv.getMessageID(); // remember the ID of message #3
            }
            sess.close();

            // create management session
            String resource = ResourceNames.CORE_QUEUE + queueName;
            ClientSession mngtSess = factory.createSession();
            mngtSess.start();
            ClientRequestor requestor = new ClientRequestor(mngtSess, "hornetq.management");

            // get message count (message created on the open management session,
            // not on the producer session that was closed above)
            ClientMessage m = mngtSess.createMessage(false);
            ManagementHelper.putAttribute(m, resource, "messageCount");
            ClientMessage reply = requestor.request(m, 3000);
            if (ManagementHelper.hasOperationSucceeded(reply)) {
                int count = (Integer) ManagementHelper.getResult(reply);
                System.out.println("msg count = " + count);
            }

            // delete message #3 by its ID
            ClientMessage m1 = mngtSess.createMessage(false);
            ManagementHelper.putOperationInvocation(m1, resource, "removeMessage", messageId);
            reply = requestor.request(m1, 3000);
            if (reply != null) {
                if (ManagementHelper.hasOperationSucceeded(reply)) {
                    System.out.println("deleted messageId = " + messageId);
                }
            }
            mngtSess.close();

            // clean up
            factory.close();
            serverLocator.close();
        }
        catch (Exception e) {
            // do not swallow failures silently
            e.printStackTrace();
        }
    }
}
-
4. Re: core api, removeMessages
clebert.suconic Jun 21, 2011 9:38 AM (in response to grgptch)
- First: I said a *runnable* test ;-), so I had to spend my time writing one.
- Second, your test is invalid. You are holding messages in delivering state on the consumer. You will not be able to delete messages that are in delivering state. The ACK is probably happening through your regular consumer.
removeMessage only acks / deletes the message whose ID you pass as a parameter.
If you want to verify this, get our codebase, look for QueueControlTest, and add the following test there:
public void testRemoveMessage2() throws Exception
{
    SimpleString address = RandomUtil.randomSimpleString();
    SimpleString queue = RandomUtil.randomSimpleString();
    session.createQueue(address, queue, null, false);
    ClientProducer producer = session.createProducer(address);

    // send 100 messages to the queue
    for (int i = 0; i < 100; i++)
    {
        ClientMessage msg = session.createMessage(false);
        msg.putIntProperty("count", i);
        producer.send(msg);
    }

    ClientConsumer cons = session.createConsumer(queue);
    session.start();

    // receive 50 messages without acknowledging them (they stay in delivering state)
    LinkedList<ClientMessage> msgs = new LinkedList<ClientMessage>();
    for (int i = 0; i < 50; i++)
    {
        ClientMessage msg = cons.receive(1000);
        msgs.add(msg);
    }

    QueueControl queueControl = createManagementControl(address, queue);
    Assert.assertEquals(100, queueControl.getMessageCount());

    // the message IDs are set on the server
    Map<String, Object>[] messages = queueControl.listMessages(null);
    Assert.assertEquals(50, messages.length);
    Assert.assertEquals(50, ((Integer)messages[0].get("count")).intValue());
    long messageID = (Long)messages[0].get("messageID");

    // delete the first message that is still in the queue (count == 50)
    boolean deleted = queueControl.removeMessage(messageID);
    Assert.assertTrue(deleted);
    Assert.assertEquals(99, queueControl.getMessageCount());

    cons.close();

    // check that the 99 remaining messages can be consumed from the queue
    ManagementTestBase.consumeMessages(99, session, queue);
    session.deleteQueue(queue);
}
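To make the delivering-state point concrete, here is a minimal, self-contained toy model. It does not use HornetQ at all: ToyQueue and every method on it are hypothetical names invented for illustration, and the class only mimics the behaviour described above, namely that messages handed to a consumer but not yet acknowledged still count toward the message count, are not listed, and cannot be removed by ID.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only, NOT HornetQ code. All names are hypothetical. */
public class ToyQueue {
    // Messages still waiting in the queue, in arrival order, keyed by ID.
    private final Map<Long, String> queued = new LinkedHashMap<>();
    // Messages handed to a consumer but not yet acknowledged.
    private final Map<Long, String> delivering = new LinkedHashMap<>();
    private long nextId = 1;

    /** Adds a message and returns the ID assigned to it. */
    public long send(String body) {
        long id = nextId++;
        queued.put(id, body);
        return id;
    }

    /** Moves the head message into the delivering state; returns its ID, or -1 if empty. */
    public long deliver() {
        Iterator<Map.Entry<Long, String>> it = queued.entrySet().iterator();
        if (!it.hasNext()) {
            return -1;
        }
        Map.Entry<Long, String> head = it.next();
        long id = head.getKey();
        String body = head.getValue();
        it.remove();
        delivering.put(id, body);
        return id;
    }

    /** Acknowledges a delivered message, which removes it for good. */
    public boolean acknowledge(long id) {
        return delivering.remove(id) != null;
    }

    /** Removes a message by ID, but only if it is still queued (not delivering). */
    public boolean removeMessage(long id) {
        return queued.remove(id) != null;
    }

    /** Counts queued plus delivering messages. */
    public int getMessageCount() {
        return queued.size() + delivering.size();
    }

    /** Lists the IDs of queued messages only. */
    public List<Long> listMessages() {
        return new ArrayList<>(queued.keySet());
    }

    public static void main(String[] args) {
        ToyQueue q = new ToyQueue();
        for (int i = 1; i <= 5; i++) {
            q.send("Message" + i);
        }
        // A consumer receives three messages and never acknowledges them.
        q.deliver();
        q.deliver();
        long third = q.deliver();

        System.out.println("count = " + q.getMessageCount());               // count = 5
        System.out.println("remove #3 = " + q.removeMessage(third));        // remove #3 = false
        long firstQueued = q.listMessages().get(0);
        System.out.println("remove queued = " + q.removeMessage(firstQueued)); // remove queued = true
        System.out.println("count = " + q.getMessageCount());               // count = 4
    }
}
```

In this sketch the removal of message #3 fails only because that message is held by the consumer; once a message is back in the queued set, removal by ID succeeds. How the real server tracks these states is not shown here.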
-
5. Re: core api, removeMessages
grgptch Jun 21, 2011 12:23 PM (in response to clebert.suconic)
This was the best I could come up with at 1 AM.
Thanks, much appreciated!