7 Replies Latest reply on Feb 7, 2012 3:27 AM by gaohoward

    when send msg to HornetQ server,how to show these msg

    kelly.jiang

      We are using HornetQ 2.2.5.Final and have run into some problems.

      We use the HornetQ core API with two clients on different PCs: one (A) sends messages and the other (B) receives them.

      When A and B are both started and A sends a message, B receives it immediately. But after B is restarted, a message sent by A may reach B only about one minute later. I don't know why B receives it so slowly in this scenario.

      I also see in the message log that the message was received from A and sent to B. How and where can I view these messages? I checked the logs directory and the journal directory, but the messages are not there. How can I open the data files in the journal directory?

      Thanks in advance!

        • 1. Re: when send msg to HornetQ server,how to show these msg
          gaohoward

          How did you restart B? Did you kill the process without closing the resources (session, connection, etc.)?

           

          Howard

          • 2. Re: when send msg to HornetQ server,how to show these msg
            kelly.jiang

            ////////////////////////////// unregister: brief outline //////////////////////////////

            Before the B process exits, I call unregister:

            unregister {
            this.serverConfig.clear();
            this.msgReceiver.close();
            this.msgSender.close();
            ClientSession.deleteQueue(this.queueName);  // the queue has 5 receivers
            ClientSession.close();
            ClientSessionFactory.close();
            ServerLocator.close();
            }

             

             

            ////////////////////////////// register: brief outline //////////////////////////////

            final static TopicHornetQMsgAttachPoint      msgAttachPoint = new TopicHornetQMsgAttachPoint();

            public void Init()
            {
                entry.hostName = "xx";  // this part of the process is correct
                // entry.name = "bb";
                entry.name = "cc";
                entry.port = 5445;
                entry.msgCallBack = dataCenterMsgCallBack;
                entry.receiveContext = this.receiveContext;

                msgAttachPoint.register(entry);
            }

             

             

            ////////////////////////////// starting and exiting B (register/unregister): brief outline //////////////////////////////

            main: first I start this Java program; after the timeout, B exits. Then I start the B program again, and B receives messages slowly.

            After B exits, the resources should have been released. Maybe the unregister function is wrong?

             

            public static void main(String[] args) {
                Init();   // register; the outline above is correct

                try
                {
                    // After this timeout, unregister is called.
                    // When this B program is restarted and A sends a message in the meantime, B receives it slowly.
                    Thread.sleep(50000);

                    msgAttachPoint.unregister();   // unregister
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }

             

             

            ////////////////////////////// register/unregister: full source //////////////////////////////

            package com.opmextech.rfodn.msgexport;

            import java.net.InetAddress;
            import java.net.UnknownHostException;
            import java.util.HashMap;
            import org.hornetq.api.core.HornetQException;
            import org.hornetq.api.core.Message;
            import org.hornetq.api.core.SimpleString;
            import org.hornetq.api.core.TransportConfiguration;
            import org.hornetq.api.core.client.ClientConsumer;
            import org.hornetq.api.core.client.ClientMessage;
            import org.hornetq.api.core.client.ClientProducer;
            import org.hornetq.api.core.client.ClientSession;
            import org.hornetq.api.core.client.ClientSession.QueueQuery;
            import org.hornetq.api.core.client.ClientSessionFactory;
            import org.hornetq.api.core.client.HornetQClient;
            import org.hornetq.api.core.client.ServerLocator;
            import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;

            import com.opmextech.rfodn.common.ApplicationContext;
            import com.opmextech.rfodn.common.Global;
            import com.opmextech.rfodn.common.MsgAttachPoint;
            import com.opmextech.rfodn.common.RegisterEntry;

            import java.util.Calendar;
            import java.util.GregorianCalendar;


            public class HornetQMsgAttachPoint implements MsgAttachPoint {
             
            protected transient RegisterEntry   registerEntry    = null;
            protected transient ClientSession   coreSession    = null;
            protected transient ClientProducer  msgSender     = null;
            protected transient ClientConsumer  msgReceiver          = null;
            protected transient String          queueBindingAddress     = null;
            protected transient String          queueName               = null;
            protected transient ClientMessage   sendBuffer           = null; 
            protected transient ClientSessionFactory   sessionFactory   = null;
            protected transient InetAddress     localIpAddr             = null;
            protected transient ServerLocator   serverLocator           = null;
            protected transient HashMap<String, Object> serverConfig    = null;
            protected transient HornetQMsgCallBackHandler          msgCallBackHandler    = null;
            protected transient HornetQSendAcknowledgementHandler  msgAckCallBackHandler = null;


            Calendar date = Calendar.getInstance();

            public boolean register(final RegisterEntry paramRegisterEntry) 
              {
              if ( (this.coreSession != null) || (paramRegisterEntry == null) || (paramRegisterEntry.name.isEmpty()))
               {
               return false;  
               }

              /**
              @brief Keep a reference to the registration info; consider whether a clone is needed
              */
              this.registerEntry = paramRegisterEntry;
             
              try
               {  
               this.queueBindingAddress = this.formatQueueAddress(paramRegisterEntry.name);
               this.queueName        = this.formatQueueName(paramRegisterEntry.name);
               /**
               *@brief Get the server parameters; this method can be overridden to support server auto-discovery
               */
               this.getConnectionParam();
               
               this.createConnect();  
               
               this.createSession();
                
               this.createQueue(this.queueBindingAddress,this.queueName,this.registerEntry.durable);
                          
               this.createSender();
              
               /**
               *@brief Start receiving data
               *1.The session must be started before ClientConsumers created by the session can consume messages from the queue
               */  
               if ( this.coreSession != null)
               {
                this.coreSession.start();
               }
                 
               this.createReceiver(this.queueName);    
                    
               if ( this.coreSession != null)
                {
                return true;
                }
              
               }
              catch (Exception e)
               {
               this.unregister();   
               // TODO Auto-generated catch block
               e.printStackTrace(); 
               } 
              return false;
              }

            public boolean sendMsg(final ApplicationContext sendContext)
            {
              if ( this.msgSender == null || this.coreSession == null || sendContext == null || sendContext.targetQueueAddress == null || sendContext.length <= 0)
               {
               return false;
               }
             
              if ( this.sendBuffer == null)
               {
               this.sendBuffer = this.coreSession.createMessage(false);
               }
                   
              try {  
               this.sendBuffer.getBodyBuffer().clear();
              
               HornetQApplicationContext context = (HornetQApplicationContext)sendContext;
              
               if (context.qBuffer == null )
                return false;
              
               this.sendBuffer.getBodyBuffer().writeBytes(context.qBuffer, 0, context.qBuffer.readableBytes());
              
               //this.sendBuffer.getBodyBuffer().writeBytes(sendContext.buffer, 0, sendContext.length);
              
               this.sendBuffer.setTimestamp(System.currentTimeMillis());
              
               /**
                * @brief When sending a message, the source address can be attached<br>
                */
               if ( sendContext.sourceQueueAddress != null)
                   {
                this.sendBuffer.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, SimpleString.toSimpleString(sendContext.sourceQueueAddress));       
                   }

               this.msgSender.send(sendContext.targetQueueAddress, sendBuffer);
               
              } catch (HornetQException e) {
               // Report the failure to the caller instead of falling through to "return true"
               e.printStackTrace();
               return false;
              } 
              return true; 
            }

            public void unregister()
            {
            try {
              /**
               * @brief Memory cannot be reclaimed
               */
              if ( this.msgAckCallBackHandler != null)
               {
               this.msgAckCallBackHandler = null;
               }
              if ( this.msgCallBackHandler != null)
               {
               this.msgCallBackHandler = null;
               }
              if ( this.serverConfig != null)
               {
               this.serverConfig.clear();
               this.serverConfig = null;
               }
              if ( this.sendBuffer != null)
               {  
               this.sendBuffer = null;
               } 
              if ( msgReceiver != null)
               {
               this.msgReceiver.setMessageHandler(null);
              
               this.msgReceiver.close();
              
               this.msgReceiver = null;
               }
             
              if ( this.msgSender != null)
               {
               this.msgSender.close();
              
               this.msgSender = null;
               }

              if (this.coreSession != null)
               {  
               /**
               @brief If the queue was created as a temporary (non-durable) queue, delete it; a durable queue does not need to be deleted<br>
               */
               if ( this.registerEntry.durable == false)
                {
                this.coreSession.deleteQueue(this.queueName);  
                }
              
               this.coreSession.setSendAcknowledgementHandler(null);
              
               this.coreSession.close();
              
               this.coreSession = null;
               }
             
              if (this.sessionFactory != null)
               {
               this.sessionFactory.close();
              
               this.sessionFactory = null;
               }   
              if ( this.serverLocator != null)
               {
               this.serverLocator.close();
              
               this.serverLocator = null;
               }
             
            } catch (HornetQException e)
              {
              // TODO Auto-generated catch block
              e.printStackTrace();
              }
            }

            protected void getConnectionParam()
            {
              if ( this.serverConfig == null)
               {
               serverConfig    = new HashMap<String, Object>();
               }    
             
              serverConfig.put("host", this.registerEntry.hostName);
             
              serverConfig.put("port", this.registerEntry.port);
            }

            protected void createConnect()
            {
              if ( this.serverLocator != null)
               {
               return;
               }
              try {
               /**
                * @brief TransportConfiguration needs a setter interface added<br>
                */
               final TransportConfiguration configuration = new TransportConfiguration(NettyConnectorFactory.class.getName(),serverConfig);
              
               serverLocator = HornetQClient.createServerLocatorWithoutHA(configuration);
               /**
                * @brief Window-based flow control (manual section 19.1.1)
                */
               serverLocator.setConsumerWindowSize(Global.MAXCONSUMERWINDOWSIZE);
               serverLocator.setConsumerMaxRate(Global.MAXCONSUMERRATE);
               /**
                * @brief Support asynchronous message sending<br>
                * 1. ConfirmationWindowSize can be set on the ServerLocator<br>
                * 2. See the HornetQ manual, section 20.4<br>
                */
               serverLocator.setConfirmationWindowSize(Global.MAXCONFIRMATIONWINDOWSIZE);
                
               this.serverLocator.setInitialConnectAttempts(Global.MAXRETRYCOUNT); 
                  
               sessionFactory = serverLocator.createSessionFactory(); 
             
              } catch (Exception e) {
              
               this.unregister();
              
               // TODO Auto-generated catch block
               e.printStackTrace();
              }  
            }

            protected void createSession()
            {
              if ((this.sessionFactory == null) ||( this.coreSession != null))
               {
               return;
               }
              try {
               coreSession = sessionFactory.createSession(true,true);
              } catch (HornetQException e) {
              
               this.unregister();  
               // TODO Auto-generated catch block
               e.printStackTrace();
              }
            }

            protected void createQueue(String queueAddr,String strQueueName,boolean durable)
            {
              if ( this.coreSession == null || queueAddr == null || strQueueName == null)
               {
               return;  
               }
              try {
               /**
                * @brief SimpleString.toSimpleString seems to cause a memory leak
                * @bug: <br>
                * 1. Why is a String used when creating a queue but a SimpleString when querying it?<br>
                */
               final QueueQuery query = coreSession.queueQuery(SimpleString.toSimpleString(strQueueName));
               if ( query.isExists() == false )
                {
                /**
                 *@brief Create the queue if it does not exist<br>   
                 */    
                coreSession.createQueue(queueAddr,strQueueName,durable);
               
                }
              } catch (HornetQException e) {
               this.unregister();
               // TODO Auto-generated catch block
               e.printStackTrace();
              } 
            }

            protected void createSender()
            {
                if (( this.coreSession == null) || ( this.msgSender != null))
               {
               return;
               }
              /**
               * @brief Support asynchronous sending: register the callback for asynchronous message-ack notification<br>
               */
              if ( this.msgAckCallBackHandler == null)
               {
               this.msgAckCallBackHandler = new HornetQSendAcknowledgementHandler(this.registerEntry.msgCallBack,registerEntry.receiveAckContext);
               }
              this.coreSession.setSendAcknowledgementHandler(this.msgAckCallBackHandler);     

              try {
               msgSender   =  coreSession.createProducer(this.queueBindingAddress);
              
              } catch (HornetQException e) {
              
               this.unregister();
               // TODO Auto-generated catch block
               e.printStackTrace();
              }
            }

            protected void createReceiver(String strQueueName)
            {
              if (( this.coreSession == null) ||( this.msgReceiver != null) || strQueueName == null)
               {
               return;  
               }
              try {
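               /**
                * @brief For the queue messaging model, see section 4.2.1
                * 1. A queue may have multiple receivers, but each message is delivered to at most one receiver.<br>
                * 2. Queue names and addresses are different things; address to queue name is a 1:N relationship.<br>
                * 3. See section 8.2.1, Addresses.<br>
                * 4. The core has no concept of a topic, only addresses and queues.<br>
                * 5. Topic behavior: bind one address to several queues; each queue then acts as a subscription.<br>
                */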
               msgReceiver =  coreSession.createConsumer(strQueueName);
              
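               /**
                * @bug With this added, offline messages may not be received
                * 1. After adding the two statements below, the client no longer seems to receive duplicate messages<br>
                * 2. createConsumer also has a parameter that indicates whether messages are removed from the queue; could that have a similar effect?<br>
                */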
               msgReceiver.setMessageHandler(null);
               msgReceiver.receiveImmediate();
              
               /**
                * @brief Register the callback for asynchronous message reception<br>
                */
               if ( this.msgCallBackHandler == null)
                {
                this.msgCallBackHandler = new HornetQMsgCallBackHandler(registerEntry.msgCallBack,registerEntry.receiveContext);
                }
               msgReceiver.setMessageHandler(this.msgCallBackHandler);
              
              } catch (HornetQException e) {
              
               this.unregister();
              
               // TODO Auto-generated catch block
               e.printStackTrace();
              } 
            }

            public boolean queryQueue(final String strQueueName)
            {
              if (this.coreSession == null || strQueueName == null)
               {
               return false;
               }
              try {
               /**
                * @brief SimpleString.toSimpleString seems to cause a memory leak
                */
               final QueueQuery query = coreSession.queueQuery(SimpleString.toSimpleString(strQueueName));
               if ( query.isExists() == true )
                {
                return true;
                }
              } catch (HornetQException e) {  
               // TODO Auto-generated catch block
               e.printStackTrace();
              }
              return false;
            }

            public void deleteQueue(final String strQueueName)
            {
              if ( this.coreSession == null || strQueueName == null)
               {
               return;
               }
              try {
               this.coreSession.deleteQueue(strQueueName);
               } catch (HornetQException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
               } 
            }

            protected String formatQueueAddress(final String name)
            {  
              if ( this.localIpAddr == null)
               {
               try {
                localIpAddr = InetAddress.getLocalHost();
               } catch (UnknownHostException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
               }
               }
              return (Global.QUEUEPREFIX + name+":"+getLocalIpAddress().toString()+":"+this.hashCode()); 
            }

            public String getQueueAddress()
            {
                return this.queueBindingAddress;
            }

            public String formatQueueName(final String name)
            {
              return formatQueueAddress(name);
            }

            public String getQueueName()
            {
              return this.queueName;
            }

            public InetAddress getLocalIpAddress()
              {
                 return this.localIpAddr;
              } 
            }

            • 3. Re: when send msg to HornetQ server,how to show these msg
              kelly.jiang

              When unregister runs, I get the following output:

               

              HornetQException[errorCode=104 message=Cannot delete queue opmex.datacenter:KELLY-JIANG/192.168.188.27 on binding opmex.datacenter:KELLY-JIANG/192.168.188.27 - it has consumers = org.hornetq.core.postoffice.impl.LocalQueueBinding]
              at org.hornetq.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:286)
              at org.hornetq.core.client.impl.ClientSessionImpl.deleteQueue(ClientSessionImpl.java:350)
              at org.hornetq.core.client.impl.ClientSessionImpl.deleteQueue(ClientSessionImpl.java:355)
              at org.hornetq.core.client.impl.DelegatingSession.deleteQueue(DelegatingSession.java:327)

               

               

              Explanation: the queue has 5 clients attached, each acting as a receiver for messages sent by other clients.
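
              A note on this trace: in the unregister() posted in reply 2, deleteQueue sits in the same try block as the later close() calls, so when it throws error 104 the session, session factory and server locator are never closed. Whether that explains the slow delivery is not confirmed in this thread. Below is a minimal sketch of running every cleanup step independently so that one failure does not skip the rest. It uses only the Java standard library; SafeRelease, Step and releaseAll are hypothetical names, not part of HornetQ, and the lambdas stand in for the real close and deleteQueue calls.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of HornetQ): runs every cleanup step even if an earlier one throws. */
class SafeRelease {

    /** One cleanup action that may fail, e.g. a deleteQueue or close call. */
    interface Step {
        void run() throws Exception;
    }

    /**
     * Runs the steps in the given order. A failure in one step is recorded
     * and the remaining steps still run. Returns the collected failures.
     */
    static List<Exception> releaseAll(Step... steps) {
        List<Exception> failures = new ArrayList<Exception>();
        for (Step step : steps) {
            try {
                step.run();
            } catch (Exception e) {
                failures.add(e);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        final List<String> log = new ArrayList<String>();
        List<Exception> failures = releaseAll(
            () -> log.add("consumer closed"),
            // Stands in for deleteQueue failing because the queue still has consumers.
            () -> { throw new IllegalStateException("queue has consumers"); },
            () -> log.add("session closed"),
            () -> log.add("factory closed"),
            () -> log.add("locator closed"));
        System.out.println(log + " failures=" + failures.size());
    }
}
```

              In the real unregister(), each step would wrap one of the existing calls (msgReceiver.close(), coreSession.deleteQueue(...), coreSession.close(), and so on), and the returned failures could be logged after everything has been closed.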

              • 4. Re: when send msg to HornetQ server,how to show these msg
                kelly.jiang

                After the B process exits, A sends a message, and then B starts.

                This time B receives two messages: the previous one and this one.

                • 5. Re: when send msg to HornetQ server,how to show these msg
                  gaohoward

                  Can I ask why you delete the queue in unregister process?

                  • 6. Re: when send msg to HornetQ server,how to show these msg
                    kelly.jiang

                    I thought that with durable == false the messages would not be kept in the queue, so I delete it.

                     

                    public void unregister() {                       
                        if ( this.registerEntry.durable == false)
                        {
                            this.coreSession.deleteQueue(this.queueName);                       
                        }

                     

                    }

                     

                     

                    Following up on my last post, I may have resolved the problem.
                    My program now releases its resources correctly on exit, but the timing of my earlier test was probably wrong.
                    At first, my program did not release its resources properly when it exited. I then fixed the program and carried on testing sending and receiving, but the resources created by the earlier runs had still not been deleted.

                    Today I tested again after restarting the HornetQ server and exiting the program with the resources released correctly, and the problem was resolved.

                    Thank you for pointing out that the resources were not being deleted; it helped me find the bug.
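
                    The fix described here depends on the client always releasing its resources before it exits. One way to make that harder to skip is a JVM shutdown hook. The following is a minimal sketch using only the Java standard library; CleanupHook and install are hypothetical names, and the Runnable stands in for a call such as msgAttachPoint.unregister(). Shutdown hooks run on a normal exit or an interrupt such as Ctrl-C, but not when the process is killed forcibly.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: registers a cleanup action to run when the JVM shuts down. */
class CleanupHook {

    /** Registers the cleanup as a shutdown hook and returns the hook thread. */
    static Thread install(Runnable cleanup) {
        Thread hook = new Thread(cleanup, "unregister-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        final AtomicBoolean released = new AtomicBoolean(false);
        install(() -> {
            // Real code would call the client's unregister() here.
            released.set(true);
            System.out.println("resources released");
        });
        System.out.println("working...");
        // When main returns, the JVM begins shutdown and runs the hook.
    }
}
```

                    The hook should stay short and must not block for long, since the JVM is already shutting down when it runs.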

                    • 7. Re: when send msg to HornetQ server,how to show these msg
                      gaohoward

                      You are welcome. Glad you solved your problem.