10 Replies Latest reply on Aug 24, 2012 4:42 AM by s.goetz

    Messages don't get redelivered

    s.goetz

      Hi all,

       

      I am just starting to use HornetQ and have no experience with message queuing in general. By now HornetQ is running fine as a Windows service on my Windows 7 machine.

      But I am seeing strange behavior. With my configuration I intended to have infinite redeliveries with a delay of 5 seconds, but what I actually get is something different.

      Here is my configuration file:

       

      <configuration xmlns="urn:hornetq"
                     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                     xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

         <paging-directory>${user.home}/.hornetq/${data.dir:data}/paging</paging-directory>

         <bindings-directory>${user.home}/.hornetq/${data.dir:data}/bindings</bindings-directory>

         <journal-directory>${user.home}/.hornetq/${data.dir:data}/journal</journal-directory>

         <journal-min-files>10</journal-min-files>

         <large-messages-directory>${user.home}/.hornetq/${data.dir:data}/large-messages</large-messages-directory>

         <connectors>
            <connector name="netty">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
               <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
            </connector>

            <connector name="netty-throughput">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
               <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
               <param key="batch-delay" value="50"/>
            </connector>
         </connectors>

         <acceptors>
            <acceptor name="netty">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
               <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
            </acceptor>

            <acceptor name="netty-throughput">
               <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
               <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
               <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
               <param key="batch-delay" value="50"/>
               <param key="direct-deliver" value="false"/>
            </acceptor>
         </acceptors>

         <security-settings>
            <security-setting match="#">
               <permission type="createDurableQueue" roles="admin"/>
               <permission type="deleteDurableQueue" roles="admin"/>
               <permission type="createTempQueue" roles="admin"/>
               <permission type="deleteTempQueue" roles="admin"/>
               <permission type="send" roles="admin"/>
               <permission type="consume" roles="admin"/>
               <permission type="manage" roles="admin"/>
            </security-setting>
         </security-settings>

         <address-settings>
            <!--default for catch all-->
            <address-setting match="#">
               <dead-letter-address>jms.queue.DLQ</dead-letter-address>
               <expiry-address>jms.queue.ExpiryQueue</expiry-address>
               <max-delivery-attempts>-1</max-delivery-attempts>
               <redelivery-delay>5000</redelivery-delay>
               <max-size-bytes>10485760</max-size-bytes>
               <message-counter-history-day-limit>10</message-counter-history-day-limit>
               <address-full-policy>BLOCK</address-full-policy>
            </address-setting>
         </address-settings>

      </configuration>

       

      Now the behavior is as follows. I start up my application server and create a consumer for a certain queue. This queue already contains several messages. Each of the messages is delivered once. Since I roll back the session after consuming a message, it is put back into the queue.

      Here is the log listing the 21 messages' ids:

      FATAL 2012-08-23 17:07:37,237 [pool-16-thread-1][] root - 5954
      FATAL 2012-08-23 17:07:38,249 [pool-16-thread-1][] root - 5966
      FATAL 2012-08-23 17:07:39,252 [pool-16-thread-2][] root - 5984
      FATAL 2012-08-23 17:07:40,255 [pool-16-thread-1][] root - 6016
      FATAL 2012-08-23 17:07:41,257 [pool-16-thread-3][] root - 6048
      FATAL 2012-08-23 17:07:42,260 [pool-16-thread-3][] root - 6080
      FATAL 2012-08-23 17:07:43,263 [pool-16-thread-3][] root - 6112
      FATAL 2012-08-23 17:07:44,266 [pool-16-thread-3][] root - 6144
      FATAL 2012-08-23 17:07:45,268 [pool-16-thread-3][] root - 6176
      FATAL 2012-08-23 17:07:46,271 [pool-16-thread-3][] root - 6208
      FATAL 2012-08-23 17:07:47,274 [pool-16-thread-3][] root - 6240
      FATAL 2012-08-23 17:07:48,277 [pool-16-thread-3][] root - 6272
      FATAL 2012-08-23 17:07:49,279 [pool-16-thread-3][] root - 6304
      FATAL 2012-08-23 17:07:51,532 [pool-16-thread-3][] root - 6336
      FATAL 2012-08-23 17:08:01,295 [pool-16-thread-3][] root - 6368
      FATAL 2012-08-23 17:08:02,298 [pool-16-thread-3][] root - 6400
      FATAL 2012-08-23 17:08:03,301 [pool-16-thread-3][] root - 6432
      FATAL 2012-08-23 17:08:04,303 [pool-16-thread-3][] root - 6464
      FATAL 2012-08-23 17:08:05,305 [pool-16-thread-3][] root - 6496
      FATAL 2012-08-23 17:08:06,308 [pool-16-thread-3][] root - 6528
      FATAL 2012-08-23 17:08:07,311 [pool-16-thread-3][] root - 6560

       

      As I understand it, when rolled back, a message comes back on top of the queue and should be redelivered after the redelivery delay.

       

      Has anyone faced a similar behavior so far? For sure I am making some stupid newbie mistake...

       

      Regards,

       

      Sebastian

        • 1. Re: Messages don't get redelivered
          clebert.suconic

          When you have a redelivery-delay, the message is re-delivered to the top of the queue once the delay has elapsed.

           

           

          There's another feature request on pausing the whole queue when a redelivery happens. That's a different feature.

           

          Now that you mention it, I will bump its priority and get it into 2.3.0.

          • 2. Re: Messages don't get redelivered
            s.goetz

            So if I understand it right, while a message is in its redelivery delay, other messages in that queue can be redelivered.

            After that delay, the message is put back into the queue. Right?

             

            What I need is indeed a pausing so that the order of messages in the queue is maintained. How could I achieve this?

             

            Thanks,

             

            Sebastian

            • 3. Re: Messages don't get redelivered
              clebert.suconic

              At this point you could disable redelivery-delay and add some logic to your MDB: if deliveryCount > 0, sleep for some time.
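              A minimal sketch of that idea, assuming the delivery count is read from the message (in the core API presumably via `ClientMessage.getDeliveryCount()`). The class `RedeliveryPause`, its methods and the `threshold` parameter are hypothetical names used only for illustration. The condition above corresponds to a threshold of 0; if your client reports 1 for the first delivery, pass 1 instead.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of HornetQ: pauses the consumer thread
// before a redelivered message is handled again.
final class RedeliveryPause {
    private RedeliveryPause() {}

    /** Returns the pause in milliseconds: delayMillis for a redelivery, 0 otherwise. */
    static long pauseMillis(int deliveryCount, int threshold, long delayMillis) {
        return deliveryCount > threshold ? delayMillis : 0L;
    }

    /**
     * Sleeps when the message counts as redelivered.
     * Returns false if the thread was interrupted while sleeping.
     */
    static boolean pauseIfRedelivered(int deliveryCount, int threshold, long delayMillis) {
        long pause = pauseMillis(deliveryCount, threshold, delayMillis);
        if (pause <= 0) {
            return true;
        }
        try {
            TimeUnit.MILLISECONDS.sleep(pause);
            return true;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

              Called at the start of message handling, this blocks the single consumer itself, so nothing else is delivered to it during the pause and the queue order is kept.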

              • 4. Re: Messages don't get redelivered
                clebert.suconic

                So if I understand it right, while a message is in its redelivery delay, other messages in that queue can be redelivered.

                That was probably a typo in your sentence, but just to keep the terminology right and to double-check that we understand each other:

                While the message is in its redelivery delay, other messages in that queue can be *delivered* fine.
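                To make the difference concrete, here is a toy model (not HornetQ code) of a single consumer that rolls back every message it receives. The class `RedeliveryOrderDemo` and its tick-based clock are assumptions made purely for illustration; the real broker is more involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model: one consumer receives one message per tick and always rolls back.
final class RedeliveryOrderDemo {
    private RedeliveryOrderDemo() {}

    /** Returns the message ids in the order in which they were delivered. */
    static List<Integer> simulate(List<Integer> ids, int delayTicks, int ticks) {
        Deque<Integer> queue = new ArrayDeque<>(ids);
        // Rolled-back messages waiting for their delay: {readyAtTick, id}.
        Deque<int[]> waiting = new ArrayDeque<>();
        List<Integer> delivered = new ArrayList<>();

        for (int now = 0; now < ticks; now++) {
            // Messages whose delay has elapsed return to the head of the queue.
            while (!waiting.isEmpty() && waiting.peekFirst()[0] <= now) {
                queue.addFirst(waiting.pollFirst()[1]);
            }
            Integer id = queue.pollFirst();
            if (id == null) {
                continue; // nothing deliverable at this tick
            }
            delivered.add(id);
            // Rollback: without a delay the message is at the head again at once,
            // with a delay it is held back while other messages are delivered.
            if (delayTicks == 0) {
                queue.addFirst(id);
            } else {
                waiting.addLast(new int[] {now + delayTicks, id});
            }
        }
        return delivered;
    }
}
```

                With messages 1, 2, 3 and no delay, four ticks deliver 1, 1, 1, 1. With a delay of 5 ticks, seven ticks deliver 1, 2, 3, 1, 2: each message is delivered once before the first one comes back, which resembles the log in the original post.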

                • 5. Re: Messages don't get redelivered
                  s.goetz

                  You're right, that was a typo indeed.

                   

                  But how can I block delivery of other messages as long as a redelivery is pending?

                  • 6. Re: Messages don't get redelivered
                    clebert.suconic

                    The message will be returned to the top of the queue. If you have a single consumer, the client's buffer is invalidated (refreshed) and the consumer will get the message again from the top.

                     

                     

                    If you have more than one consumer, you could disable client-side buffering (set consumer-window-size to 0), but the other consumers are still free to receive.

                     

                    In your place I would either use a single consumer (configure the MDB to only have a single instance if you are using MDBs) or use message-grouping, which will guarantee the ordering at the producer's level.

                     

                    Now, if your whole queue is strictly ordered, you probably need a single consumer.

                    • 7. Re: Messages don't get redelivered
                      s.goetz

                      Hmm that's strange.

                      I only have one consumer for that queue (see the JMX info):

                      [Attached screenshot: Unbenannt.PNG]

                      I also found out about the message-grouping feature, which I use as well. As the docs say:

                       

                      Messages in a message group are always consumed by the same consumer, even if there are many consumers on a queue. They pin all messages with the same group id to the same consumer. If that consumer closes another consumer is chosen and will receive all messages with the same group id.

                       

                      I will show you some code; maybe that makes it clearer what I am doing:

                       

                      public void run()
                      {
                          ClientMessage clientMessage = null;
                          try
                          {
                              // Make sure not to wait since this blocks the complete thread pool
                              while ((clientMessage = getConsumerSession().getConsumer().receive(-1)) != null)
                              {
                                  LogMF.fatal(Logger.getRootLogger(), "ID={0}, Session={1}, Consumer={2}", new Object[] { clientMessage.getMessageID(), getConsumerSession().getSession(),
                                          getConsumerSession().getConsumer() });
                                  // send message
                                  UID uid = UID.parse(clientMessage.getBytesProperty(MessageQueueClient.MESSAGE_PROPERTY_DISPATCHER_UID));
                                  IoSession ioSession = SessionLookupFilter.getSession(uid);

                                  if ((ioSession != null) && ioSession.isConnected())
                                  {
                                      BaseLTV ltv;
                                      try
                                      {
                                          ltv = MessageQueueClient.clientMessageToLTV(clientMessage);
                                      }
                                      catch (Exception e)
                                      {
                                          LogMF.error(log, e, "Unable to convert {0} from queue {1} into LTV. Abandoning message!", new Object[] { clientMessage, queueName });
                                          // acknowledge message to get rid of queue blocker
                                          clientMessage.acknowledge();

                                          continue;
                                      }

                                      if (!checkAcknowleged(uid, clientMessage, ltv, ioSession, 0))
                                      {
                                          ioSession.write(ltv);
                                          LogMF.trace(log, "Dispatched message {0} for {1} to client {2}.", new Object[] { ltv, uid, ioSession.getRemoteAddress() });

                                          // give some short possibility to immediately receive a client ack
                                          if (!checkAcknowleged(uid, clientMessage, ltv, ioSession, 200))
                                          {
                                              // ATTENTION:
                                              // Do not acknowledge now since we need to wait for client's low level/high level ack
                                              // clientMessage.acknowledge();
                                              getConsumerSession().getSession().rollback();
                                          }
                                      }
                                  }
                                  else
                                  {
                                      LogMF.debug(log, "Client with {0} currently disconnected.", new Object[] { uid });
                                      // don't acknowledge the message so we will get it again
                                      // FIXME: invite the client to establish a connection via SIP server
                                      // Set redelivery timeout of message to a serious value so we don't bother the client
                                      // with subsequent invitations.
                                      getConsumerSession().getSession().rollback();

                                      break;
                                  }
                              }
                          }
                          catch (HornetQException e)
                          {
                              log.error("Error while trying to read message from queue.", e);
                          }
                          catch (Exception e)
                          {
                              LogMF.error(log, e, "Error during dispatching of mq-message {0} from queue {1} to client.", new Object[] { clientMessage, queueName });
                          }
                          finally
                          {
                              // reschedule this runnable; the delay is given in milliseconds
                              messageQueueOutbox.schedule(this, messageQueueOutbox.getScheduleDelay(), TimeUnit.MILLISECONDS);
                              LogMF.trace(log, "Rescheduled {0} with {1} milliseconds delay.", new Object[] { this, messageQueueOutbox.getScheduleDelay() });
                          }
                      }
                      
                      

                       

                      The above code belongs to a runnable that reschedules itself in a ScheduledThreadPoolExecutor. This runnable has a single instance of a ClientSession and one single ClientProducer for that session. As you can see I am rolling back the session in some cases. But the behavior is as posted initially.

                      Do I have to do something else after a rollback?

                      • 8. Re: Messages don't get redelivered
                        clebert.suconic

                        Are you sharing the session with other threads? A session should be used by only one thread at a time. You can hand it over from one thread to another, but you can't access it concurrently. It's also OK to pool sessions, as long as you respect that constraint.
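                        One way to catch accidental concurrent use early is to wrap the session in a small guard. The following is only a sketch under assumptions: `ExclusiveAccess` is a hypothetical helper, not part of HornetQ, and it works for any object type. It lets the wrapped object move between threads but rejects overlapping use.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical guard: allows use from any thread, but only one at a time.
final class ExclusiveAccess<T> {
    private final T resource;
    private final AtomicReference<Thread> user = new AtomicReference<>();

    ExclusiveAccess(T resource) {
        this.resource = resource;
    }

    /** Runs the action on the resource, failing fast if it is already in use. */
    <R> R use(Function<T, R> action) {
        // Claim the resource atomically; fails if any thread currently holds it.
        if (!user.compareAndSet(null, Thread.currentThread())) {
            throw new IllegalStateException("resource is already in use");
        }
        try {
            return action.apply(resource);
        } finally {
            // Release so that the next thread (or the same one) may use it.
            user.set(null);
        }
    }
}
```

                        Note that this guard is deliberately not reentrant: a nested `use` call from the same thread is rejected as well. It does not make the session thread-safe; it only turns a silent misuse into an immediate exception.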

                         

                         

                        If you have a single consumer for the queue, and disable the redelivery-delay, the message will come to the top of the queue.

                        • 9. Re: Messages don't get redelivered
                          s.goetz

                          Dear Clebert,

                           

                          Sorry, but I was off yesterday (three little children and a wife complaining :-9). It's a mess with those timezone offsets.

                          In bed I thought of the problem again and again. Today I am going to write a simple test to ensure I have no problem with multi-threading.

                           

                          Thanks so far,

                           

                          Sebastian

                          • 10. Re: Messages don't get redelivered
                            s.goetz

                            Thanks to Clebert,

                             

                            the hint about concurrency put me on the right track.

                            I had a mistake in my code that, in the end, had me roll back the wrong session. After fixing that issue it works like a charm.

                             

                            Greets,

                             

                            Sebastian