This content has been marked as final.
Show 2 replies
-
1. Re: JMS subscriber reconnect
genman Nov 2, 2004 4:16 PM (in response to gmand)
Take a look at
server/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
in the JBoss sources. -
2. Re: JMS subscriber reconnect
gmand Nov 8, 2004 7:20 PM (in response to gmand)Thanks -
Here is what I've implemented, however I am still having issues with regards to closing the connection. When there is a network failure, I get a ping timeout exception, which is what I expect, but when I go to close the connection it just hangs there. I never reach my notify() statement. What am I missing? This works on SilverStreams app server, is there something different that I need to do on JBoss?
import javax.jms.*;
import javax.naming.InitialContext;
import java.io.PrintWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Date;
import java.text.DateFormat;
public class SmartListener implements MessageListener, ExceptionListener, Runnable{
String topicName;
TopicSubscriber topicSubscriber;
TopicConnection connection;
public void run(){
startConnection();
}
public SmartListener(String topicName){
this.topicName = topicName;
}
public void startConnection(){
/**
* Thread which performs reconnections, usually waiting for to be instructed to start
* the re-connection process
*/
Thread t = new Thread(){
public void run(){
while(true){
reconnect();
}
}
};
t.start();
//Create TopicSubscriber, if it fails immediatley notify re-connect thread
try{
topicSubscriber = createSubscriber();
}
catch(JMSException jmse){
System.out.println("JMSException: " + jmse);
synchronized(this){
notify();
}
}
}
public String buildFilter(){
String filter = "alias in (";
for(int i=0; i<modelRef.ports.length; i++){
filter += "'" + modelRef.ports.getAlias() + "', ";
}
filter += ")";
return filter;
}
public void onMessage(Message message){
//do onMessage
System.out.println("GOT MESSAGE");
}
public synchronized boolean reconnect(){
doWait(); //Wait to be notified
System.out.print("Trying to reconnect ...");
for(int i=0; i < 10; i++){
try{
topicSubscriber = createSubscriber();
System.out.print("ok");
return true;
}
catch(Exception e){
System.out.print(".");
try{
Thread.sleep(1000);
}
catch(InterruptedException exc){}
}
}
System.err.println("failed after 10 attempts");
return false;
}
public TopicSubscriber createSubscriber() throws JMSException{
TopicSubscriber subscriber = null;
try{
InitialContext ctx = new InitialContext();
TopicConnectionFactory factory = (TopicConnectionFactory)ctx.lookup("ConnectionFactory");
Topic topic = (Topic)ctx.lookup("topic/powerControlTopic");
connection = factory.createTopicConnection();
connection.start();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
connection.setExceptionListener(this);
String filter = buildFilter();
subscriber = session.createSubscriber(topic, filter, false);
}
catch(Exception e){
throw new JMSException("Can't initialize: " + e);
}
subscriber.setMessageListener(this);
return subscriber;
}
public synchronized void onException(JMSException jmse){
System.out.println("onException(): " + jmse);
try{
connection.setExceptionListener(null);
connection.stop();
connection.close();
connection = null;
//topicSubscriber.close();
}
catch(JMSException je){
je.printStackTrace();
}
System.out.println("Notifying ...");
notify(); //trigger reconnect thread
}
public synchronized void doWait(){
try{
wait();
}
catch(InterruptedException e){e.printStackTrace();}
}
}