I did a preliminary prototype of a "pull" model for a RESTful queue consumer.
Here's how it works:
1) There is no session created for a remote consumer.
2) Consumer clients poll on a specific URL with a POST, i.e. /queues/MyQueue/poller. POST is used because a POST changes the state of the server.
3) The client does a POST with a wait time. If there is a message available it is sent back to the client. The client also receives a link (through an HTTP header) for acknowledgement. I.e.: /queues/MyQueue/messages/333/state. This link also has ETag metadata associated with it.
4) When the client is ready to acknowledge the message it posts a Form (x-www-form-urlencoded) to the acknowledgement link telling the server to ack the message. This is a "Conditional POST" (defined by HTTP spec). An "If-Match" header should be sent by the client with the previously transmitted ETag value. The the etag (version) matches, then the message is acknowledged. Otherwise the client receives a "Preconditioned Failed" message from the server and must disregard the message. This behavior handles the case where the client does a late ack and the server as already requeued the message. You don't want a timed out client to ack a message that has been redelivered!
HOW IT IS IMPLEMENTED:
I don't think I could have implemented this with HornetQ as it was so I had to develop some code around HornetQ.
* The way I implemented is that the server, at boottime, creates N consumers with a MessageHandler.
* The MessageHandlers share a reference to an in-memory java.util.concurrent.LinkedBlockingQueue.
* When they receive a message, they post to this in-memory-queue.
* Each message posted has a "queuePolled" Latch. It awaits() forever on this latch.
* A remote client does an HTTP POST to the "poller" resource. This request polls the in-memory-queue. If there is a message available it notifies the Latch that it was pulled from the queue.
* The MessageHandler wakes up. It then waits for a specific timeout on a different "acknowledgement" latch.
* THe "poller" returns the message to the remote client. It also grabs a "message version" number from the message to return to the client.
* If the MessageHandler wakes up before the message is acknowledges, it re-enqueues the message in the in-memory queue. it also bumps the etag(version) of the message. In this scenario, if an old ack message comes in, it will be denied because the etags won't match.
* If the client is on time, it will HTTP POST an ack to the Message resource. This resource receives the post and notifies the MessageHandler that the message was acknowledged. The MessageHandler performs the *actual* acknowledgement.
CHANGES TO HORNETQ?
What I basically did was implement my own async queue consumer. I don't know if this is the right approach to take or HornetQ should change. It *is* pretty ligthweight. How could HornetQ support this though?
1) One ack per Message.acknowledge(). Currently HornetQ will acknowledge all previous message of a consumer
2) Allow the lookup of a Message by messageID. (would be needed for the ack message).
3) A Message should be redelivered after a timeout (not sure if this already happens)
4) If a message is redelivered, there should a counter for this (to support the conditional POST pattern I discussed before).
I hope this isn't too much to swallow.