-
1. Re: when send msg to HornetQ server,how to show these msg
gaohoward Feb 6, 2012 1:09 AM (in response to kelly.jiang)
How did you restart B? Did you kill the process without closing the resources (session, connection, etc.)?
Howard
-
2. Re: when send msg to HornetQ server,how to show these msg
kelly.jiang Feb 6, 2012 3:08 AM (in response to gaohoward)
////////////////////////////// unregister brief process //////////////////////////////
Before the B process exits, I call unregister {
this.serverConfig.clear();
this.msgReceiver.close();
this.msgSender.close();
ClientSession.deleteQueue(this.queueName); // the queue has 5 receivers
ClientSession.close();
ClientSessionFactory.close();
ServerLocator.close();
}
////////////////////////////// register brief process //////////////////////////////
final static TopicHornetQMsgAttachPoint msgAttachPoint = new TopicHornetQMsgAttachPoint();
public void Init()
{
entry.hostName="xx"; // this brief process is correct
// entry.name = "bb";
entry.name = "cc";
entry.port = 5445;
entry.msgCallBack = dataCenterMsgCallBack;
entry.receiveContext = this.receiveContext;
msgAttachPoint.register(entry);
}
////////////////////////////// start and exit B register/unregister brief process //////////////////////////////
main: First, I start this Java program; after the timeout, B exits. Then I start the B program again, and B receives messages slowly.
When B exits, its resources are released, so maybe the unregister function is wrong?
public static void main(String[] args) {
Init(); // register process; this brief code is correct
try
{
while (true)
{
Thread.sleep(50000); // after the timeout, unregister is called. When this B program is restarted and A sends messages during that time, B receives them slowly
msgAttachPoint.unregister(); //unregister process
break;
//System.out.println("DataAccess.Start end");
}
}
catch (InterruptedException e)
{
e.printStackTrace();
}
};
////////////////////////////// register/unregister brief process //////////////////////////////
/*
package com.opmextech.rfodn.msgexport;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSession.QueueQuery;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import com.opmextech.rfodn.common.ApplicationContext;
import com.opmextech.rfodn.common.Global;
import com.opmextech.rfodn.common.MsgAttachPoint;
import com.opmextech.rfodn.common.RegisterEntry;
import java.util.Calendar;
import java.util.GregorianCalendar;
public class HornetQMsgAttachPoint implements MsgAttachPoint {
protected transient RegisterEntry registerEntry = null;
protected transient ClientSession coreSession = null;
protected transient ClientProducer msgSender = null;
protected transient ClientConsumer msgReceiver = null;
protected transient String queueBindingAddress = null;
protected transient String queueName = null;
protected transient ClientMessage sendBuffer = null;
protected transient ClientSessionFactory sessionFactory = null;
protected transient InetAddress localIpAddr = null;
protected transient ServerLocator serverLocator = null;
protected transient HashMap<String, Object> serverConfig = null;
protected transient HornetQMsgCallBackHandler msgCallBackHandler = null;
protected transient HornetQSendAcknowledgementHandler msgAckCallBackHandler = null;
Calendar date = Calendar.getInstance();
public boolean register(final RegisterEntry paramRegisterEntry)
{
if ( (this.coreSession != null) || (paramRegisterEntry == null) || (paramRegisterEntry.name.isEmpty()))
{
return false;
}
/**
@brief Back up the registration info; consider whether it needs to be cloned
*/
this.registerEntry = paramRegisterEntry;
try
{
this.queueBindingAddress = this.formatQueueAddress(paramRegisterEntry.name);
this.queueName = this.formatQueueName(paramRegisterEntry.name);
/**
*@brief Get the server parameters; this function can be overridden to support automatic server discovery
*/
this.getConnectionParam();
this.createConnect();
this.createSession();
this.createQueue(this.queueBindingAddress,this.queueName,this.registerEntry.durable);
this.createSender();
/**
*@brief Start receiving data
*1.The session must be started before ClientConsumers created by the session can consume messages from the queue
*/
if ( this.coreSession != null)
{
this.coreSession.start();
}
this.createReceiver(this.queueName);
if ( this.coreSession != null)
{
return true;
}
}
catch (Exception e)
{
this.unregister();
// TODO Auto-generated catch block
e.printStackTrace();
}
return false;
}
public boolean sendMsg(final ApplicationContext sendContext)
{
if ( this.msgSender == null || this.coreSession == null || sendContext == null || sendContext.targetQueueAddress == null || sendContext.length <= 0)
{
return false;
}
if ( this.sendBuffer == null)
{
this.sendBuffer = this.coreSession.createMessage(false);
}
try {
this.sendBuffer.getBodyBuffer().clear();
HornetQApplicationContext context = (HornetQApplicationContext)sendContext;
if (context.qBuffer == null )
return false;
this.sendBuffer.getBodyBuffer().writeBytes(context.qBuffer, 0, context.qBuffer.readableBytes());
//this.sendBuffer.getBodyBuffer().writeBytes(sendContext.buffer, 0, sendContext.length);
this.sendBuffer.setTimestamp(System.currentTimeMillis());
/**
* @brief When sending a message, the source address can be attached<br>
*/
if ( sendContext.sourceQueueAddress != null)
{
this.sendBuffer.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, SimpleString.toSimpleString(sendContext.sourceQueueAddress));
}
this.msgSender.send(sendContext.targetQueueAddress, sendBuffer);
} catch (HornetQException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return true;
}
public void unregister()
{
try {
/**
* @brief Memory cannot be reclaimed
*/
if ( this.msgAckCallBackHandler != null)
{
this.msgAckCallBackHandler = null;
}
if ( this.msgCallBackHandler != null)
{
this.msgCallBackHandler = null;
}
if ( this.serverConfig != null)
{
this.serverConfig.clear();
this.serverConfig = null;
}
if ( this.sendBuffer != null)
{
this.sendBuffer = null;
}
if ( msgReceiver != null)
{
this.msgReceiver.setMessageHandler(null);
this.msgReceiver.close();
this.msgReceiver = null;
}
if ( this.msgSender != null)
{
this.msgSender.close();
this.msgSender = null;
}
if (this.coreSession != null)
{
/**
@brief If a temporary queue was created, it must be deleted; a durable queue does not need to be<br>
*/
if ( this.registerEntry.durable == false)
{
this.coreSession.deleteQueue(this.queueName);
}
this.coreSession.setSendAcknowledgementHandler(null);
this.coreSession.close();
this.coreSession = null;
}
if (this.sessionFactory != null)
{
this.sessionFactory.close();
this.sessionFactory = null;
}
if ( this.serverLocator != null)
{
this.serverLocator.close();
this.serverLocator = null;
}
} catch (HornetQException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
protected void getConnectionParam()
{
if ( this.serverConfig == null)
{
serverConfig = new HashMap<String, Object>();
}
serverConfig.put("host", this.registerEntry.hostName);
serverConfig.put("port", this.registerEntry.port);
}
protected void createConnect()
{
if ( this.serverLocator != null)
{
return;
}
try {
/**
* @brief TransportConfiguration needs an additional setter interface<br>
*/
final TransportConfiguration configuration = new TransportConfiguration(NettyConnectorFactory.class.getName(),serverConfig);
serverLocator = HornetQClient.createServerLocatorWithoutHA(configuration);
/**
* @brief 19.1.1 Window-based flow control
*/
serverLocator.setConsumerWindowSize(Global.MAXCONSUMERWINDOWSIZE);
serverLocator.setConsumerMaxRate(Global.MAXCONSUMERRATE);
/**
* @brief Support asynchronous message sending<br>
* 1. ConfirmationWindowSize can be set on the ServerLocator<br>
* 2. See the [HornetQ] manual, 20.4<br>
*/
serverLocator.setConfirmationWindowSize(Global.MAXCONFIRMATIONWINDOWSIZE);
this.serverLocator.setInitialConnectAttempts(Global.MAXRETRYCOUNT);
sessionFactory = serverLocator.createSessionFactory();
} catch (Exception e) {
this.unregister();
// TODO Auto-generated catch block
e.printStackTrace();
}
}
protected void createSession()
{
if ((this.sessionFactory == null) ||( this.coreSession != null))
{
return;
}
try {
coreSession = sessionFactory.createSession(true,true);
} catch (HornetQException e) {
this.unregister();
// TODO Auto-generated catch block
e.printStackTrace();
}
}
protected void createQueue(String queueAddr,String strQueueName,boolean durable)
{
if ( this.coreSession == null || queueAddr == null || strQueueName == null)
{
return;
}
try {
/**
* @brief SimpleString.toSimpleString causes a memory leak
* @bug: <br>
* 1. Why does creating a queue take a String while querying takes a SimpleString?<br>
*/
final QueueQuery query = coreSession.queueQuery(SimpleString.toSimpleString(strQueueName));
if ( query.isExists() == false )
{
/**
*@brief If the queue does not exist, create it<br>
*/
coreSession.createQueue(queueAddr,strQueueName,durable);
}
} catch (HornetQException e) {
this.unregister();
// TODO Auto-generated catch block
e.printStackTrace();
}
}
protected void createSender()
{
if (( this.coreSession == null) || ( this.msgSender != null))
{
return;
}
/**
* @brief Support asynchronous message sending; register the callback for asynchronous message ack notifications<br>
*/
if ( this.msgAckCallBackHandler == null)
{
this.msgAckCallBackHandler = new HornetQSendAcknowledgementHandler(this.registerEntry.msgCallBack,registerEntry.receiveAckContext);
}
this.coreSession.setSendAcknowledgementHandler(this.msgAckCallBackHandler);
try {
msgSender = coreSession.createProducer(this.queueBindingAddress);
} catch (HornetQException e) {
this.unregister();
// TODO Auto-generated catch block
e.printStackTrace();
}
}
protected void createReceiver(String strQueueName)
{
if (( this.coreSession == null) ||( this.msgReceiver != null) || strQueueName == null)
{
return;
}
try {
/**
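* @brief For the queue messaging model, see 4.2.1
* 1. A queue may have multiple receivers, but each message is delivered to at most one receiver.<br>
* 2. A queue name and an address are different things; address to queue name is a 1:N relationship.<br>
* 3. See 8.2.1, Addresses.<br>
* 4. The core has no concept of a topic, only addresses and queues.<br>
* 5. Topic behavior: simply bind one address to multiple queues; each of those queues then acts as a subscription.<br>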
*/
msgReceiver = coreSession.createConsumer(strQueueName);
/**
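* @bug With this added, offline messages may not be received
* 1. With the two statements below added, the client no longer seems to receive duplicate messages.<br>
* 2. createConsumer also has a parameter that indicates whether messages are removed from the queue; could that achieve a similar effect?<br>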
*/
msgReceiver.setMessageHandler(null);
msgReceiver.receiveImmediate();
/**
* @brief Register the callback function for asynchronous message reception<br>
*/
if ( this.msgCallBackHandler == null)
{
this.msgCallBackHandler = new HornetQMsgCallBackHandler(registerEntry.msgCallBack,registerEntry.receiveContext);
}
msgReceiver.setMessageHandler(this.msgCallBackHandler);
} catch (HornetQException e) {
this.unregister();
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public boolean queryQueue(final String strQueueName)
{
if (this.coreSession == null || strQueueName == null)
{
return false;
}
try {
/**
* @brief SimpleString.toSimpleString causes a memory leak
*/
final QueueQuery query = coreSession.queueQuery(SimpleString.toSimpleString(strQueueName));
if ( query.isExists() == true )
{
return true;
}
} catch (HornetQException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return false;
}
public void deleteQueue(final String strQueueName)
{
if ( this.coreSession == null || strQueueName == null)
{
return;
}
try {
this.coreSession.deleteQueue(strQueueName);
} catch (HornetQException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
protected String formatQueueAddress(final String name)
{
if ( this.localIpAddr == null)
{
try {
localIpAddr = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return (Global.QUEUEPREFIX + name+":"+getLocalIpAddress().toString()+":"+this.hashCode());
}
public String getQueueAddress()
{
return this.queueBindingAddress;
}
public String formatQueueName(final String name)
{
return formatQueueAddress(name);
}
public String getQueueName()
{
return this.queueName;
}
public InetAddress getLocalIpAddress()
{
return this.localIpAddr;
}
}
-
3. Re: when send msg to HornetQ server,how to show these msg
kelly.jiang Feb 6, 2012 3:21 AM (in response to kelly.jiang)
When unregistering, I get the following output:
HornetQException[errorCode=104 message=Cannot delete queue opmex.datacenter:KELLY-JIANG/192.168.188.27 on binding opmex.datacenter:KELLY-JIANG/192.168.188.27 - it has consumers = org.hornetq.core.postoffice.impl.LocalQueueBinding]
at org.hornetq.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:286)
at org.hornetq.core.client.impl.ClientSessionImpl.deleteQueue(ClientSessionImpl.java:350)
at org.hornetq.core.client.impl.ClientSessionImpl.deleteQueue(ClientSessionImpl.java:355)
at org.hornetq.core.client.impl.DelegatingSession.deleteQueue(DelegatingSession.java:327)
Explanation: the queue has 5 clients (acting as receivers for messages sent by other clients).
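The message in the trace above says the queue cannot be deleted because it still has consumers. Below is a minimal sketch of that rule, assuming the server simply refuses deletion while any consumer remains attached. `FakeQueue` and `DeleteQueueDemo` are hypothetical stand-ins written for illustration, not HornetQ classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a server-side queue; NOT the HornetQ API. */
final class FakeQueue {
    private int consumers;   // consumers currently attached
    private boolean deleted;

    /** Attaches a consumer and returns a handle whose run() detaches it once. */
    Runnable attachConsumer() {
        consumers++;
        boolean[] closed = {false};
        return () -> {
            if (!closed[0]) {
                closed[0] = true;
                consumers--;
            }
        };
    }

    /** Assumed rule: deletion is refused while consumers remain attached. */
    void delete() {
        if (consumers > 0) {
            throw new IllegalStateException(
                "Cannot delete queue - it has consumers: " + consumers);
        }
        deleted = true;
    }

    boolean isDeleted() {
        return deleted;
    }
}

final class DeleteQueueDemo {
    /**
     * Attaches totalConsumers consumers, closes only the first one
     * (as a single exiting client would), then tries to delete the queue.
     * Returns true if the deletion was refused.
     */
    static boolean deleteRefused(int totalConsumers) {
        FakeQueue queue = new FakeQueue();
        List<Runnable> handles = new ArrayList<>();
        for (int i = 0; i < totalConsumers; i++) {
            handles.add(queue.attachConsumer());
        }
        handles.get(0).run(); // this client closes only its own consumer
        try {
            queue.delete();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("5 consumers, one closed, refused: " + deleteRefused(5));
        System.out.println("1 consumer, closed, refused: " + deleteRefused(1));
    }
}
```

In this model, one client closing its own receiver is not enough when the queue is shared: the deletion only goes through once the last consumer has detached.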
-
4. Re: when send msg to HornetQ server,how to show these msg
kelly.jiang Feb 6, 2012 4:16 AM (in response to kelly.jiang)
After the B process exits, A sends a message, and then B starts.
This time B receives two messages: the previous one and this one.
-
5. Re: when send msg to HornetQ server,how to show these msg
gaohoward Feb 7, 2012 12:21 AM (in response to kelly.jiang)
Can I ask why you delete the queue in the unregister process?
-
6. Re: when send msg to HornetQ server,how to show these msg
kelly.jiang Feb 7, 2012 3:11 AM (in response to gaohoward)
I thought that with durable == false the messages would not be saved in the queue, so I delete it:
public void unregister() {
if ( this.registerEntry.durable == false)
{
this.coreSession.deleteQueue(this.queueName);
}
}
Regarding my last post, I may have resolved the problem:
When the program exits I do release the resources correctly, but the timing may have been wrong.
At first, I did not release the resources properly when exiting my program. Then I modified the program and kept testing message send/receive, but the resources created earlier had not been deleted. Today I tested again: I restarted the HornetQ server and exited the program with the resources released correctly, and the problem was resolved.
Thank you for pointing out that the resources were not deleted; it helped me find the bug.
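One way to make sure cleanup runs on every exit path, including failures, is to tie it to a try-with-resources block, which closes resources in the reverse of the order they were opened. The sketch below only illustrates that ordering: `Tracked` and `CleanupOrderDemo` are hypothetical stand-ins, and it is an assumption that the real client objects would be wrapped in a similar way (explicit finally blocks do the same job where a class is not AutoCloseable).

```java
import java.util.ArrayList;
import java.util.List;

final class CleanupOrderDemo {
    /** Hypothetical resource that records its name when it is closed. */
    static final class Tracked implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Tracked(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void close() {
            log.add(name);
        }
    }

    /**
     * Opens locator, factory, session and consumer stand-ins in that order,
     * simulates a failure while working, and returns the order in which
     * the resources were closed.
     */
    static List<String> runAndFail() {
        List<String> closed = new ArrayList<>();
        try (Tracked locator = new Tracked("locator", closed);
             Tracked factory = new Tracked("factory", closed);
             Tracked session = new Tracked("session", closed);
             Tracked consumer = new Tracked("consumer", closed)) {
            throw new IllegalStateException("simulated failure while processing");
        } catch (IllegalStateException e) {
            // All four resources have already been closed at this point.
        }
        return closed;
    }

    public static void main(String[] args) {
        System.out.println("close order: " + runAndFail());
    }
}
```

The close order here mirrors the unregister sequence posted earlier in the thread: receiver first, then session, session factory and server locator.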
-
7. Re: when send msg to HornetQ server,how to show these msg
gaohoward Feb 7, 2012 3:27 AM (in response to kelly.jiang)
You are welcome. Glad you solved your problem.