Hi Tim,
First of all, I have a large portion of humble pie to eat - I hadn't clustered the SLSB that publishes. However, I'm still seeing unreliable results, so I'll elaborate on what I am doing...
I have a pojo that connects via JNDI to queue/testQueue on the ClusteredConnectionFactory.
private void startQueue() throws JMSException, NamingException {
    conn = ((ConnectionFactory) ctx.lookup("ClusteredConnectionFactory"))
            .createConnection("guest", "guest");
    queue = (Queue) ctx.lookup("queue/testQueue");
    session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    conn.start();
}
It sends (say) 1000 messages of 1 KB each. Each message has an integer property 'TopicID', a random number from 1 to 50, which is the index of the topic that the message will be forwarded to inside JBoss.
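For illustration, here is a minimal sketch of how a sender could pick the topic index and how that index maps to a JNDI name. The class and method names are hypothetical and not taken from the real sender. Note that it assumes a zero-based index (0 to 49), because the topics are deployed as testDurableTopic0 to testDurableTopic49; an ID of 50 would have no matching topic.

```java
import java.util.Random;

public class TopicIdSketch {
    static final int NUMBER_OF_TOPICS = 50;

    // Zero-based index in [0, NUMBER_OF_TOPICS), matching testDurableTopic0..49.
    public static int randomTopicIndex(Random rnd) {
        return rnd.nextInt(NUMBER_OF_TOPICS);
    }

    // Same naming rule the publisher bean uses for its lookup.
    public static String jndiName(int topicIndex) {
        return "topic/testDurableTopic" + topicIndex;
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        int id = randomTopicIndex(rnd);
        System.out.println("TopicID " + id + " -> " + jndiName(id));
    }
}
```

The value returned by randomTopicIndex is what would be set as the 'TopicID' int property on each outgoing message.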
In my two node JBoss cluster, I have an EJB3 MDB that is listening to the queue configured thus:
@Clustered
@MessageDriven(
    messageListenerInterface=MessageListener.class,
    activationConfig = {
        @ActivationConfigProperty(propertyName="destination", propertyValue="queue/testQueue"),
        @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue")
    })
public class OamServerPushImpl implements OamServerPush, MessageListener
This MDB simply forwards to an SLSB, configured as follows, which publishes the message to the topic indicated by the TopicID property:
@Clustered
@Stateless
public class PublisherImpl implements Publisher {
    @Resource(mappedName="ClusteredConnectionFactory")
    ConnectionFactory topicConnectionFactory;
    @Resource
    private SessionContext sc;
    Topic topic;
    public void redispatch(Message m, String sReceiverAddress, int iMessageID) {
        Connection connection = null;
        Session session = null;
        TextMessage message = null;
        try {
            int iTopicID = m.getIntProperty("TopicID");
            String sPayload = m.getStringProperty("Payload");
            connection = topicConnectionFactory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // connection.stop();
            connection.start();
            message = session.createTextMessage();
            message.setText("Redispatching : " + sPayload);
            message.setStringProperty("ReceivedBy", sReceiverAddress);
            message.setStringProperty("PublishedBy", InetAddress.getLocalHost().getHostAddress());
            message.setIntProperty("MessageID", iMessageID);
            sendToClients(session, message, iTopicID);
            connection.close();
        } catch (Throwable t) {
            // JMSException could be thrown
            logger.warn(
                "PublisherBean.redispatch: " + "Exception: "
                + t.toString());
            sc.setRollbackOnly();
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                }
            }
        }
    }
    public void sendToClients(Session session, Message message, int iTopicID)
            throws JMSException {
        topic = (Topic) sc.lookup("topic/testDurableTopic" + iTopicID);
        MessageProducer publisher = session.createProducer(topic);
        publisher.send(message);
        publisher.close();
        logger.info("Publisher BEAN: Message published: (" +
            ((TextMessage)message).getText().length() +") bytes");
    }
}
I then have another 50 pojos outside the container, each listening to one topic via a durable subscription:
public class MessageSubscriberClient implements Runnable {
    private static final Logger log = Logger.getLogger(MessageSubscriberClient.class);
    int iCount = 0;
    Connection conn;
    Session session;
    TopicSubscriber subscriber;
    Topic topic;
    Context ctx;
    int i;
    boolean bFinished = false;
    static int NUMBER_OF_MESSAGES = 1000;
    static final int NUMBER_OF_TOPICS = 50;
    public MessageSubscriberClient(Context ctx, int i) {
        this.i = i;
        this.ctx = ctx;
        try {
            startTopic();
        }
        catch (JMSException e1) {
            log.error(e1.getMessage(), e1);
        }
        catch (NamingException e) {
            log.error(e.getMessage(), e);
        }
    }
    public void run() {
        try {
            receive();
        } catch (JMSException e) {
            log.error(e.getMessage(), e);
        } catch (NamingException e) {
            log.error(e.getMessage(), e);
        }
        log.warn("Stopping topic on " + i);
        stopTopic();
    }
    public void receive() throws JMSException, NamingException {
        while ( ! bFinished) {
            try {
                if (subscriber != null) {
                    process();
                }
                else {
                    // try to recover
                    log.info(i + " not well - subscriber null");
                    // stopTopic();
                    // startTopic();
                }
            } catch (JMSException e) {
                // try to recover
                // stopTopic();
                // startTopic();
            }
        }
        log.info("Finished receiving on " + i);
    }
    public int getNumReceivedMessages() {
        return iCount;
    }
    private void startTopic() throws JMSException, NamingException {
        conn = (Connection) ((ConnectionFactory) ctx
                .lookup("ClusteredConnectionFactory")).createConnection("john" + i, "needle");
        topic = (Topic) ctx.lookup("topic/testDurableTopic" + i);
        session = conn.createSession(false,
                TopicSession.AUTO_ACKNOWLEDGE);
        conn.stop();
        // subscription
        subscriber = session.createDurableSubscriber(topic, "finbar" + i);
        conn.start();
    }
    private void stopTopic() {
        log.info("In Stopping topic on " + i);
        try {
            if (subscriber != null) {
                subscriber.close();
                log.info("finbar" + i + " closed");
            }
        } catch (JMSException e) {
            log.error(e.getMessage(), e);
        }
        try {
            if (session != null) {
                session.unsubscribe("finbar" + i);
                log.info("finbar" + i + " unsubscribed");
            }
        } catch (JMSException e) {
            log.error(e.getMessage(), e);
        }
        try {
            if (conn != null)
                conn.close();
        } catch (JMSException e) {
            log.error(e.getMessage(), e);
        }
    }
    public void setFinished( boolean b) {
        bFinished = b;
    }
    public void process() throws JMSException {
        Message m = subscriber.receive(1000);
        TextMessage t = (TextMessage) m;
        if (t != null) {
            m.acknowledge();
            iCount++;
        }
    }
}
In the receive method above, I've indicated where I've tried to detect receiver problems and recover by stopping and starting the topic.
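As a rough illustration of that stop-and-start recovery, the sketch below retries a failing call after closing and reopening the underlying resources, with a short backoff so the loop does not spin on a closed consumer. Everything in it is hypothetical: Link stands in for the subscriber's JMS objects, open() plays the role of startTopic(), and close() is assumed to close the consumer, session and connection without unsubscribing. That last assumption matters, because stopTopic() as posted calls session.unsubscribe(), which in JMS deletes the durable subscription, so messages published before the subscriber is re-created would not be kept for it. That may account for some of the missing messages.

```java
import java.util.concurrent.Callable;

public class RecoverySketch {

    /** Hypothetical stand-in for one subscriber's JMS resources. */
    public interface Link {
        void open() throws Exception;  // plays the role of startTopic()
        void close();                  // closes consumer, session and connection; no unsubscribe
    }

    /**
     * Runs work; on failure closes the link, waits, reopens it and retries,
     * giving up after maxAttempts failed attempts.
     */
    public static <T> T withRecovery(Link link, Callable<T> work,
                                     int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return work.call();
            } catch (Exception e) {
                last = e;
                link.close();
                // Linear backoff so a dead node is not hammered in a tight loop.
                Thread.sleep(backoffMillis * attempt);
                try {
                    link.open();
                } catch (Exception reopenFailure) {
                    // Remember the failure; the next failed attempt triggers another reopen.
                    last = reopenFailure;
                }
            }
        }
        throw last;
    }
}
```

In the subscriber above, the work passed in would be a single subscriber.receive(1000) call, so one closed consumer costs one reconnect rather than an endless stream of IllegalStateException log entries.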
I then run the receiver and sender code, wait until messages are happily being sent and received, and then kill the JBoss node (kill -9) that is processing the messages.
With the receiver code as-is above (not trying to recover), the threads just sit in the while loop reporting:
34864 [Thread-94] ERROR org.jboss.jms.client.container.ClosedInterceptor - ClosedInterceptor.ClientConsumerDelegate[e9-cs3ijw9f-1-64tmiw9f-c5iuxj-r55ss4]: method receive() did not go through, the interceptor is CLOSED
javax.jms.IllegalStateException: The object is closed
at org.jboss.jms.client.container.ClosedInterceptor.invoke(ClosedInterceptor.java:157)
at org.jboss.aop.advice.PerInstanceInterceptor.invoke(PerInstanceInterceptor.java:105)
at org.jboss.jms.client.delegate.ClientConsumerDelegate$receive_N8299950230150603585.invokeNext(ClientConsumerDelegate$receive_N8299950230150603585.java)
at org.jboss.jms.client.delegate.ClientConsumerDelegate.receive(ClientConsumerDelegate.java)
at org.jboss.jms.client.JBossMessageConsumer.receive(JBossMessageConsumer.java:86)
at com.ipaccess.MessageSubscriberClient.process(MessageSubscriberClient.java:154)
at com.ipaccess.MessageSubscriberClient.receive(MessageSubscriberClient.java:78)
at com.ipaccess.MessageSubscriberClient.run(MessageSubscriberClient.java:64)
at java.lang.Thread.run(Thread.java:595)
If, on the other hand, I try to recover the topic by stopping and starting it, I get much 'better' results (I sometimes receive all messages), but more likely I find that some messages get stuck in JBoss. The receivers will sit there listening whilst the JBoss client-side code reports the failover (with a lot of JBoss Remoting logging that I realise is an outstanding JIRA task):
294287 [Timer-0] WARN org.jboss.remoting.LeasePinger - LeasePinger[SocketClientInvoker[1c0f2e5, bisocket://172.28.5.208:4457](4ss15g-cvnwhk-f9wjxoka-1-f9wjxoll-3)] failed to ping to server: Can not get connection to server. Problem establishing socket connection for InvokerLocator [bisocket://172.28.5.208:4457/?clientLeasePeriod=10000&clientMaxPoolSize=200&clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&dataType=jms&marshaller=org.jboss.jms.wireformat.JMSWireFormat&numberOfCallRetries=1&numberOfRetries=10&pingFrequency=214748364&pingWindowFactor=10&socket.check_connection=false&timeout=0&unmarshaller=org.jboss.jms.wireformat.JMSWireFormat]
294584 [Thread-277] WARN org.jboss.remoting.LeasePinger - LeasePinger[SocketClientInvoker[1c0f2e5, bisocket://172.28.5.208:4457](4ss15g-cvnwhk-f9wjxoka-1-f9wjxoll-3)] failed sending disconnect for client lease for client with session ID 4ss15g-cvnwhk-f9wjxoka-1-f9wjxqil-1t
and
- unable to get secondary locator
org.jboss.remoting.CannotConnectException: Can not get connection to server. Problem establishing socket connection for InvokerLocator [bisocket://172.28.5.208:4457/?clientLeasePeriod=10000&clientMaxPoolSize=200&clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&dataType=jms&marshaller=org.jboss.jms.wireformat.JMSWireFormat&numberOfCallRetries=1&numberOfRetries=10&pingFrequency=214748364&pingWindowFactor=10&socket.check_connection=false&timeout=0&unmarshaller=org.jboss.jms.wireformat.JMSWireFormat]
at org.jboss.remoting.transport.socket.MicroSocketClientInvoker.transport(MicroSocketClientInvoker.java:532)
at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.transport(BisocketClientInvoker.java:413)
at org.jboss.remoting.MicroRemoteClientInvoker.invoke(MicroRemoteClientInvoker.java:122)
at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.getSecondaryLocator(BisocketClientInvoker.java:538)
at org.jboss.remoting.transport.bisocket.BisocketServerInvoker.createControlConnection(BisocketServerInvoker.java:228)
at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.transport(BisocketClientInvoker.java:402)
at org.jboss.remoting.MicroRemoteClientInvoker.invoke(MicroRemoteClientInvoker.java:122)
at org.jboss.remoting.Client.invoke(Client.java:1634)
at org.jboss.remoting.Client.addCallbackListener(Client.java:1703)
at org.jboss.remoting.Client.addListener(Client.java:921)
at org.jboss.jms.client.remoting.JMSRemotingConnection.addInvokerCallbackHandler(JMSRemotingConnection.java:237)
at org.jboss.jms.client.remoting.JMSRemotingConnection.start(JMSRemotingConnection.java:312)
at org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate.establishCallback(ClientClusteredConnectionFactoryDelegate.java:99)
at org.jboss.jms.client.remoting.ConnectionFactoryCallbackHandler$CallbackConnectionListener.handleConnectionException(ConnectionFactoryCallbackHandler.java:105)
at org.jboss.remoting.ConnectionValidator$1.run(ConnectionValidator.java:452)
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:333)
at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:195)
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:366)
at java.net.Socket.connect(Socket.java:519)
at org.jboss.remoting.transport.socket.SocketClientInvoker.createSocket(SocketClientInvoker.java:187)
at org.jboss.remoting.transport.bisocket.BisocketClientInvoker.createSocket(BisocketClientInvoker.java:420)
at org.jboss.remoting.transport.socket.MicroSocketClientInvoker.getConnection(MicroSocketClientInvoker.java:815)
at org.jboss.remoting.transport.socket.MicroSocketClientInvoker.transport(MicroSocketClientInvoker.java:525)
... 14 more
Eventually (over an hour later), most receivers reconnect, but some never do. I see
1562035 [Thread-313] ERROR org.jboss.remoting.MicroRemoteClientInvoker - error shutting down lease pinger
and then it all goes quiet - no more logging. Despite leaving the sender/receivers and one JBoss node up for well over an hour, no more messages get through.
I wasn't expecting to have to stop and start the topics on the receivers, but it looks like I have to. But even if I do, it is probable that some receivers do not failover.
Other info... I execute the sender and receivers from JUnit, but I think putting that code in this post is unnecessary.
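For readers who want a picture of that kind of harness, here is a minimal sketch that is not the actual JUnit code. It starts one thread per worker, delivers a fixed number of messages to each, and compares the total received with the total sent. Worker is a hypothetical stand-in for MessageSubscriberClient in which an in-memory queue replaces the JMS topic. The volatile flag and atomic counter are there because the test thread and the worker threads touch them concurrently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HarnessSketch {

    /** Hypothetical stand-in for MessageSubscriberClient; a queue replaces the JMS topic. */
    public static class Worker implements Runnable {
        final BlockingQueue<String> inbox = new LinkedBlockingQueue<String>();
        private final AtomicInteger count = new AtomicInteger();
        private volatile boolean finished = false;   // written by the test thread

        public void run() {
            try {
                while (!finished) {
                    // Same shape as subscriber.receive(timeout): null means nothing arrived.
                    String m = inbox.poll(10, TimeUnit.MILLISECONDS);
                    if (m != null) {
                        count.incrementAndGet();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        public void setFinished(boolean b) { finished = b; }
        public int getNumReceivedMessages() { return count.get(); }
    }

    /** Sends perWorker messages to each worker; returns how many arrived before the deadline. */
    public static int sendAndCount(int workers, int perWorker, long deadlineMillis)
            throws InterruptedException {
        List<Worker> clients = new ArrayList<Worker>();
        List<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < workers; i++) {
            Worker w = new Worker();
            Thread t = new Thread(w, "subscriber-" + i);
            clients.add(w);
            threads.add(t);
            t.start();
        }
        for (Worker w : clients) {
            for (int n = 0; n < perWorker; n++) {
                w.inbox.add("message " + n);
            }
        }
        long end = System.currentTimeMillis() + deadlineMillis;
        while (total(clients) < workers * perWorker && System.currentTimeMillis() < end) {
            Thread.sleep(20);
        }
        for (Worker w : clients) w.setFinished(true);
        for (Thread t : threads) t.join();
        return total(clients);
    }

    private static int total(List<Worker> clients) {
        int sum = 0;
        for (Worker w : clients) sum += w.getNumReceivedMessages();
        return sum;
    }
}
```

A test built this way would assert that the returned total equals the number of messages sent, which is the check that fails when some messages stay stuck after failover.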
Inside JBoss, the destinations are configured like so
<mbean code="org.jboss.jms.server.destination.QueueService"
       name="jboss.messaging.destination:service=Queue,name=testQueue"
       xmbean-dd="xmdesc/Queue-xmbean.xml">
    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
    <depends>jboss.messaging:service=PostOffice</depends>
    <attribute name="SecurityConfig">
        <attribute name="Clustered">true</attribute>
        <security>
            <role name="guest" read="true" write="true"/>
            <role name="publisher" read="true" write="true" create="false"/>
            <role name="noacc" read="false" write="false" create="false"/>
        </security>
    </attribute>
</mbean>
<!-- Repeat for testDurableTopic0 to testDurableTopic49 -->
<mbean code="org.jboss.jms.server.destination.TopicService"
       name="jboss.messaging.destination:service=Topic,name=testDurableTopic0"
       xmbean-dd="xmdesc/Topic-xmbean.xml">
    <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
    <depends>jboss.messaging:service=PostOffice</depends>
    <attribute name="Clustered">true</attribute>
    <attribute name="SecurityConfig">
        <security>
            <role name="guest" read="true" write="true"/>
            <role name="publisher" read="true" write="true" create="false"/>
            <role name="durpublisher" read="true" write="true" create="true"/>
        </security>
    </attribute>
</mbean>
I have a MySQL-persistence-service.xml with the appropriate clustered attribute and the SQL to set up the users, passwords, roles and ClientIDs:
<attribute name="Clustered">true</attribute>
...
POPULATE.TABLES.69 = INSERT INTO JBM_USER (USER_ID,PASSWD,CLIENTID) VALUES ('john49', 'needle', 'DurableSubscriberExample49')
POPULATE.TABLES.119 = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('durpublisher','john49')
and I set ServerPeerID to 0 and 1 respectively in messaging-service.xml for the two nodes.
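Since only the last user and role rows are shown, here is a small sketch of how the full set of 50 could be generated rather than typed by hand. The starting indexes 20 and 70 are assumptions inferred from the two rows above (69 and 119 for john49); the real file may number them differently.

```java
public class PopulateRowsSketch {

    // One JBM_USER row per subscriber; base is the assumed first POPULATE.TABLES index.
    public static String userRow(int base, int i) {
        return "POPULATE.TABLES." + (base + i)
            + " = INSERT INTO JBM_USER (USER_ID,PASSWD,CLIENTID) VALUES ('john" + i
            + "', 'needle', 'DurableSubscriberExample" + i + "')";
    }

    // One JBM_ROLE row per subscriber, granting the durpublisher role.
    public static String roleRow(int base, int i) {
        return "POPULATE.TABLES." + (base + i)
            + " = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES ('durpublisher','john" + i + "')";
    }

    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) System.out.println(userRow(20, i));
        for (int i = 0; i < 50; i++) System.out.println(roleRow(70, i));
    }
}
```

Each user gets its own preconfigured ClientID, which is what lets the 50 receivers hold separate durable subscriptions.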
JBoss is running on RHEL 4 with JRE 1.5.0_12.
It is JBoss 4.2.2 GA with JBM 1.4.0.SP1, configured as per the documentation.
The pojo sender and receivers are running on WinXP with JRE 1.5.0_12.