Sorry for posting this late.
Upgrading to CR2 of jboss-messaging solved the hang problem. I am posting the working code anyway.
And these variables need to be set in build.properties.
jms.test.java.naming.provider.url = jnp://<server name>:1099
jms.test.java.naming.factory.initial = org.jnp.interfaces.NamingContextFactory
And the jboss client is a standalone program.
Thanks
Raghu
import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.naming.InitialContext;
import junit.framework.TestCase;
public class TestJMS1 extends TestCase {
Date date;
static final String nameTopicConnFactory = "XAConnectionFactory";
static final String nameTopic = "topic/testTopic";
static final int CONSUMER_WAIT_TIME = 60000;
static final int RELAYER_WAIT_TIME = 30000;
public TestJMS1(String name) {
super(name);
}
public void setUp() {
}
public void tearDown() {
}
public void testJMSFastPublisherSlowConsumerWithRelay() throws Exception {
String testName = "Publish/Subscribe with Slow Consumer and Relayer - AUTO ACK ";
ConnectionFactory connFactory = null;
Connection connection = null;
Session session = null;
Destination topic = null;
MyRelayerWithNewSession relayThread = null;
MyConsumerWithNewSession consumerThread = null;
WatchdogTimer watchdog = null;
MessageProducer[] producers = new MessageProducer[1];
try {
InitialContext ic = new InitialContext ();
//System.out.println ("Created InitialContext :: " + ic);
String payload = "The static portion ";
connFactory = (ConnectionFactory) ic.lookup (nameTopicConnFactory);
topic = (Topic)ic.lookup(nameTopic);
connection = connFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producers[0] = session.createProducer(topic);
relayThread = new MyRelayerWithNewSession(connection, topic, "text0", "text1",
RELAYER_WAIT_TIME, false, payload);
consumerThread = new MyConsumerWithNewSession(connection, topic, "text1",
CONSUMER_WAIT_TIME, false, payload);
connection.start();
// watchdog = new WatchdogTimer(producers, consumers, 90000, connection);
//System.out.println("Sender starting");
try {
// For threads to create the sessions.
Thread.sleep(5000);
for(int i=0; i<10000; i++) {
ObjectMessage message = session.createObjectMessage();
message.setStringProperty("target", "text0");
message.setObject(payload+i);
producers[0].send(message);
// System.out.println("Sent iter : " + i);
// Thread.sleep(1000);
if (i%100 == 0) {
System.out.println("Sent " + i + "messages");
}
}
} catch (Throwable t ) {
System.out.println("Producer1 send got Error: "+t.getMessage());
}
System.out.println("Done with sending");
Thread.sleep(200000);
consumerThread.join();
relayThread.join();
} catch (Throwable t) {
System.err.println("Error: "+t.getMessage());
// t.printStackTrace(System.err);
} finally {
try {
if (connection != null){
connection.close();
connection = null;
}
} catch (JMSException e) {
e.printStackTrace();
}
}
// if (watchdog.isAlive()) {
// watchdog.interrupt();
// }
// watchdog.join();
date = new Date();
if (!watchdog.interrupted) {
System.out.println(date.toString()+": "+testName + " : PASSED");
} else {
System.out.println(date.toString()+": "+testName + " : FAILED");
}
}
class MyRelayerWithNewSession extends Thread {
Connection connection = null;
Destination topic = null;
String localTarget = null;
String relayTarget = null;
int delay_ms = 0;
boolean explicit_ack = false;
String payload = null;
Session consumeSession = null;
Session produceSession = null;
MessageConsumer consumer = null;
MessageProducer producer = null;
MyRelayerWithNewSession(Connection connection, Destination topic, String localTarget,
String relayTarget, int delay_ms, boolean explicit_ack, String payload) {
this.connection = connection;
this.topic = topic;
this.localTarget = localTarget;
this.relayTarget = relayTarget;
this.delay_ms = delay_ms;
this.explicit_ack = explicit_ack;
this.payload = payload;
start();
}
public void run(){
try {
consumeSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
produceSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = consumeSession.createConsumer(topic, "target='"+localTarget+"'", false);
producer = produceSession.createProducer(topic);
Thread.sleep(delay_ms);
System.out.println("Relayer thread waking up");
for (int i=0; i< 10000; i ++) {
Message receivedMessage = consumer.receive();
if (explicit_ack) {
receivedMessage.acknowledge();
}
String receivedPayload = (String) ((ObjectMessage) receivedMessage).getObject();
// System.out.println("Relay Read at iter "+i+" : " + receivedPayload);
// System.out.println("Relay iter "+i+" : "+receivedPayload.substring(19));
// assertEquals(true, receivedPayload.equals(payload+i));
ObjectMessage message = produceSession.createObjectMessage();
message.setStringProperty("target", relayTarget);
message.setObject(receivedPayload);
producer.send(message);
if (i%100 == 0) {
System.out.println("Relayed " + i + "messages");
}
}
System.out.println("Done with Relayer thread");
} catch (Throwable t) {
System.err.println("Relayer Thread encountered Error: "+t.getMessage());
// t.printStackTrace(System.err);
}
}
}
class MyConsumerWithNewSession extends Thread {
Connection connection = null;
Destination topic = null;
String localTarget = null;
int delay_ms = 0;
boolean explicit_ack = false;
String payload = null;
Session consumeSession = null;
MessageConsumer consumer = null;
MyConsumerWithNewSession(Connection connection, Destination topic, String localTarget,
int delay_ms, boolean explicit_ack, String payload) {
this.connection = connection;
this.topic = topic;
this.localTarget = localTarget;
this.delay_ms = delay_ms;
this.explicit_ack = explicit_ack;
this.payload = payload;
start();
}
public void run(){
try {
consumeSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = consumeSession.createConsumer(topic, "target='"+localTarget+"'", false);
Thread.sleep(delay_ms);
System.out.println("Receiver waking up");
for (int i=0; i< 10000; i ++) {
Message receivedMessage = consumer.receive();
if (explicit_ack) {
receivedMessage.acknowledge();
}
String receivedPayload = (String) ((ObjectMessage) receivedMessage).getObject();
// System.out.println("Receive Read at iter "+i+" : " + receivedPayload);
// System.out.println("Receive iter "+i+" : "+receivedPayload.substring(19));
// assertEquals(true, receivedPayload.equals(payload+i));
if (i%100 == 0) {
System.out.println("Received " + i + "messages");
}
}
System.out.println("Done with Consumer thread");
} catch (Throwable t) {
System.err.println("Receiver Thread encountered Error: "+t.getMessage());
// t.printStackTrace(System.err);
}
}
}
static class WatchdogTimer extends Thread {
long waitTime = 0L;
boolean interrupted = false;
Connection connection = null;
MessageProducer[] producers = null;
MessageConsumer [] consumers = null;
public WatchdogTimer (MessageProducer[] producers, MessageConsumer [] consumers,
long waitTime, Connection connection) {
this.producers = producers;
this.consumers = consumers;
this.waitTime = waitTime;
this.connection = connection;
start();
}
public void run () {
try {
Thread.sleep(this.waitTime);
this.interrupted = true;
System.out.println("Watchdog waking up: closing producers and consumers");
for (int i = 0; i < this.consumers.length; i++) {
this.consumers.close();
}
for (int i = 0; i < this.producers.length; i++) {
this.producers.close();
}
connection.close();
} catch (InterruptedException thrExc) {
System.out.println("watchdog interrupted");
} catch (JMSException jmsExc) {
System.out.println("watchdog got jmsExc");
}
}
}
}