11 Replies Latest reply on Aug 23, 2012 11:12 AM by Sebastian Götz

    HornetQ as message buffer

    Sebastian Götz Newbie

      Dear experts,

       

      I have just started using HornetQ and lack experience with message queues in general, so please have a little patience with me if I don't get something...

       

      OK, let me tell you what I am trying to do. I have developed a processing unit for messages coming in from external, TCP-connected devices. I intend to use HornetQ as a kind of message box. There is only one global 'inbox' queue for all devices. As consumers of this inbox queue I have set up a thread pool of workers, each with its own HornetQ ClientSession. When my IO layer receives a message from a device, it checks the syntax and puts the message in the inbox queue. One of the threads receives the message, processes it, and then acknowledges it. So far the easy part :-)

      Now there is also an outbox queue, or more precisely an outbox queue for each distinct device. When a processor works on an inbox message, it in turn produces response messages for the device. These response messages go into the outbox queue of that device. More formally, the processor workers are consumers of the inbox queue and producers to the outbox queues. As consumers of the outbox queues I have also set up a thread pool of workers, each with its own ClientSession. Each of these workers iterates over the available queues and tries to receive a message with a relatively short timeout. If it gets a message, it uses the IO layer to transport the message to the correct device. Here is the difficulty: the device's protocol says that a message sent to a device may only be deleted by the sender after an acknowledgement is sent by the device. So the outbox worker cannot acknowledge the HornetQ message right away. I have made a picture to illustrate what I am trying to explain:

       

      [Attached image: Unbenannt.PNG]

      You can treat the 'CommTask' and 'dev' icons as the devices in my explanation.

       

      Now that's my actual question: How can I acknowledge messages with delay?

      Maybe I have chosen the wrong approach and someone has a better view of such scenarios. Any hint is welcome.

       

      Thanks in advance,

       

      Sebastian

        • 1. Re: HornetQ as message buffer
          Justin Bertram Master

          Each of these workers iterates over the available queues and tries to receive a message with a relatively short timeout. If it gets a message, it uses the IO layer to transport the message to the correct device. Here is the difficulty: the device's protocol says that a message sent to a device may only be deleted by the sender after an acknowledgement is sent by the device. So the outbox worker cannot acknowledge the HornetQ message right away.

          When the "worker" sends a message to the device does it wait for a reply from it?  If so you can just ack the JMS message when you get the ack from the device.

           

          Also, I'm curious why you are managing your own pools of threads rather than using MDBs which basically provide that kind of pooling for free.

           

          Lastly, please note that an org.hornetq.api.core.client.ClientSession is single-threaded and therefore should not be used by multiple threads concurrently.

          • 2. Re: HornetQ as message buffer
            Sebastian Götz Newbie

            Hi Justin,

             

            Yes, the worker waits for a reply from the device. Since I cannot foresee how long it will take until that answer arrives, the worker cannot block while holding the reference to the ClientMessage but has to continue working. Otherwise I would get convoy effects when many devices are connected. The process of dispatching to the device is actually complex.

             

            What do you mean by MDB?

             

            I just read that a ClientSession must not be shared between threads and chose a ThreadLocal approach to manage a per-thread session.
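
A minimal sketch of the per-thread session idea mentioned above, using only the JDK. `WorkerSession` and `PerThreadSessions` are hypothetical names; `WorkerSession` merely stands in for a single-threaded session such as a HornetQ ClientSession, and in a real setup the factory would presumably create the session from the client session factory instead.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a single-threaded session (e.g. a HornetQ ClientSession). */
final class WorkerSession {
    // Remember which thread created the session so misuse is easy to spot.
    final Thread owner = Thread.currentThread();
}

/** Hands every thread its own lazily created session. */
final class PerThreadSessions {
    private final ThreadLocal<WorkerSession> sessions;

    PerThreadSessions(Supplier<WorkerSession> factory) {
        // The factory runs at most once per thread, on first access.
        this.sessions = ThreadLocal.withInitial(factory);
    }

    /** Returns the calling thread's session, creating it on first use. */
    WorkerSession current() {
        return sessions.get();
    }

    /** Drops the calling thread's reference; call it before the thread ends. */
    void release() {
        sessions.remove();
    }
}
```

Each pool thread would call `current()` whenever it needs its session and `release()` before it terminates; closing the underlying session is left out of this sketch.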

             

            Regards,

             

            Sebastian

            • 3. Re: HornetQ as message buffer
              Justin Bertram Master

              Yes, the worker waits for a reply from the device. Since I cannot foresee how long it will take until that answer arrives, the worker cannot block while holding the reference to the ClientMessage but has to continue working. Otherwise I would get convoy effects when many devices are connected.

              If you cannot wait for a response from the device then how can you ever reliably know if the device actually got the message and therefore ack the JMS message in turn?  It seems to me that you will either have to tune your system so that you can wait for the response from the device without having "convoy effects" or redeliver the message to the device until you get a response in a timely fashion.

               

              What do you mean by MDB?

              I mean use an MDB (a message-driven bean, which uses a JMS session pool and an MDB instance pool) to consume the messages from the "inbox" rather than managing the threads yourself.
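
One way to reconcile the two constraints discussed above (the device's ack arrives later and on another thread, while a session should only be used by its own thread) is to hand confirmed acks back to the outbox worker. The following is only a sketch under those assumptions: `DeferredAcks` and its methods are hypothetical, the `Runnable` stands in for whatever acknowledges the broker message, and the sketch does not model the broker's own acknowledgement or redelivery semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Hypothetical helper: any thread may report a device-level ack, but the
 * broker-level ack runs later on the thread that owns the session.
 */
final class DeferredAcks {
    // protocol message id -> action that acknowledges the broker message
    private final Map<Long, Runnable> pending = new ConcurrentHashMap<>();
    // ack actions whose device confirmation has already arrived
    private final BlockingQueue<Runnable> confirmed = new LinkedBlockingQueue<>();

    /** Called by the outbox worker right after handing the message to the IO layer. */
    void sent(long messageId, Runnable brokerAck) {
        pending.put(messageId, brokerAck);
    }

    /** Called by whichever thread sees the device's ack; false for unknown ids. */
    boolean deviceAcked(long messageId) {
        Runnable ack = pending.remove(messageId);
        if (ack == null) {
            return false;
        }
        confirmed.add(ack); // only queued here, not executed
        return true;
    }

    /** Called by the outbox worker on its own thread; returns how many acks ran. */
    int drainOnOwnerThread() {
        List<Runnable> ready = new ArrayList<>();
        confirmed.drainTo(ready);
        ready.forEach(Runnable::run);
        return ready.size();
    }
}
```

The outbox worker would call `sent(...)` after writing to the IO layer and `drainOnOwnerThread()` at the top of each polling loop; the thread that sees the device's ack only calls `deviceAcked(...)`. With several outbox workers, each would need its own instance or some lookup from message id to worker, which is not shown.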

              • 4. Re: HornetQ as message buffer
                Sebastian Götz Newbie

                There is a lot of indirection in the host-device connection. Actually I use Apache MINA as the IO layer, which handles most of the networking. So it is not as simple as putting a message on the wire, waiting for the response, and then acknowledging the message.

                But I have no idea how to do that in my scenario. For me the IO layer is a black box that I pass a message into, and the response to that message pops out at the inbox worker. So it is not the sender of the message that will actually remove the message from the queue.

                • 5. Re: HornetQ as message buffer
                  Justin Bertram Master

                  So you have this requirement:

                  the device's protocol says that a message sent to a device may only be deleted by the sender after an acknowledgement is sent by the device.

                   

                  But this is also true:

                  There is a lot of indirection in the host-device connection. Actually I use Apache MINA as the IO layer, which handles most of the networking. So it is not as simple as putting a message on the wire, waiting for the response, and then acknowledging the message.

                   

                  It looks to me like the fundamental design of your system does not support your requirements.

                  • 6. Re: HornetQ as message buffer
                    Clebert Suconic Master

                    Sebastian: there are two forums on HornetQ, one for development and one for users. This one is for development, so I will be moving your message over.

                     

                    Please use the user forums for questions. Users are welcome to talk on the dev forums when they are doing any development activity (such as contributions).

                    • 7. Re: HornetQ as message buffer
                      Sebastian Götz Newbie

                      Hello Clebert,

                       

                      That was a misunderstanding. I thought of developers using HornetQ, not of developers developing it.

                      • 8. Re: HornetQ as message buffer
                        Sebastian Götz Newbie

                        I have another question:

                         

                        Since the device for which I add a message to the outbox queue may not be connected, I want the outbox for that device to 'freeze' until it is connected. I thought of using the redelivery delay to handle that. But now I am facing the problem that my unacknowledged messages are redelivered with a very long delay of several minutes. Maybe I am doing something wrong. Here are the address settings of the server:

                        {
                            "DLA":"",
                            "maxSizeBytes":-1,
                            "pageCacheMaxSize":5,
                            "lastValueQueue":false,
                            "expiryAddress":"",
                            "addressFullMessagePolicy":"PAGE",
                            "redistributionDelay":-1,
                            "redeliveryDelay":5000,
                            "pageSizeBytes":10485760,
                            "sendToDLAOnNoRoute":false,
                            "maxDeliveryAttempts":0
                        }

                         

                        And here is the code of my outbox worker:

                        while ((clientMessage = messageQueueServer.getConsumer(queueName).receive(RECEIVE_TIMEOUT)) != null)
                        {
                            // if the client has sent a low-level acknowledgment => finally remove the message
                            if (MessageQueueOutbox.isAcknowledged(clientMessage.getMessageID()))
                            {
                                // drop the transmission log entry
                                MessageQueueOutbox.removeAcknowledgment(clientMessage.getMessageID());
                                // acknowledge the message so it gets deleted from the queue
                                clientMessage.acknowledge();
                            }
                            // no acknowledgment yet => resend the message
                            else
                            {
                                UID uid = UID.parse(clientMessage.getBytesProperty(MessageQueueServer.MESSAGE_PROPERTY_UID));
                                IoSession ioSession = SessionLookupFilter.getSession(uid);

                                if ((ioSession != null) && ioSession.isConnected())
                                {
                                    BaseLTV ltv;
                                    try
                                    {
                                        ltv = MessageQueueServer.clientMessageToLTV(clientMessage);
                                    }
                                    catch (Exception e)
                                    {
                                        LogMF.error(log, e, "Unable to convert {0} from queue {1} into LTV. Abandoning message!", new Object[] { clientMessage, queueName });
                                        // acknowledge the message to get rid of the queue blocker
                                        clientMessage.acknowledge();

                                        continue;
                                    }

                                    // add an entry to the transmission log to keep track of low-level id and message id
                                    MessageQueueOutbox.addTransmissionLog(ltv.getLowLevelId(), clientMessage.getMessageID());
                                    // pass the LTV to the IO layer for transportation
                                    ioSession.write(ltv);

                                    // ATTENTION: Do not acknowledge now since we need to wait for the client's low-level ack
                                    // clientMessage.acknowledge();
                                    messageQueueServer.getSession().rollback();

                                    LogMF.trace(log, "Dispatched message {0} for {1} to client {2}.", new Object[] { ltv, uid, ioSession.getRemoteAddress() });
                                }
                                else
                                {
                                    LogMF.debug(log, "Client with {0} currently disconnected.", new Object[] { uid });
                                    // don't acknowledge the message so we will get it again
                                    // FIXME: invite the client to establish a connection via SIP server
                                    // Set the redelivery timeout of the message to a serious value so we don't bother the client
                                    // with subsequent invitations.
                                    messageQueueServer.getSession().rollback();

                                    break;
                                }
                            }
                        }
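
The worker above relies on rollback and the redelivery delay to see an unconfirmed message again. As a hedged alternative sketch, the transmission log itself could record when each message was last written and report which ones are overdue. `TransmissionLog`, its method names, and the explicit `nowMillis` parameter are assumptions for illustration, not the actual `MessageQueueOutbox` from the post; the 5000 ms in the usage note mirrors the `redeliveryDelay` shown in the address settings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical transmission log: remembers when each message was last written to the device. */
final class TransmissionLog {
    private final long resendAfterMillis;
    // protocol message id -> time of the last transmission attempt
    private final Map<Long, Long> lastSentAt = new ConcurrentHashMap<>();

    TransmissionLog(long resendAfterMillis) {
        this.resendAfterMillis = resendAfterMillis;
    }

    /** Records a first transmission or a retransmission at the given time. */
    void markSent(long messageId, long nowMillis) {
        lastSentAt.put(messageId, nowMillis);
    }

    /** The device confirmed the message; true if it was still being tracked. */
    boolean confirm(long messageId) {
        return lastSentAt.remove(messageId) != null;
    }

    /** Unconfirmed ids whose last attempt is at least resendAfterMillis old, ascending. */
    List<Long> dueForResend(long nowMillis) {
        List<Long> due = new ArrayList<>();
        for (Map.Entry<Long, Long> entry : lastSentAt.entrySet()) {
            if (nowMillis - entry.getValue() >= resendAfterMillis) {
                due.add(entry.getKey());
            }
        }
        Collections.sort(due);
        return due;
    }
}
```

With `new TransmissionLog(5000)`, a message marked as sent at time 0 is reported by `dueForResend(5000)` unless `confirm(...)` was called for it in the meantime.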

                        • 9. Re: HornetQ as message buffer
                          Sebastian Götz Newbie

                          Update:

                           

                          I have now chosen another approach. I still won't ack the messages after they are handed to the IO layer, so I guess they remain in the queue and will be redelivered as configured. That's OK, because if the device does not send an acknowledgment in time, I have to resend the message.

                          When the device sends an acknowledgment, it is passed to the inbox worker pool, and there I remove the message from the outbox queue by ID. I can do that because our protocol has a message ID too, and I use the HornetQ-generated message ID as the message ID for our protocol as well.

                          Sounds like an easy, straightforward solution, but I face a (maybe stupid) problem.

                          I use the management API to remove the message like this:

                           

                           

                          try
                          {
                                    // remove queued message from appropriate queue
                                    String queueName = ResourceNames.CORE_QUEUE + MessageQueueClient.uidToQueueName(getUid(), QueueType.OUTBOX);
                                    ClientSession session = getMessageQueueClient().getSession();
                                    ClientRequestor requestor = new ClientRequestor(session, MessageQueueClient.MANAGEMENT_QUEUE);
                                    ClientMessage message = session.createMessage(false);
                                    ClientMessage reply;
                          
                          
                                    ManagementHelper.putOperationInvocation(message, queueName, "removeMessage", getLTVMessage().getLowLevelId());
                                    reply = requestor.request(message, 1000);
                          
                          
                                    if ((reply == null) || !ManagementHelper.hasOperationSucceeded(reply))
                                    {
                                              LogMF.warn(log, "Failed to delete message {0} from queue {1}.", getLTVMessage().getLowLevelId(), queueName);
                                              throw new NestedException(ErrorCode.ICP_MQ_ACKNOWLEDGE);
                                    }
                          
                          
                                    LogMF.debug(log, "Successfully removed message {0} from queue {1}.", getLTVMessage().getLowLevelId(), queueName);
                          }
                          catch (Exception e)
                          {
                                    log.error(e.getMessage(), e);
                          
                          
                                    if (e instanceof NestedException)
                                    {
                                              throw e;
                                    }
                          
                          
                                    throw new NestedException(ErrorCode.ICP_PROCESSING, e);
                          }
                          

                           

                          The logging says that the message was successfully removed:

                           

                          DEBUG 2012-08-21 14:27:11,809 [pool-15-thread-1][] eu.inform.comm.prot.ltv.processor.ResponseProcessorImpl - Successfully removed message 2842 from queue core.queue.eu.inform.mq.ltvOutbox.66393035326434342D336662362D343564662D393633612D623230633235626636383332.
                          

                           

                          But when I consult the jconsole tool it says that there is still one message in the queue:

                          [Attached screenshot: Unbenannt.PNG]

                          And in my application I can also see that this message is there...

                          • 10. Re: HornetQ as message buffer
                            Sebastian Götz Newbie

                            Update:

                             

                            I found the thread https://community.jboss.org/thread/166427

                            It seems to describe the same problem. But after changing the window size, the consumer still does not see the change in the queue...

                            • 11. Re: HornetQ as message buffer
                              Sebastian Götz Newbie

                              I finally built a workaround for this, since I could not get it to work...