1 2 Previous Next 21 Replies Latest reply on Oct 21, 2009 7:05 PM by Andy Taylor

    RESTful Queue implementation

    Bill Burke Master

      The biggest problems I'm facing with developing a RESTful Queue interface is that HTTP is a connectionless protocol while HornetQ (and JMS) is a connection-based protocol.

      For example, take Queue consumers. In a "push" model, message acknowledgement is easy. The Queue Server does an HTTP POST with the message. If it receives a successful response, then it can acknowledge the message locally.

      For a "pull" model, there is an acknowledgement problem because HTTP is connectionless and REST is stateless (no client sessions). I thought of defining message state. So, when the client "pulling" for queued messages receives a message, it also receive a link to that message which it can then put into an acknowledgement state.

      I'm wondering how I should implement this? Should I create a session/consumer per message? Close the session/consumer when the message is acknowledged? or does HornetQ have a way to session-lessly acknowledge the message. Am I making sense at all?

      Bill

        • 1. Re: RESTful Queue implementation
          Jeff Mesnil Master

           

          "bill.burke@jboss.com" wrote:
          I thought of defining message state. So, when the client "pulling" for queued messages receives a message, it also receive a link to that message which it can then put into an acknowledgement state.


          a bit off-topic, wouldn't it be more RESTful to DELETE the message location to acknowledge it?

          "bill.burke@jboss.com" wrote:

          I'm wondering how I should implement this? Should I create a session/consumer per message? Close the session/consumer when the message is acknowledged? or does HornetQ have a way to session-lessly acknowledge the message. Am I making sense at all?


          Fiy, with HornetQ Core API, you can "pre-ack" the message (using ClientSessionFactory.setPreAcknowledge(true)) so that the message is acked on the server *before* it is sent to the client. But that's not what you want...

          Creating a session/consumer per message may not be a good idea: for every non-acked rest-messaging, the corresponding session/consumer will remain on the server (until they are cleaned up somehow)

          Creating a single session/consumer for your PollerResource will not work as you need independant message ack for each http clients.

          One possibility would be to create a Consumer resource on the server side that will be used by the HTTP clients to poll the messages. With this Consumer resource you could keep a buffer of HornetQ messages and ack them when the HTTP client modify the rest-messaging ack state.
          But this makes the workflow more complex, since the client will need to create a Consumer first and polls message from it rather than from the queue directly.

          hope that helps

          • 2. Re: RESTful Queue implementation
            Bill Burke Master

            Creating a consumer resource per client kind of breaks the stateless requirement of REST as you're basically defining a session.

            I guess my question was 2 things:

            * How much overhead is it to create a session/consumer per message (for queue polling). Specifically if this session/consumer is always going to be local to the VM. Don't you already do pooling of sessions for MDBs and such?
            * How much effort would it be to have an API into HornetQ to allow manually acknowledging a message without having to maintain a session/consumer to do the acknowledgement. Then a client is changing the state of the message itself (which in reality is what's happening).

            • 3. Re: RESTful Queue implementation
              Andy Taylor Master

               

              How much overhead is it to create a session/consumer per message (for queue polling). Specifically if this session/consumer is always going to be local to the VM. Don't you already do pooling of sessions for MDBs and such?


              Typically creating the connections and sessions has a large overhead as they are heavy weight objects. MDB's create their connections/sessions up front and keep them open until the MDB is undeployed. This is done via the JCA layer. Outgoing connections/sessions from the JCA layer are pooled as you say. Maybe since its going to be local, i.e. InVM, a connection/session pool might be something to think about, Jeff could probably answer this question better, i guess it depends on what the Restful Queue implementation Api is.

              * How much effort would it be to have an API into HornetQ to allow manually acknowledging a message without having to maintain a session/consumer to do the acknowledgement. Then a client is changing the state of the message itself (which in reality is what's happening).


              I'm not sure i 100% understand this, the messages are typically acknowledged via the consumers session. We do have the ability to pre acknowledge messages tho, this means that the server acks the message when it is delivered to the consumer, this does mean however that messages can be lost on failure. Maybe this fits the bill.


              • 4. Re: RESTful Queue implementation
                Jeff Mesnil Master

                 

                "bill.burke@jboss.com" wrote:

                * How much effort would it be to have an API into HornetQ to allow manually acknowledging a message without having to maintain a session/consumer to do the acknowledgement. Then a client is changing the state of the message itself (which in reality is what's happening).


                HornetQ does not expose its queues through its API.

                It seems to me what I'd need is to plug into HornetQ server directly to manipulate the org.hornetq.core.server.Queue interface.

                For example, a QueueResource could grab a ref to its queue (through HornetQServer.getPostOffice().getBinding()) and add a single consumer to the queue (using Queue.addConsumer()).
                When the HTTP client polls a message, it will delegate it to the consumer to get a message
                When the HTTP client DELETE the message, the QueueResource can call consumer.acknowledge(messageID) to ack the message.

                This is way more complex than using the HornetQ client API. But as this API is session-based, I am not sure how you can workaround having sesison state on the server if you use it...



                • 5. Re: RESTful Queue implementation
                  Andy Taylor Master

                   

                  This is way more complex than using the HornetQ client API. But as this API is session-based, I am not sure how you can workaround having sesison state on the server if you use it...


                  If we want to leverage the functionality in the core API we need to hold session state at the server side. now this could be a single (or set of pooled) connection/session that the RESTful resources talk to, i.e. the RESTful resources dont have any state per se but multiple resources could share some session state to avoid having to create connections/sessions etc for polling message, acknowledging messages and maybe other queue orientated functionality.

                  Doing it the other way as Jeff suggests, i.e. dealing with the queue directly is fine but narrows down what functionality can be offered (without having to implement it) and will also increase maintenance.

                  Have we decided what we want in the RESTful JMS API yet, maybe this would be a better place to start.

                  • 6. Re: RESTful Queue implementation
                    Bill Burke Master

                     

                    "ataylor" wrote:
                    This is way more complex than using the HornetQ client API. But as this API is session-based, I am not sure how you can workaround having sesison state on the server if you use it...


                    If we want to leverage the functionality in the core API we need to hold session state at the server side.


                    Session pooling will work, but doesn't solve the entire problem. What i would need would be:

                    * a set of "ready" consumers that have a message that that can be consumed.
                    * An HTTP message would come in from a consumer. A consumer would be pulled off the list of "ready" consumers.
                    * The consumer would be associated with the RESTful Message Resource.
                    * The client would receive the message with a link to the Message Resource. * The client would acknowledge the message by POSTing to the link to the Resource a representation that sets the state of the message to acknowledged
                    * The message would be acknowledge internally and the Consumer would be put back for further processing.

                    I guess this makes me think of another question. How much overhead is it to create a HornetQ consumer per HTTP request? What do you actually gain performance wise (or functionality wise) with keeping a consumer active? Especially since these consumer instances will be interacting with a purely InVM HornetQ.


                    Have we decided what we want in the RESTful JMS API yet, maybe this would be a better place to start.


                    I want to be able to support all that we can without breaking RESTful constraints. So, I really can't answer this question without knowing the implementation of HornetQ. I guess the question might be, which features *can't* be supported? What features require logic on the client to work?

                    • 7. Re: RESTful Queue implementation
                      Bill Burke Master

                       

                      "jmesnil" wrote:
                      "bill.burke@jboss.com" wrote:

                      * How much effort would it be to have an API into HornetQ to allow manually acknowledging a message without having to maintain a session/consumer to do the acknowledgement. Then a client is changing the state of the message itself (which in reality is what's happening).


                      HornetQ does not expose its queues through its API.

                      It seems to me what I'd need is to plug into HornetQ server directly to manipulate the org.hornetq.core.server.Queue interface.

                      For example, a QueueResource could grab a ref to its queue (through HornetQServer.getPostOffice().getBinding()) and add a single consumer to the queue (using Queue.addConsumer()).
                      When the HTTP client polls a message, it will delegate it to the consumer to get a message


                      Can a consumer receive() another message before it acknowledges another? This would pretty much solve the problem I'm having.


                      When the HTTP client DELETE the message, the QueueResource can call consumer.acknowledge(messageID) to ack the message.


                      DELETE isn't appropriate. What if the client doesn't want to acknowledge the message but put it in the queue? Also, what if we're archiving the message and want to view it later on? DELETE usually means removing the message. You'd be sort of redefining the meaning of the intent of HTTP delete.

                      BTW, I did in the past write almost a complete facade for JMS over HTTP and I pretty much did all the suggestions you guys posted recently. Create session resources, etc.


                      • 8. Re: RESTful Queue implementation
                        Jeff Mesnil Master

                         

                        "bill.burke@jboss.com" wrote:

                        Can a consumer receive() another message before it acknowledges another? This would pretty much solve the problem I'm having.


                        Yes, you can receive a message before acknowledging a previous one.
                        About messaging ack in HornetQ, when a consumer acknowledge a message, it also acknowledges all the previous messages received by this consumer.
                        I.e. you can receive a bunch of messages and ack only the last one to ack them all.

                        "bill.burke@jboss.com" wrote:

                        DELETE isn't appropriate. What if the client doesn't want to acknowledge the message but put it in the queue?


                        You're right, DELETE a message is not the right thing to do in that case.

                        "bill.burke@jboss.com" wrote:

                        Also, what if we're archiving the message and want to view it later on?


                        Do you expect the message to be archived by HornetQ?
                        Once a message has been acked, HornetQ will discard it as the consumer has taken responsibility for it. It is not HornetQ job to act as a store.



                        • 9. Re: RESTful Queue implementation
                          Andy Taylor Master

                           

                          I guess this makes me think of another question. How much overhead is it to create a HornetQ consumer per HTTP request? What do you actually gain performance wise (or functionality wise) with keeping a consumer active? Especially since these consumer instances will be interacting with a purely InVM HornetQ.


                          From a typical client consumer point of view, this is quite quick, 1 blocking send to create the consumer and 1 non blocking send to initiate delivery ( I say send even tho its local as we use an InVm transport). On the server side tho, a consumer is just an interface that is registered with a queue, it has a method 'HandleStatus handle(MessageReference reference)' which will keep accepting messages until it returns HandleStatus.BUSY. We already have multiple implementations of this, i.e. a bridge, so there nothing to say we cant implement another type of consumer whose functionality matches what you are looking for. Again this depends on whether we want to interact with the client API or the queue directly on the server.

                          • 10. Re: RESTful Queue implementation
                            Jeff Mesnil Master

                             

                            "jmesnil" wrote:

                            Yes, you can receive a message before acknowledging a previous one.
                            About messaging ack in HornetQ, when a consumer acknowledge a message, it also acknowledges all the previous messages received by this consumer.
                            I.e. you can receive a bunch of messages and ack only the last one to ack them all.


                            I was describing message ack for our ClientConsumer interface.

                            As Andy said, our server.Consumer interface just need to implement handle(MessageReference reference).
                            It is possible to provide an implementation where acking a message will ack only this message and no others...

                            sorry about the confusion

                            • 11. Re: RESTful Queue implementation
                              Jeff Mesnil Master

                              Bill, if that may help, here is a dumb skeleton to plug a ConsumerResource which holds a single message directly on the server

                              
                              import org.hornetq.core.server.Consumer;
                              import org.hornetq.core.server.MessageReference;
                              import org.hornetq.core.server.Queue;
                              import org.hornetq.core.filter.Filter;
                              
                              public class ConsumerResource implements Consumer
                              {
                              
                               private MessageReference current;
                               private Queue queue;
                              
                               public ConsumerResource(Queue queue) throws Exception
                               {
                               this.queue = queue;
                               queue.addConsumer(this);
                               }
                              
                               public Filter getFilter()
                               {
                               return null;
                               }
                              
                               public HandleStatus handle(MessageReference reference) throws Exception
                               {
                               //we keep 1 message at the time
                               if (current == null)
                               {
                               current = reference;
                               return HandleStatus.HANDLED;
                               } else {
                               // we do not accept the message if we already have one
                               return HandleStatus.BUSY;
                               }
                               }
                              
                               // method called by HTTP client to get the message
                               public Object getMessage()
                               {
                               if (current == null)
                               {
                               return 404;
                               } else
                               {
                               return current; // in a HTTP message
                               }
                               }
                              
                               // method called by HTTP client to ack the message
                               public void acknowledge(long messageID) throws Exception
                               {
                               if (messageID == current.getMessage().getMessageID())
                               {
                               queue.acknowledge(current);
                               current = null;
                               }
                               }
                              }
                              
                              



                              • 12. Re: RESTful Queue implementation
                                Tim Fox Master

                                If there seems to be a real requirement for HornetQ to support a session-less message delivery and consumption pattern, e.g. for REST, then although it would be possible to get this to work by creating consumers/sessions transiently for the production/delivery of single messages, I'd consider this to be a bit of a kludge, along with having poor performance implications as others have alluded to on this thread (sessions/consumers etc have setup overhead and should really be reused).

                                Instead, I think we should look at extending the HornetQ API to allow this kind of session-less operation to be supported directly.

                                I think we need something like this on the API:

                                Message getMessage(String queueName) - which gets the message (if available) at the top of the named queue outside the context of a session, and doesn't require a consumer (just removes the message from head of queue directly)

                                void acknowledge(String queueName, long messageID)- acknowledges all messages up to messageID in named queue

                                AIUI if we do the above that will limit us if we want to support transactions- if we don't have sessions - then how can we implement transactional production/consumption? Or perhaps the REST messaging interface is not going to support transactions?

                                • 13. Re: RESTful Queue implementation
                                  Tim Fox Master

                                  I'm going to assume we're taking a (cough) "pure" REST approach and not allowing transient session/transaction related resources to be created.

                                  This would certainly simplify things.

                                  • 14. Re: RESTful Queue implementation
                                    Tim Fox Master

                                    I think we're going to need to deal with timeouts too.

                                    In a session-less approach, if a client removes a message from a queue and consumes it, then crashes or exits without acknowledging that message, then _at some point_ that message should be put back on the queue so it becomes re-available for consumption by other clients.

                                    With a session-based model the point at which unacknowledged messages get returned to the queue is well defined - at session close or rollback.

                                    In the absence of sessions there is no well defined point at which this occur, so perhaps the server needs to automatically return unacknowledged messages to queues if they have not been acked within a certain timeout.

                                    How would this timeout be configured? Would we would allow clients to specify a timeout, or should it be implementation specific server side config, hidden from the client?

                                    1 2 Previous Next