2 Replies Latest reply on Jul 15, 2010 10:29 PM by cinto_qq

    Why do we lose messages after reconnection?

    cinto_qq

      Hi,

       

      I found that after reconnection, I will lose some messages. I am supposed to send out 5000 messages, but at the end of the sending, the count was 4997.

       

      For reconnection, I am using the following code:

       

      HornetQConnectionFactory connFactory = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(transportConfiguration);                                
      connFactory.setReconnectAttempts(-1);
      connFactory.setRetryInterval(1000);
      connFactory.setRetryIntervalMultiplier(1);
                 
      connection = connFactory.createConnection();         

       

       

      Is there anything wrong with my coding? Or is it my server setting?

       

      Thanks.

       

       

      Cinto

        • 1. Re: Why do we lose messages after reconnection?
          ataylor

          Its hard to say, what ack mode are you using, how are you counting the messages, how is your server configured. Post some code and config and maybe we can help, also what version you are using would be helpful and make sure you try with trunk if you think it is a bug.

          • 2. Re: Why do we lose messages after reconnection?
            cinto_qq

            Hi Andy,

             

            I checked the message count from JConsole.

             

            Here's the code:

             

            class QueueProducer{

             

                public javax.jms.Connection connection;

                public Session session;

                public Destination destination;

                MessageProducer producer;

                static int count = 0;

                static int total = 0;

             

             

                public void initialize()

                {

                    try{

             

                        Map<String, Object> connectionParams = new HashMap<String, Object>();

                        connectionParams.put(org.hornetq.integration.transports.netty.TransportConstants.HOST_PROP_NAME, "servername");           

                        connectionParams.put(org.hornetq.integration.transports.netty.TransportConstants.PORT_PROP_NAME, 5445);

             

                        TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName(),

                                connectionParams);           

             

                        HornetQConnectionFactory connFactory = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactory(transportConfiguration);                       

             

                        connFactory.setReconnectAttempts(-1);

                        connFactory.setRetryInterval(1000);

                        connFactory.setRetryIntervalMultiplier(1);

             

                        connection = connFactory.createConnection("user", "user");

             

                        connection.start();

             

                        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                        destination = session.createQueue("doraemonqueue");

                        producer = session.createProducer(destination);

             

                    }

             

                    catch(Exception ex)

                    {      

                        System.out.println("error in initialize : " + ex.toString());      

                    }          

                }

             

             

                public void disconnect()

                {      

                     try{  

                         connection.close();  

                     }

                     catch(JMSException ex){

                         System.out.println("error in disconnect : " + ex.toString());

                     }  

                }

             

             

                public void sendMessage(String text)

                {  

                    try

                    {      

                       TextMessage message = session.createTextMessage(text);

             

                        producer.send(message);

             

                        total++;          

                        System.out.println("Message: " + total);

             

                    }

                    catch(JMSException ex){

                        System.out.println("error in sendMessage : " + ex.toString());

                    }

             

                }

             

             

                public static void main(String args[])

                {

             

                    QueueProducer queueprod = new QueueProducer();

                    queueprod.initialize();

             

                    try{

             

                        File file = new File("input.xml");

                        BufferedReader input = new BufferedReader(new FileReader(file));

                        String text = "";

                        String value = "";

             

                        while((text = input.readLine()) != null)

                        value += text;

             

                        input.close();

             

                        start = System.currentTimeMillis();

             

                        while(true && count >= 0 && count < 1000)

                        {

                            queueprod.sendMessage(value);

                            count++;

                        }          

                    }

                    catch(Exception e)

                    {

                        System.out.println("error in main " + e.toString());

                    }

             

                    queueprod.disconnect();

                    System.out.println("total : "+total);

                    System.out.println("success");

             

                }

             

            }

             

             

             

            Here are the config files:

             

            jms-configuration.xml

             

            <configuration xmlns="urn:hornetq"
                        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                        xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

             

               <!-- Connectors -->
               <connectors>
                  <connector name="netty-connector">
                     <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>        
                       <param key="host"  value="${hornetq.remoting.netty.host:dlwb07v}"/>
                     <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
                </connector>   
               </connectors>
              
               <paging-directory>../data/page</paging-directory>
               <bindings-directory>../data/bindings</bindings-directory>
               <journal-directory>../data/journal</journal-directory>
               <large-messages-directory>../data/large-messages</large-messages-directory>
              
               <!-- Acceptors -->
               <acceptors>
                  <acceptor name="netty-acceptor">
                     <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>        
                     <param key="host"  value="${hornetq.remoting.netty.host:dlwb07v}"/>
                     <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
                </acceptor>
               </acceptors>

             

               <!-- Other config -->

             

               <security-settings>
                  <!--security for example queue-->         
                  <security-setting match="jms.queue.doraemonqueue">
                     <permission type="createDurableQueue" roles="user"/>
                     <permission type="deleteDurableQueue" roles="admin"/>
                     <permission type="createTempQueue" roles="admin"/>
                     <permission type="deleteTempQueue" roles="admin"/>
                     <permission type="consume" roles="user1"/>
                     <permission type="send" roles="guest,user"/>
                  </security-setting>

             

               </security-settings>

             

               <address-settings>    
                  <address-setting match="#">
                     <max-size-bytes>104857600</max-size-bytes>   
                     <page-size-bytes>10485760</page-size-bytes>    
                     <address-full-policy>PAGE</address-full-policy>
                  </address-setting>
               </address-settings>

             

            </configuration>

             

            jms-beans.xml

             

            <configuration xmlns="urn:hornetq"
                        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                        xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
               <!--the connection factory used by the example-->
               <connection-factory name="ConnectionFactory">
                  <connectors>
                     <connector-ref connector-name="netty-connector"/>
                  </connectors>
                  <entries>
                     <entry name="ConnectionFactory"/>
                  </entries>

             

               </connection-factory>

             

               <!--the topic used by the example-->
              
               <queue name="doraemonqueue">
                  <entry name="/queue/doraemonqueue"/>
               </queue>

             

            </configuration>

             

             

            Do you need any other files? Please let me know.

             

            Thanks.

             

             

            Cinto