MessageListener stops listening to hornetq Queue
shirshendu Dec 4, 2018 6:30 AM
Hi all,
The following issue is impairing my production system. I have multiple MDBs packaged in EAR and WAR applications and deployed on JBoss. When there is a considerable amount of traffic on my website, these MDBs stop consuming the messages being written to queues in HornetQ, and I am forced to restart my system. The last time this happened, I wrote a standalone message listener and was able to consume messages from the same HornetQ server, which suggests the problem lies at the application-server or application level. I am attaching the following:
1. A typical MDB
2. JBOSS standalone configuration file
/**
* Message-Driven Bean implementation class for: ChatHandlerTopic
*/
@MessageDriven(activationConfig = {
/*@ActivationConfigProperty(propertyName = "messageSelector", propertyValue = "MESSAGE_TYPE = 'ChatTransfer' OR MESSAGE_TYPE = 'ChatAction' "
+ "or MESSAGE_TYPE = 'InboundSms' or MESSAGE_TYPE = 'ChatMessage'"
+ " or MESSAGE_TYPE = 'TropoSmsDelivery'"),*/
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "destination", propertyValue = Queues.CHAT_HANDLER),
})
@ResourceAdapter("hornetq-ra.rar")
public class ChatHandlerQueueListener implements MessageListener {
public static final Logger logger = LoggerFactory.getLogger(ChatHandlerQueueListener.class);
@Inject
IChatManager chatManager;
/**
* Default constructor.
*/
public ChatHandlerQueueListener() {
}
/**
* @see MessageListener#onMessage(Message)
*/
public void onMessage(Message message) {
ObjectMessage objectMessage = (ObjectMessage) message;
ComponentMessage routingEngineResponse = null;
try {
routingEngineResponse = (ComponentMessage) objectMessage.getObject();
boolean messageRedelivered = message.getJMSRedelivered();
if(logger.isTraceEnabled())
logger.trace("ChatHandlerQueueListener.callingChatManager Incoming response is {}",
JsonUtils.toJson(routingEngineResponse));
if (routingEngineResponse == null)
return;
if(messageRedelivered){
// Sending the message acknowledgement manually
message.acknowledge();
}
} catch (JMSException e) {
logger.error("ChatHandlerQueueListener.onMessage Type: null", e);
}
if (routingEngineResponse.getType().equals(MessageType.ChatAction)
|| routingEngineResponse.getType().equals(MessageType.ChatTransfer)) {
try {
logger.debug("ChatHandlerQueueListener.callingChatManager {}", JsonUtils.toJson(routingEngineResponse));
chatManager.processRoutingEngineResponseMessage(routingEngineResponse);
} catch (UnknownReActionTypeException e) {
logger.error("ChatHandlerQueueListener.onMessage Type: UnknownReActionTypeException {}", e);
}
}else if(routingEngineResponse.getType().equals(MessageType.InboundSms)){
logger.debug("Calling request for agent {}", JsonUtils.toJson(routingEngineResponse));
try {
chatManager.processChatMessage(routingEngineResponse);
} catch (ChatServiceUnavailableException | JMSException | ApplicationException e) {
logger.error("ChatHandlerQueueListener.onMessage Type: Exception {}", e);
}
}else if(routingEngineResponse.getType().equals(MessageType.ChatMessage)){
try {
chatManager.processChatMessage(routingEngineResponse);
} catch (ChatServiceUnavailableException | JMSException | ApplicationException e) {
logger.error("ChatHandlerQueueListener.onMessage Type: Exception {}", e);
}
}else if(routingEngineResponse.getType().equals(MessageType.TropoSmsDelivery)){
logger.debug("Calling smsDelvieryHandler {}", JsonUtils.toJson(routingEngineResponse));
try {
chatManager.processSmsDeliveryMessage(routingEngineResponse);
} catch (Exception e) {
logger.error("ChatHandlerQueueListener.onMessage Type: Exception {}", e);
}
}else{
try {
logger.trace("Unexpected message seletor found: {}",message.getStringProperty("MESSAGE_TYPE"));
} catch (JMSException e) {
logger.error("ChatHandlerQueueListener.onMessage Type: Exception {}", e);
}
}
}
}
2. JBOSS CONFIGURATION FILE
<!-- HornetQ messaging configuration: one connector and two JMS connection factories that use it. -->
<hornetq-server>
<persistence-enabled>true</persistence-enabled>
<connectors>
<!-- Netty connector; host and port are taken from the jboss.bind.remote.hq.address / jboss.bind.remote.hq.port expressions. -->
<connector name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${jboss.bind.remote.hq.address}"/>
<param key="port" value="${jboss.bind.remote.hq.port}"/>
</connector>
</connectors>
<jms-connection-factories>
<!-- Plain connection factory, registered under the entry name "RemoteConnectionFactory", using the netty connector. -->
<connection-factory name="RemoteConnectionFactory">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="RemoteConnectionFactory"/>
</entries>
</connection-factory>
<!-- Pooled connection factory "hornetq-ra": XA transaction mode, bound at java:/JmsXA, using the same netty connector. -->
<!-- NOTE(review): no pool or session sizing is set in this fragment, so defaults presumably apply; verify against the full standalone file. -->
<pooled-connection-factory name="hornetq-ra">
<transaction mode="xa"/>
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="java:/JmsXA"/>
</entries>
</pooled-connection-factory>
</jms-connection-factories>
</hornetq-server>
Please help me in resolving this issue.