-
1. Re: Get timeout with a consumer of topic
ovidiu.feodorov May 3, 2006 11:28 AM (in response to xinyu.zhang)
I haven't seen this behavior yet.
Could you write a simple test case (for an example, look at org.jboss.test.messaging.jms.JMSTest) that I can use to reproduce it? -
2. Re: Get timeout with a consumer of topic
xinyu.zhang May 3, 2006 12:48 PM (in response to xinyu.zhang)
The following are two classes, SimpleTopicListener and SimpleTopicSender. Here is what I did:
(1) Installed JBoss 4.0.3 SP1 and JBoss Messaging 1.0.0.GA following the standard process.
(2) Started JBoss with the "run -c messaging" command.
(3) Started the SimpleTopicListener class on the same computer.
(4) Started the SimpleTopicSender class on the same computer.
Whether before or after step (4), if the SimpleTopicListener class receives no message for 1 minute, it throws the timeout exception and receives no messages from then on.
/*
* SimpleTopicListener.java
*
* Created on April 27, 2006, 12:47 PM
*
* To change this template, choose Tools | Template Manager
* and open the template in the editor.
*/
package test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
*
* @author r55151
*/
public class SimpleTopicListener extends Thread implements MessageListener {
private String topic;
private Message message;
/** Creates a new instance of SimpleTopicListener */
public SimpleTopicListener(String topic) {
this.topic = topic;
}
public void run() {
String destinationName = "/topic/" + topic;
InitialContext ic = null;
Connection connection = null;
try {
ic = new InitialContext();
ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
Topic topic = (Topic)ic.lookup(destinationName);
System.out.println("Client started to listen to Topic " + destinationName);
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer subscriber = session.createConsumer(topic);
subscriber.setMessageListener(this);
connection.start();
while (true) {
waitForMessage();
TextMessage the_message = (TextMessage)getMessage();
System.out.println("For topic " + topic + ": received message: " + the_message.getText());
} //END while (true)
} catch (NamingException ne) {
ne.printStackTrace();
} catch (JMSException jmse) {
jmse.printStackTrace();
} finally {
if(ic != null) {
try {
ic.close();
}catch(NamingException ne){
ne.printStackTrace();
}
} //END if(ic != null)
//ALWAYS close your connection in a finally block to avoid leaks
//Closing connection also takes care of closing its related objects e.g. sessions
if (connection != null) {
try {
connection.close();
} catch (JMSException jmse) {
jmse.printStackTrace();
}
} //END if (connection != null)
} //END try, finally
} //END public void run()
public synchronized void onMessage(Message message) {
this.message = message;
notifyAll();
}
/** Remove the message from the cache and return the message.
* @return Message object.
*/
public synchronized Message getMessage() {
Message ret_val = message;
message = null;
return ret_val;
}
protected synchronized void waitForMessage() {
while (message == null) {
try {
wait(1000);
} catch(InterruptedException e) {
//doing nothing
}
} //END while (message == null)
} //END protected synchronized void waitForMessage()
public static void main(String[] args) {
String topic = "testTopic";
if (args.length >= 1) {
topic = args[0];
}
(new SimpleTopicListener(topic)).start();
}
} //END public class SimpleTopicListener extends Thread implements MessageListener
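A side note on the listener above, not related to the timeout itself: onMessage() stores only the most recent message in a single field, so if two messages arrive before run() picks up the first one, the earlier one is overwritten. Below is a minimal sketch of a queue-based handoff that avoids this. MessageHandoff and HandoffDemo are hypothetical names used only for illustration; they are not part of JMS or JBoss Messaging, and plain strings stand in for javax.jms.Message so the sketch runs without a server.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of JMS or JBoss Messaging). */
class MessageHandoff<T> {
    // Unbounded FIFO queue: a new delivery never overwrites an unread one.
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();

    /** Intended to be called from the listener callback, e.g. onMessage(). */
    void deliver(T item) {
        queue.add(item);
    }

    /** Waits up to timeoutMillis for the next item; returns null if none arrives. */
    T next(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can decide to stop.
            Thread.currentThread().interrupt();
            return null;
        }
    }
}

class HandoffDemo {
    public static void main(String[] args) {
        MessageHandoff<String> handoff = new MessageHandoff<String>();
        // Two deliveries arrive back to back before the consumer reads anything.
        handoff.deliver("first");
        handoff.deliver("second");
        System.out.println(handoff.next(1000)); // prints "first"
        System.out.println(handoff.next(1000)); // prints "second"
        System.out.println(handoff.next(100));  // prints "null" (queue is empty)
    }
}
```

In the listener, onMessage() would call deliver(message), and the loop in run() would call next(1000) and skip null results instead of using waitForMessage() and getMessage().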
/*
* SimpleTopicSender.java
*
* Created on May 3, 2006, 9:20 AM
*
* To change this template, choose Tools | Template Manager
* and open the template in the editor.
*/
package test;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
*
* @author r55151
*/
public class SimpleTopicSender {
public static void main(String[]args) {
int msg_number = 10; //number of messages to be sent
if (args.length >= 1) {
try {
msg_number = Integer.parseInt(args[0]);
} catch (NumberFormatException nfe) {
//argument is not a number: keep the default of 10 messages
}
}
String topic_name = "testTopic";
String destinationName = "/topic/" + topic_name;
InitialContext ic = null;
Connection connection = null;
try {
ic = new InitialContext();
ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
Topic topic = (Topic)ic.lookup(destinationName);
System.out.println("Topic " + destinationName + " exists");
connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer publisher = session.createProducer(topic);
connection.start();
String msg = "Sending the message to topic " + topic_name + ", serial no = ";
for (int i=0;i<msg_number;i++) {
String message = msg + i;
System.out.println("will send message: " + message);
TextMessage text_message = session.createTextMessage(message);
publisher.send(text_message);
System.out.println("==== The message was successfully published on the topic " + topic_name);
try {
Thread.sleep(500);
} catch (InterruptedException iue) {
//doing nothing
}
} //END for (int i=0;i<msg_number;i++)
} catch (NamingException ne) {
ne.printStackTrace();
} catch (JMSException jmse) {
jmse.printStackTrace();
}finally{
if(ic != null) {
try {
ic.close();
}catch(NamingException ne){
ne.printStackTrace();
}
}
//ALWAYS close your connection in a finally block to avoid leaks
//Closing connection also takes care of closing its related objects e.g. sessions
if (connection != null) {
try {
connection.close();
} catch (JMSException jmse) {
jmse.printStackTrace();
}
}
} //END try, finally
System.exit(0);
} //END public static void main(String[]args)
} //END public class SimpleTopicSender -
3. Re: Get timeout with a consumer of topic
ovidiu.feodorov May 5, 2006 6:18 PM (in response to xinyu.zhang)
It's a bug. Thanks for reporting it. Use this to track it: http://jira.jboss.org/jira/browse/JBMESSAGING-371
Will be fixed in 1.0.1.CR2. -
4. Re: Get timeout with a consumer of topic
ovidiu.feodorov May 8, 2006 4:05 PM (in response to xinyu.zhang)
It seems that Remoting behavior changed recently.
I am following up on this on the Remoting forum http://www.jboss.com/index.html?module=bb&op=viewtopic&p=3941996#3941996
Until this is clarified, I changed Messaging to conform to the new Remoting behavior, and added a test to detect such changes in the future.
The fix is available in the head and it will be included in 1.0.1.CR2 -
5. Re: Get timeout with a consumer of topic
wxxg Jun 4, 2006 6:50 AM (in response to xinyu.zhang)
Hi there,
We have a similar "read timed out" problem with JBoss MQ (4.0.3sp1 and 4.0.4.GA) rather than JBoss Messaging. Could the JBoss experts here advise whether the fix is applicable to JBoss MQ as well?
Regards/Brian -
6. Re: Get timeout with a consumer of topic
timfox Jun 4, 2006 7:05 AM (in response to xinyu.zhang)
It is highly unlikely that the fix will apply to JBoss MQ.
Could you please post this in the JBoss MQ forum?