2 Replies Latest reply on Mar 1, 2011 8:18 PM by bl5536

    Missing messages on queue using HornetQ and StompConnect

    bl5536

      Hello everyone,

      I have a server set up using HornetQ and is configured with StompConnect using the following config located here

       

      Now I have the following code that uses Apache NMS Stomp client with C# that subscribes to queues A and B with property ack:client  .

       

      Then I send a message to queue A and queue B. I receive queue A and ack the message but I am stuck waiting for message B because it is no longer in queue B. I check the jmx console of my server and see that a message has been added but it is no longer there. I also set the subscriber to ack:client and never acked the message.

       

      So now I have this message I sent to B that successfully got added but mysteriously dissapeared even though I never ACKed it . The weird thing is the message on A properly got the receipt back and then I ACKed it, however with B after I send it, I never got the receipt and it is no longer in thr queue.

       

      If anybody has any insight or advice it would be appreciated, Thanks.

        • 1. Missing messages on queue using HornetQ and StompConnect
          clebert.suconic

          - Try trunk first, if still fails...

           

          - provide a testcase

          • 2. Re: Missing messages on queue using HornetQ and StompConnect
            bl5536

            Here is a simple example using Apache NMS client under C#

             

             

                            IConnectionFactory factory = new ConnectionFactory(serverName);
                            //Create the connection 
                            IConnection connection = factory.CreateConnection();
                            connection.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
                            //Create the Session 
                            ISession session = connection.CreateSession();
                            using (IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.Stomp.Commands.Queue(primaryQueue), null))
                            using (IMessageProducer producer = session.CreateProducer(new Apache.NMS.Stomp.Commands.Queue(primaryQueue)))
                            {
                                // Start the connection so that messages will be processed.
                                connection.Start();
                                ITextMessage msg = producer.CreateTextMessage("Message");
                                producer.Send(msg, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.MinValue);
                                msg = producer.CreateTextMessage("Message2");
            
                                producer.Send(msg, MsgDeliveryMode.Persistent, MsgPriority.Normal, TimeSpan.MinValue);
                                Console.WriteLine("Sent 2 messages"); 
            
                                // Consume a message
                                Console.WriteLine("Receiving 1 ...");
                                ITextMessage message = consumer.Receive(new TimeSpan(1000)) as ITextMessage;
                                if (message.Text == "Message1")
                                {
                                    Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
                                    Console.WriteLine("Received message with text: " + message.Text);
                                    message.Acknowledge();
                                    Console.WriteLine("ACK " + message.Text);
                                }
                                Console.WriteLine("Receiving 2 ...");
                                message = consumer.Receive(new TimeSpan(1000)) as ITextMessage;
                                if (message.Text == "Message2")
                                {
                                    Console.WriteLine("Message2  received!");
                                    Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
                                    Console.WriteLine("Received message with text: " + message.Text);
                                    Console.WriteLine("No ACK");
                                }
                                producer.Close();
                                consumer.Close();
                            }
                            Console.WriteLine("Disconnect");
                            connection.Stop();
                            connection.Close();
            
                            Console.WriteLine("Attempt to reconnect and get Message2");
            
                            IConnectionFactory factory2 = new ConnectionFactory(serverName);
                            //Create the connection 
                            IConnection connection2 = factory2.CreateConnection();
                            connection2.AcknowledgementMode = AcknowledgementMode.ClientAcknowledge;
            
                            ISession session2 = connection2.CreateSession();
                            using (IMessageConsumer consumer2 = session2.CreateConsumer(new Apache.NMS.Stomp.Commands.Queue(primaryQueue), null))
                            {
                                connection2.Start();
                                ITextMessage message = consumer2.Receive() as ITextMessage;
                                if (message.Text == "Message2")
                                {
                                    Console.WriteLine("Message2 received!");
                                    Console.WriteLine("Received message with ID:   " + message.NMSMessageId);
                                    Console.WriteLine("Received message with text: " + message.Text);
                                    Console.WriteLine("No ACK");
                                }
                                consumer2.Close();
                            }
                            connection2.Close();
            

             

            There is a lot of trivial and repeated code, but I just wanted to write something quick and dirty.

            The following sends 2 messages to queue A, then I receive the message and ACK it , then I receive the second  and do NOT ACK.

            However the problem exists in trying to recieve the message after the disconnect and reconnect. I should be able to connect and receive the message however when I look in the jmx console, I can see the message counter incremented twice. However there are no more messages in the queue.

             

            Thanks for responding and was wondering if there is any other insight?