package com.gamebean.base.service.message.impl;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.locks.LockSupport;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueRequestor;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.management.JMSManagementHelper;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.server.config.ConnectionFactoryConfiguration;
import org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl;
import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
import org.hornetq.jms.server.embedded.EmbeddedJMS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.gamebean.base.service.ServiceManager;
import com.gamebean.base.service.message.EventListener;
import com.gamebean.base.service.message.MessageService;
import com.gamebean.base.service.message.spi.MessageServiceExt;
public class MessageServiceImpl implements MessageService,
MessageServiceImplMBean {
public static final String EMBEDDED_CONNECTION_FACTORY_JNDI_NAME = "jms:/local/embedded/connectionFactory";
/**
 * Returns the shared service object held in the static {@code instance}
 * field. This accessor only hands out the reference; it does not call
 * {@link #init()}.
 *
 * @return the statically created {@link MessageServiceImpl}
 */
public static MessageService getInstance() {
return instance;
}
// Embedded HornetQ JMS server; assigned and started in init(), stopped in
// destory().
private EmbeddedJMS ejms;
// private LinkedHashMap<Connection, EventListener> listeners = new
// LinkedHashMap<>();
// Connection created in init(); sendEvent() opens one session on it per
// published event.
private Connection eventConnection;
// Set to true by init() so that later init() calls return immediately.
private boolean inited = false;
// JMX object name assigned in init() and unregistered in destory().
private ObjectName mbeanName;
// private boolean localService;
// Holds the "jms-acceptor" / "jms-connector" settings found during init().
// It is replaced by a fresh copy whenever an entry is added and is returned
// by getReadOnlyConfig().
private Map<String, Object> readOnlyConfig;
private final static Logger logger = LoggerFactory
.getLogger(MessageServiceImpl.class);
// Instance returned by getInstance(); created when the class is initialised.
// Construction alone does not start the broker - init() does that.
private static MessageServiceImpl instance = new MessageServiceImpl();
/**
 * Creates an unstarted service. The body is empty: the broker, the event
 * connection and the MBean registration are all set up by {@link #init()}.
 */
public MessageServiceImpl() {
}
/**
 * Subscribes {@code listener} to {@code eventTopic} without a message
 * selector by delegating to the three-argument overload.
 *
 * @param listener   callback that receives the events
 * @param eventTopic name of the topic to subscribe to
 * @return the connection created by the three-argument overload
 */
@Override
public Connection addEventListener(final EventListener listener,
        String eventTopic) {
    // A null selector is passed straight through to the consumer creation.
    final String noSelector = null;
    return addEventListener(listener, eventTopic, noSelector);
}
/**
 * Subscribes {@code listener} to {@code eventTopic} on a JMS connection that
 * is created for this subscription only.
 * <p>
 * The topic is first created through {@link #createDynamicTopic(String)}.
 * Each received message is handed to the listener together with the name of
 * the destination it arrived on; anything the listener throws is logged and
 * not propagated. A {@code ConnectionExceptionListener} built from this
 * service and the subscription arguments is installed on the connection
 * before it is started.
 * <p>
 * If any step fails after the connection has been created, the connection is
 * closed again before the failure is rethrown, so a failed subscription does
 * not leave a connection behind.
 *
 * @param listener        callback that receives the events
 * @param eventTopic      name of the topic to subscribe to
 * @param messageSelector JMS selector handed to the consumer, may be
 *                        {@code null}
 * @return the started connection that carries the subscription
 * @throws RuntimeException wrapping any failure during set-up
 */
@Override
public Connection addEventListener(final EventListener listener,
        String eventTopic, String messageSelector) {
    if (logger.isDebugEnabled()) {
        // Logged at the level the guard checks for.
        logger.debug("增加事件监听器:" + listener + ",Topic:" + eventTopic
                + ",selector:" + messageSelector);
    }
    Connection conn = null;
    try {
        createDynamicTopic(eventTopic);
        conn = this.getConnectionFactory().createConnection();
        Session session = conn.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(eventTopic);
        MessageConsumer consumer = session.createConsumer(topic,
                messageSelector);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    String topicOrQueue;
                    Destination jmsDestination = message
                            .getJMSDestination();
                    if (jmsDestination instanceof Topic) {
                        topicOrQueue = ((Topic) jmsDestination)
                                .getTopicName();
                    } else {
                        topicOrQueue = ((Queue) jmsDestination)
                                .getQueueName();
                    }
                    listener.onEvent(topicOrQueue, message);
                } catch (Throwable e) {
                    // A failing listener must not break message delivery.
                    logger.error("处理消息错误", e);
                }
            }
        });
        conn.setExceptionListener(new ConnectionExceptionListener(this,
                listener, eventTopic, messageSelector));
        conn.start();
        return conn;
    } catch (Exception e) {
        // Release the half-configured connection; closing it also closes
        // the session and consumer created on it.
        if (conn != null) {
            try {
                conn.close();
            } catch (Exception closeError) {
                logger.warn("关闭事件监听连接错误", closeError);
            }
        }
        throw new RuntimeException(e);
    }
}
/**
 * Adds the in-VM acceptor to {@code config} and registers the in-VM
 * connector under the key {@code "connector-inner"}.
 *
 * @param config HornetQ configuration to extend
 */
private void configInTransport(Configuration config) {
    // Both transport configurations receive the same (empty) parameter map.
    Map<String, Object> sharedParams = new HashMap<>();

    TransportConfiguration inVmAcceptor = new TransportConfiguration(
            InVMAcceptorFactory.class.getName(), sharedParams,
            "HornetQ JMS Server-Acceptor-Inner");
    config.getAcceptorConfigurations().add(inVmAcceptor);

    TransportConfiguration inVmConnector = new TransportConfiguration(
            InVMConnectorFactory.class.getName(), sharedParams,
            "HornetQ JMS Server-Connector-Inner");
    config.getConnectorConfigurations().put("connector-inner",
            inVmConnector);
}
/**
 * Adds a Netty acceptor to {@code config} when a {@code "jms-acceptor"}
 * configuration map can be found through {@code getConfigure}; does nothing
 * when no such configuration exists.
 * <p>
 * The settings that were found are also published under
 * {@code "jms-acceptor"} in a fresh copy of {@code readOnlyConfig}. Any
 * exception is logged and swallowed.
 *
 * @param config HornetQ configuration to extend
 */
private void configOutAcceptor(Configuration config) {
    try {
        // Du Tianwei: communication with peers outside this JVM.
        Set<String> allowedKeys = NettyAcceptorFactory.class.newInstance()
                .getAllowableProperties();
        Map<String, Object> acceptorParams = getConfigure("jms-acceptor",
                allowedKeys);
        if (acceptorParams != null) {
            // Swap in a new map instead of mutating the published one.
            Map<String, Object> snapshot = new HashMap<>(readOnlyConfig);
            snapshot.put("jms-acceptor", acceptorParams);
            this.readOnlyConfig = snapshot;

            config.getAcceptorConfigurations().add(
                    new TransportConfiguration(
                            NettyAcceptorFactory.class.getName(),
                            acceptorParams,
                            "HornetQ JMS Server-Acceptor-Outer"));
        }
    } catch (Exception e) {
        logger.error("读取JMS Acceptor配置异常", e);
    }
}
/**
 * Configures the connector used to reach a JMS server outside this JVM.
 * <p>
 * When a {@code "jms-connector"} configuration map can be found through
 * {@code getConfigure}, a Netty connector is registered in {@code config}
 * under the key {@code "connector-outer"} and the settings are published
 * under {@code "jms-connector"} in a fresh copy of {@code readOnlyConfig}.
 * Any exception is logged and reported as {@code false}.
 *
 * @param config HornetQ configuration to extend
 * @return {@code true} if the external configuration was read successfully,
 *         {@code false} otherwise
 */
private boolean configOutConnector(Configuration config) {
    boolean configured = false;
    // Du Tianwei: communication with peers outside this JVM.
    try {
        Set<String> allowedKeys = NettyConnectorFactory.class.newInstance()
                .getAllowableProperties();
        Map<String, Object> connectorParams = getConfigure("jms-connector",
                allowedKeys);
        if (connectorParams != null) {
            // Swap in a new map instead of mutating the published one.
            Map<String, Object> snapshot = new HashMap<>(readOnlyConfig);
            snapshot.put("jms-connector", connectorParams);
            this.readOnlyConfig = snapshot;

            TransportConfiguration outerConnector = new TransportConfiguration(
                    NettyConnectorFactory.class.getName(), connectorParams,
                    "HornetQ JMS Server-Connector-Outer");
            config.getConnectorConfigurations().put("connector-outer",
                    outerConnector);
            configured = true;
        }
    } catch (Exception e) {
        logger.error("读取JMS Connector配置异常", e);
    }
    return configured;
}
/**
 * Asks the embedded server to create {@code topic} by sending a
 * {@code createTopic} operation for the {@code jms.server} resource to the
 * {@code hornetq.management} queue and waiting for the reply.
 * <p>
 * The reply is not inspected, so the outcome of the management operation
 * itself is not reported to the caller. The temporary management connection
 * is closed in a {@code finally} block, so it is released even when sending
 * the request fails; a failure while closing is only logged.
 *
 * @param topic name of the topic to create
 * @throws RuntimeException wrapping any failure while sending the request
 */
@Override
public void createDynamicTopic(String topic) {
    if (logger.isDebugEnabled()) {
        logger.debug("创建Topic:" + topic);
    }
    QueueConnection connection = null;
    try {
        QueueConnectionFactory connectionFactory = (QueueConnectionFactory) ejms
                .lookup(EMBEDDED_CONNECTION_FACTORY_JNDI_NAME);
        connection = connectionFactory.createQueueConnection();
        Queue managementQueue = HornetQJMSClient
                .createQueue("hornetq.management");
        QueueSession session = connection.createQueueSession(false,
                Session.AUTO_ACKNOWLEDGE);
        connection.start();
        Message m = session.createMessage();
        JMSManagementHelper.putOperationInvocation(m, "jms.server",
                "createTopic", topic);
        QueueRequestor requestor = new QueueRequestor(session,
                managementQueue);
        // Blocks until the server has answered; the answer is discarded.
        requestor.request(m);
    } catch (Exception e) {
        throw new RuntimeException(e);
    } finally {
        // Closing the connection also closes the session created on it.
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception closeError) {
                logger.warn("关闭管理连接错误", closeError);
            }
        }
    }
}
/**
 * Shuts the service down: closes the event connection, stops the embedded
 * JMS server and unregisters the MBean.
 * <p>
 * Every step is best-effort. A step whose resource was never created
 * (because {@link #init()} did not run or failed early) is skipped, and a
 * failure in one step is logged without preventing the following steps.
 */
@PreDestroy
public void destory() {
    // Close the client connection before the server goes away, so it is
    // not left open against a stopped broker.
    if (eventConnection != null) {
        try {
            eventConnection.close();
        } catch (Exception e) {
            logger.error("关闭事件连接错误", e);
        }
    }
    if (ejms != null) {
        try {
            ejms.stop();
            // Wait for the disconnect notification to be sent.
            // NOTE(review): parkNanos(500) is 500 nanoseconds; confirm
            // whether 500 milliseconds was intended.
            LockSupport.parkNanos(500);
        } catch (Exception e) {
            logger.error("停止消息服务错误", e);
        }
    }
    if (mbeanName != null) {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        try {
            mbs.unregisterMBean(mbeanName);
        } catch (Exception e) {
            logger.error("注销消息服务MBean错误", e);
        }
    }
}
/**
 * Looks up the configuration map bound to {@code name} in JNDI and returns
 * only the entries whose keys appear in {@code filter}.
 * <p>
 * The default initial context is tried first; if that lookup fails with a
 * {@link NamingException}, the same name is looked up below
 * {@code java:comp/env}.
 *
 * @param name   JNDI name of the configuration map
 * @param filter keys that may be kept
 * @return the filtered configuration, or {@code null} when the name cannot
 *         be resolved in either place
 */
@SuppressWarnings("unchecked")
private Map<String, Object> getConfigure(String name, Set<String> filter) {
    Map<String, Object> found;
    try {
        Context root = new InitialContext();
        try {
            found = (Map<String, Object>) root.lookup(name);
            logger.info("在默认JNDI空间找到配置: " + name);
        } catch (NamingException notInDefault) {
            logger.debug("未在默认JNDI空间找到配置: " + name
                    + notInDefault.getLocalizedMessage());
            // Fall back to the container's environment naming context.
            Context env = (Context) root.lookup("java:comp/env");
            found = (Map<String, Object>) env.lookup(name);
            logger.info("在容器JNDI空间找到配置: " + name);
        }
    } catch (NamingException notFound) {
        logger.debug("未在默认和容器JNDI空间找到配置: " + name
                + notFound.getLocalizedMessage());
        return null;
    }
    return filter(found, filter);
}
/**
 * Copies into a new map those entries of {@code m} whose key is contained in
 * {@code filter}. The input map is not modified.
 *
 * @param m      source entries
 * @param filter keys that may be kept
 * @return a new {@link HashMap} holding the retained entries
 */
private Map<String, Object> filter(Map<String, Object> m, Set<String> filter) {
    Map<String, Object> kept = new HashMap<>();
    for (String key : m.keySet()) {
        if (!filter.contains(key)) {
            continue;
        }
        kept.put(key, m.get(key));
    }
    return kept;
}
/**
 * Returns the connection factory that the embedded server has bound under
 * {@link #EMBEDDED_CONNECTION_FACTORY_JNDI_NAME}.
 *
 * @return the object found by the lookup, cast to {@link ConnectionFactory}
 */
@Override
public ConnectionFactory getConnectionFactory() {
    Object bound = ejms.lookup(EMBEDDED_CONNECTION_FACTORY_JNDI_NAME);
    return (ConnectionFactory) bound;
}
/**
 * Starts the embedded HornetQ JMS server and publishes this service.
 * <p>
 * Steps, in order:
 * <ol>
 * <li>Return immediately if a previous call already completed.</li>
 * <li>Choose the transport: if an external {@code "jms-connector"}
 * configuration is found, the connection factory uses
 * {@code "connector-outer"}; otherwise an optional external acceptor and the
 * in-VM transport are configured and {@code "connector-inner"} is used.</li>
 * <li>Enable wildcard routing and switch persistence and security off.</li>
 * <li>Start the server, open the event connection, register this object
 * with the {@code ServiceManager} and as an MBean, then hand this service to
 * every {@code MessageServiceExt} found through {@link ServiceLoader}.</li>
 * </ol>
 * A failing MBean registration or extension is logged and does not abort
 * initialisation. If start-up itself fails, the event connection and the
 * server are released again before the failure is rethrown.
 *
 * @throws RuntimeException wrapping any failure while starting the server,
 *                          opening the connection or registering the service
 */
@PostConstruct
public synchronized void init() {
    if (inited) {
        return;
    }
    // HornetQ core configuration.
    Configuration config = new ConfigurationImpl();
    this.readOnlyConfig = new HashMap<>();
    String connector;
    if (configOutConnector(config)) {
        // Connect to an external JMS server.
        connector = "connector-outer";
    } else {
        // Optionally open a port for other JVMs ...
        configOutAcceptor(config);
        // ... and always set up the in-VM transport.
        configInTransport(config);
        connector = "connector-inner";
    }
    config.setWildcardRoutingEnabled(true);
    config.setPersistenceEnabled(false);
    config.setSecurityEnabled(false);

    // JMS-level configuration: one connection factory bound to the chosen
    // connector.
    JMSConfigurationImpl jmsConfig = new JMSConfigurationImpl();
    ArrayList<String> connectorNames = new ArrayList<>();
    connectorNames.add(connector);
    ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl(
            "cf", false, connectorNames,
            EMBEDDED_CONNECTION_FACTORY_JNDI_NAME);
    cfConfig.setClientID("internal");
    cfConfig.setReconnectAttempts(-1);
    cfConfig.setRetryInterval(20);
    jmsConfig.getConnectionFactoryConfigurations().add(cfConfig);

    ejms = new EmbeddedJMS();
    ejms.setConfiguration(config);
    ejms.setJmsConfiguration(jmsConfig);
    try {
        ejms.start();
        this.eventConnection = this.getConnectionFactory()
                .createConnection();
        ServiceManager.getInstance().registerService(MessageService.class,
                this);
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName("热血足球经理:type=服务,name=消息服务");
            mbs.registerMBean(this, name);
            // Remember the name only once the registration succeeded.
            this.mbeanName = name;
        } catch (Exception e) {
            logger.error("注册消息服务MBean错误", e);
        }
        // Marked as initialised before the extensions run, so an extension
        // that triggers init() again returns at the guard above.
        inited = true;
        ServiceLoader<MessageServiceExt> exts = ServiceLoader
                .load(MessageServiceExt.class);
        for (MessageServiceExt ext : exts) {
            try {
                ext.setMessageService(this);
            } catch (Exception e) {
                logger.error("扩展服务初始化异常" + ext, e);
            }
        }
    } catch (Exception e) {
        if (!inited) {
            // Do not leave a half-started broker or an open connection
            // behind when start-up failed.
            if (eventConnection != null) {
                try {
                    eventConnection.close();
                } catch (Exception closeError) {
                    logger.warn("关闭事件连接错误", closeError);
                }
                eventConnection = null;
            }
            try {
                ejms.stop();
            } catch (Exception stopError) {
                logger.warn("停止消息服务错误", stopError);
            }
        }
        throw new RuntimeException(e);
    }
}
/**
 * Publishes {@code event} on {@code eventTopic} as a non-persistent JMS
 * message whose properties carry the map entries.
 * <p>
 * Every entry of {@code event} becomes an object property of the message.
 * The {@code TOPIC_KEY} property is set afterwards to {@code eventTopic}, so
 * an entry with that key is overwritten. A new session is opened on the
 * event connection for each call and closed again in the {@code finally}
 * block; a failure while closing is logged and otherwise ignored.
 *
 * @param eventTopic name of the topic to publish to
 * @param event      property names and values to send
 * @throws RuntimeException wrapping any failure while building or sending
 *                          the message (the failure is also logged)
 */
@Override
public void sendEvent(String eventTopic, Map<String, Object> event) {
    Session session = null;
    try {
        session = eventConnection.createSession(false,
                Session.DUPS_OK_ACKNOWLEDGE);
        Message map = session.createMessage();
        for (Entry<String, Object> entry : event.entrySet()) {
            map.setObjectProperty(entry.getKey(), entry.getValue());
        }
        map.setObjectProperty(TOPIC_KEY, eventTopic);
        MessageProducer producer = session.createProducer(session
                .createTopic(eventTopic));
        producer.setDisableMessageID(true);
        producer.setDisableMessageTimestamp(false);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        producer.send(map);
    } catch (Exception e) {
        logger.error("发送事件错误", e);
        throw new RuntimeException(e);
    } finally {
        if (session != null) {
            try {
                session.close();
            } catch (Exception closeError) {
                // Closing is best-effort, but the failure is still recorded
                // instead of being dropped silently.
                logger.warn("关闭事件会话错误", closeError);
            }
        }
    }
}
/**
 * Returns the transport settings collected during {@link #init()} as an
 * unmodifiable view, so callers cannot add, remove or replace entries of the
 * service's configuration map. The values stored in the map are not wrapped.
 *
 * @return an unmodifiable view of the current configuration map, or
 *         {@code null} if no configuration map has been created yet
 */
@Override
public Map<String, Object> getReadOnlyConfig() {
    Map<String, Object> current = readOnlyConfig;
    if (current == null) {
        return null;
    }
    // Fully qualified so this method needs no additional import.
    return java.util.Collections.unmodifiableMap(current);
}
}