10 Replies Latest reply on Mar 9, 2010 1:02 PM by John Muhlestein

    QueueControl.moveMessages() to different address

    John Muhlestein Newbie

      I wanted to know if using the QueueControl.moveMessages(String filter, String otherQueue) method allows you to move messages between queues that are bound to different addresses?

       

      My tests seem to indicate that it doesn't, but I wanted to follow up, just in case what I'm seeing is incorrect.

       

      In an effort to try and keep HornetQ happy, and our performance as high as possible, I am attempting to implement a poor mans divert, but only diverting messages from our global topic when a certain durable subscription gets too many messages waiting to be consumed. We want to move the messages for that particular subscriber onto a fallback topic which has different configuration to accommodate it not being able to keep up while preserving the ability for all other subscriptions to continue to consume messages.

       

      It would be nice to not have to consume these messages and then immediately publish them onto the fallback topic right away.

       

      thanks,

      John

        • 1. Re: QueueControl.moveMessages() to different address
          Tim Fox Master

          johnnysoccer wrote:

           

          I wanted to know if using the QueueControl.moveMessages(String filter, String otherQueue) method allows you to move messages between queues that are bound to different addresses?

           

          My tests seem to indicate that it doesn't, but I wanted to follow up, just in case what I'm seeing is incorrect.


          Yes, this should certainly work

          • 2. Re: QueueControl.moveMessages() to different address
            John Muhlestein Newbie

            Follow up question then. It appears the behavior I'm seeing (not moving the messages) occurs when there is an active connection by a durable subscriber to the queue I want to move messages for. I am able to remove messages whene there is an active durable subscribtion, just not move them. Is this the expected behavior, to not be able to move messages between queues while there is an active connection on the queue you want to move the messages from?

             

            If so, this leads to another question. I don't seem to be able to find a way to identify which IP Address is associated with a given connection to a queue, and more specifically the ip address that is associated with that specific subscription on the queue?

             

            Finally, If I am able to identify the specific connetion (say in the form of the ip:port format similar to the one returned by the ServerControl.listRemoteAddresses()), I would like to kill that specific connection, and it seems that the ServerControl.closeConnectionsForAddress() call does only the more broad action of killing all the connections from a particular ip address.

             

            John

            • 3. Re: QueueControl.moveMessages() to different address
              Clebert Suconic Master

              Probably those messages are on the client buffer. As we do that for performance reasons. (You could disable the client buffer though but that would have performance implications).

               

               

              I guess you could add a feature request on listing consumers through management (and their IPs) associated to a queue if you like.

              • 4. Re: QueueControl.moveMessages() to different address
                Tim Fox Master

                I'm not convinced.

                 

                John - if you could demonstrate a test case which replicates the issue, we can take a look.

                • 5. Re: QueueControl.moveMessages() to different address
                  John Muhlestein Newbie
                  Seeing the connections for a consumer through the management API would be helpful.
                  • 6. Re: QueueControl.moveMessages() to different address
                    John Muhlestein Newbie

                    Here is an export from Eclipse.

                    Launcher.java is the class with the main method

                     

                    You would need 3 topics

                    Offload.out

                    Offload.out.store1

                    Offload.out.store2

                     

                    You can see the configuration in the src/main/conf/jmsconfig.properties

                     

                    This program publishes messages at a rate of 1 per second with a subscriber that consumes one message every 10 seconds and one subscriber that consumes every 2 seconds.  After a few iterations, I attempt to offload messages from one of the subscribers to a different queue.

                     

                    When I run this the messages show 0 messages moved.

                    If I do a remove messages, it removes all the messages in the queue

                    If I attempt to move messages via jconsole, no messages move (while the program is running)

                    Once the program stops, and I remove all connections from the address (or wait for the server to timeout the connections) I am able to move messages via jconsole.

                     

                    John

                    • 7. Re: QueueControl.moveMessages() to different address
                      Clebert Suconic Master

                      What John is saying is expected IMO.

                       

                      Say, you have a consumer with deliveringMessages on it.

                       

                      Then you call QueueControl.deleteMessages("");

                       

                       

                      The messages that are being delivered will not be deleted (as expected).

                       

                       

                      I wrote a simple test to demonstrate what John is saying. (what's is actually easier to understand than the 1 MiB package):

                       

                       

                      public class DeleteAllWithPendingTest extends ServiceTestBase
                      {
                      
                         
                         public void testDeleteAll() throws Exception
                         {
                            HornetQServer server = createServer(false);
                            server.start();
                            
                            ClientSessionFactory sf = createFactory(false);
                            ClientSession session = sf.createSession();
                            
                            session.createQueue("a", "a");
                            
                            ClientProducer prod = session.createProducer("a");
                            
                            
                            for (int i = 0 ; i < 1000; i++)
                            {
                               prod.send(session.createMessage(false));
                            }
                            
                            session.close();
                            
                            
                            session = sf.createSession();
                            
                            ClientConsumer consumer = session.createConsumer("a");
                            session.start();
                            
                            ClientMessage msg = consumer.receiveImmediate();
                            assertNotNull(msg);
                            msg.acknowledge();
                            
                            //consumer.close(); // if you close the consumer.. all messages are removed
                            
                            QueueControl control = (QueueControl)server.getManagementService().getResource("core.queue.a");
                            control.removeMessages("");
                            
                            session.close();
                            
                            
                            session = sf.createSession();
                            
                            consumer = session.createConsumer("a");
                            session.start();
                            
                            
                            assertNull(consumer.receiveImmediate()); // this will fail of course (as expected) as the messages couldn't be deleted
                            
                            
                            session.close();
                            
                            server.stop();
                            
                         }
                      
                       
                      

                       

                       

                      So, basically all that John wants (as fas as I understood) is a way to list the consumers and their IPs associated with a queue. (Core Queue)

                      1 of 1 people found this helpful
                      • 8. Re: QueueControl.moveMessages() to different address
                        Tim Fox Master

                        Well.. sure you can't delete messages if they've been delivered but not acked.

                         

                        Messages need to be in the queue for them to be moved.

                        1 of 1 people found this helpful
                        • 9. Re: QueueControl.moveMessages() to different address
                          Clebert Suconic Master

                          Yep.. that's what I was saying.

                           

                          I guess listing consumers/IP on a queue would be a valid feature request. And John could open a JIRA with the request if he likes it.