5 Replies Latest reply on Oct 27, 2005 6:14 PM by ovidiu.feodorov

    Consumer accumulating messages on the server

    ovidiu.feodorov



      Regarding the waiting list the ServerConsumerDelegate uses to accumulates messages while the corresponding connection is stopped: it accumulates messages, not references. This means we may find ourselves in a situation when the server runs out of memory when a producer keeps sending messages to a consumer with a stopped connection, or a consumer that doesn't receive().

      Why don't we use the same "subscription" concept you introduced for topics: a subscription is a pipe that gets plugged into a queue's router. The pipe has access to message store is it can comfortably store references. It has recoverability. It seems like the ideal candidate.

      Btw, you should have topic subscriptions use pipes and not queues, since they'll only have a single receiver, anyway.

        • 1. Re: Consumer accumulating messages on the server
          timfox

           

          "ovidiu.feodorov@jboss.com" wrote:


          Regarding the waiting list the ServerConsumerDelegate uses to accumulates messages while the corresponding connection is stopped: it accumulates messages, not references. This means we may find ourselves in a situation when the server runs out of memory when a producer keeps sending messages to a consumer with a stopped connection, or a consumer that doesn't receive().

          Why don't we use the same "subscription" concept you introduced for topics: a subscription is a pipe that gets plugged into a queue's router. The pipe has access to message store is it can comfortably store references. It has recoverability. It seems like the ideal candidate.

          "tim.fox@jboss.com" wrote:

          Yes. If the consumer is closed, then in ServerConsumerDelegate.handle we just refuse to accept the message. It's then stored in the subscription, and is redelivered when the consumer starts.


          Btw, you should have topic subscriptions use pipes and not queues, since they'll only have a single receiver, anyway.

          "tim.fox@jboss.com" wrote:

          Good point. I'll add a JIRA task



          • 2. Re: Consumer accumulating messages on the server
            ovidiu.feodorov

             

            If the consumer is closed, then in ServerConsumerDelegate.handle we just refuse to accept the message. It's then stored in the subscription, and is redelivered when the consumer starts.


            Once the consumer is closed, it cannot be re-started. You mean stoping()/starting() the connection, right?


            Actually, my idea was that pipe is managed by the queue and it is an integral part of it. For each Receiver that connects to the queue, the queue "allocates" a pipe wich acts as a output buffer for messages. The rationale for that is mainly that recovery after failure is much easier this way:

            If the queue crashes, the active deliveries will be recovered as part of the queue recovery process. On recovery, all active deliveries managed by "subscription pipes" will be read from database, turned back into messages and put into the queue. If the pipe sits on the consumer, then we need to recover those as well and recreate their state, which I would rather not do.

            Another reason is that the consumer with a pipe cannot just "cancel" a delivery and have the message automatically re-incarnate in the queue. The pipe has currently full responsibility for the message (it returned a "done' delivery to the queue and the queue forgot the message) so now the consumer has to resend the message to the queue.


            Both implementations are equivalent, I guess, it just seems to me the one I am describing is more logica. I may be wrong.

            I have this almost implemented in my work area, by the way. I will check in soon so you can look at them and tell me what you think.

            • 3. Re: Consumer accumulating messages on the server
              timfox

               

              "ovidiu.feodorov@jboss.com" wrote:
              If the consumer is closed, then in ServerConsumerDelegate.handle we just refuse to accept the message. It's then stored in the subscription, and is redelivered when the consumer starts.


              Errr. I did this yesterday. It's all comitted. All tests pass. No need for an extra pipe though. If the consumeris stopped(), we just return null from the handle call. The message then goes back to the channel.
              When the consumer is started we just call deliver().


              • 4. Re: Consumer accumulating messages on the server
                timfox

                 

                "timfox" wrote:
                "ovidiu.feodorov@jboss.com" wrote:
                If the consumer is closed, then in ServerConsumerDelegate.handle we just refuse to accept the message. It's then stored in the subscription, and is redelivered when the consumer starts.


                Errr. I did this yesterday. It's all comitted. All tests pass. No need for an extra pipe though. If the consumeris stopped(), we just return null from the handle call. The message then goes back to the channel.
                When the consumer is started we just call deliver().


                This was an (almost) trivial change.
                When the consumer is stopped(), ServerConsumerDelegate.handle() returns null.
                This means the message goes back into the channel state().
                When the consumer is started, we just call deliver() on the channel.
                I don't understand why we need extra pipes and it needs to be so complex?
                All tests pass.
                What am I missing here?


                • 5. Re: Consumer accumulating messages on the server
                  ovidiu.feodorov

                  No, you're right. Simpler the better. At first, I thought you were saying you replaced the waiting list with a pipe. Now, I understand.

                  No pipe. That's great.