Clustering a Topic
schwanitzb Feb 8, 2010 2:45 PM — I have my application working with a topic message producer in JBoss and a simple stand-alone topic message consumer running on the same machine. Here are the code snippets:
JBoss message producer:
/**
 * Publishes {@code msgContents} to a JMS topic resolved through JNDI.
 *
 * <p>The topic is looked up under {@code Topics.TOPIC_JNDI_PREFIX + topicName} and the
 * connection factory under {@code connectionFactoryName}. A non-transacted,
 * auto-acknowledge topic session is created, and the actual publish is handed to the
 * three-argument {@code sendMessage} overload. The connection and the naming context
 * are closed before this method returns, whether or not the publish succeeded.
 *
 * @param topicName topic name without the JNDI prefix; must not be null or empty
 * @param connectionFactoryName JNDI name of the topic connection factory; must not be null or empty
 * @param msgContents contents passed on to the publishing overload; must not be null or empty
 * @throws IllegalArgumentException if any argument is null or empty
 * @throws WebFontJMSException wrapping any {@code NamingException} or {@code JMSException} raised
 */
public static void sendMessage(String topicName, String connectionFactoryName, Map<String, Object> msgContents) throws WebFontJMSException {
    if (topicName == null || topicName.isEmpty()) {
        throw new IllegalArgumentException("topicName cannot be null or empty");
    }
    if (connectionFactoryName == null || connectionFactoryName.isEmpty()) {
        throw new IllegalArgumentException("connectionFactoryName cannot be null or empty");
    }
    if (msgContents == null || msgContents.isEmpty()) {
        throw new IllegalArgumentException("msgContent cannot be null or empty");
    }

    try {
        Context jndi = new InitialContext();
        try {
            Topic destination = (Topic) jndi.lookup(Topics.TOPIC_JNDI_PREFIX + topicName);
            TopicConnectionFactory factory = (TopicConnectionFactory) jndi.lookup(connectionFactoryName);
            TopicConnection connection = factory.createTopicConnection();
            try {
                TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                javax.jms.TopicPublisher publisher = session.createPublisher(destination);
                sendMessage(session, publisher, msgContents);
            } finally {
                // Always release the connection, even when publishing failed.
                connection.close();
            }
        } finally {
            jndi.close();
        }
    } catch (NamingException ne) {
        throw new WebFontJMSException(ne);
    } catch (JMSException jmse) {
        throw new WebFontJMSException(jmse);
    }
}
Standalone message consumer: public private static TopicConnectionFactory topicConnectionFactory = null; private TopicListenerIF topicListener = null; private TopicConnection topicConnection = null; private TopicSession topicSession = null;
/**
 * Connects directly to a HornetQ server (no JNDI) and subscribes this instance to a topic.
 *
 * <p>A Netty transport is configured for {@code jmsHostName}, a connection factory and
 * topic object are instantiated directly, a non-transacted auto-acknowledge session is
 * created, this object is registered as the subscriber's message listener, and the
 * connection is started. If any step after the connection is created fails, the
 * connection is closed again before the failure is reported.
 *
 * @param topicName name of the topic to subscribe to; must not be null or empty
 * @param jmsHostName host of the HornetQ server; must not be null or empty
 * @param topicListener callback that receives the contents of each message; must not be null
 * @throws IllegalArgumentException if an argument is null or (for the strings) empty
 * @throws WebFontJMSException wrapping any {@code JMSException} raised during setup
 */
public TopicListenerStandalone(String topicName, String jmsHostName, TopicListenerIF topicListener) throws WebFontJMSException {
    if (null == topicName || topicName.length() == 0) {
        throw new IllegalArgumentException("topicName cannot be null or empty");
    }
    if (null == jmsHostName || jmsHostName.length() == 0) {
        throw new IllegalArgumentException("jmsHostName cannot be null or empty");
    }
    if (null == topicListener) {
        throw new IllegalArgumentException("topicListener cannot be null");
    }

    Map<String, Object> connectionParams = new HashMap<String, Object>();
    connectionParams.put(TransportConstants.HOST_PROP_NAME, jmsHostName);
    // NOTE(review): hard-coded port; presumably the server's Netty acceptor port - confirm against the server configuration.
    connectionParams.put(TransportConstants.PORT_PROP_NAME, 5445);
    TransportConfiguration transportConfiguration =
            new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);

    // A constructor call can never evaluate to null, so no null check follows it.
    topicConnectionFactory = (TopicConnectionFactory) new HornetQConnectionFactory(transportConfiguration);
    this.topicListener = topicListener;

    try {
        // Directly instantiate the JMS Topic object.
        Topic topic = new HornetQTopic(topicName);
        // Create a JMS TopicConnection
        topicConnection = topicConnectionFactory.createTopicConnection();
        boolean success = false;
        try {
            // Create a JMS TopicSession
            topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            // Create a JMS topic subscriber with this instance as the listener
            TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic);
            topicSubscriber.setMessageListener(this);
            // Start the Connection
            topicConnection.start();
            success = true;
        } finally {
            if (!success) {
                try {
                    topicConnection.close();
                } catch (JMSException ignored) {
                    // Deliberately ignored: the setup failure is already propagating and
                    // must not be replaced by a secondary failure from the cleanup.
                }
                // Drop references to the discarded connection and its session.
                topicConnection = null;
                topicSession = null;
            }
        }
    } catch (JMSException jmse) {
        throw new WebFontJMSException(jmse);
    }
}
/**
 * Receives a JMS message, copies its map entries into a plain {@code Map} and forwards
 * that map to the registered {@code topicListener}.
 *
 * <p>Nothing is allowed to propagate out of this method: any {@code Throwable},
 * including the cast failure for a message that is not a {@code MapMessage}, is caught
 * and its stack trace printed.
 *
 * @param msg the delivered message; expected to be a {@code MapMessage}
 */
public void onMessage(Message msg) {
    try {
        // Messages on this topic are expected to be MapMessages, so cast directly.
        MapMessage payload = (MapMessage) msg;
        Map<String, Object> entries = new HashMap<String, Object>();

        Enumeration<String> keys = (Enumeration<String>) payload.getMapNames();
        while (keys.hasMoreElements()) {
            String key = keys.nextElement();
            entries.put(key, payload.getObject(key));
        }

        topicListener.onMessage(entries);
    } catch (Throwable problem) {
        // This method must never throw, so every failure is reported here and dropped.
        problem.printStackTrace();
    }
}
/**
 * Closes the JMS connection held by this listener.
 *
 * @throws WebFontJMSException wrapping any {@code JMSException} raised while closing
 */
public void close() throws WebFontJMSException {
    try {
        topicConnection.close();
    } catch (JMSException closeFailure) {
        throw new WebFontJMSException(closeFailure);
    }
}
}
However, I'm not able to get these same producers and consumers to work when I set up a 2 machine cluster. I run JBoss on two machines using "run.bat -c all-with-hornetq". I can see that the bridges seem to properly connect the two machines. I then start a stand-alone consumer on each of the two machines. When the message producer in JBoss creates a message in the topic, only the consumer on the same machine as the producer sees the message. I would expect that the consumers on BOTH machines would see the message. Thank you for any help you can offer. Cheers, Bill