HornetQ throttling on Clustered JBoss. What am I doing wrong?
eeimarn Nov 14, 2013 4:17 AM
Background
==========
I cannot get consumer-max-rate throttling to work on JBoss/HornetQ. I have tried a number of approaches without success.
Any help or pointers as to what I am doing wrong would be much appreciated. Sorry about the long post.
Relevant part of standalone.xml
===============================
<connection-factory name="InVmConnectionFactory">
    <connectors>
        <connector-ref connector-name="in-vm"/>
    </connectors>
    <entries>
        <entry name="java:/ConnectionFactory"/>
    </entries>
    <consumer-window-size>0</consumer-window-size>
    <consumer-max-rate>5</consumer-max-rate>
</connection-factory>
Note: The queue MtasQueue is non-persistent (could this be having any effect?).
I need consumer-window-size set to 0 because the messages carry priorities, so we don't want any pre-fetching in the system.
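For reference, the behaviour expected from consumer-max-rate=5 is a cap of roughly five messages per second per consumer. The sketch below is not HornetQ code and does not claim to mirror how HornetQ implements the limit; it is only a minimal model of the expected outcome under a fixed one-second window, so that "throttling works" has a concrete meaning. The class MaxRateSketch and its methods tryDeliver and simulate are hypothetical names introduced for illustration.

```java
/**
 * Hypothetical helper (not part of HornetQ): models a cap of
 * "at most N deliveries per one-second window" using a caller-supplied clock.
 */
public class MaxRateSketch {
    private final int maxPerSecond;
    private long windowStartMillis;
    private int deliveredInWindow;

    public MaxRateSketch(int maxPerSecond, long startMillis) {
        this.maxPerSecond = maxPerSecond;
        this.windowStartMillis = startMillis;
    }

    /** Returns true (and counts the delivery) if a message may be delivered at nowMillis. */
    public boolean tryDeliver(long nowMillis) {
        if (nowMillis - windowStartMillis >= 1000) {
            // A new one-second window has started: reset the counter.
            windowStartMillis = nowMillis;
            deliveredInWindow = 0;
        }
        if (deliveredInWindow < maxPerSecond) {
            deliveredInWindow++;
            return true;
        }
        return false;
    }

    /** Offers one message every stepMillis for durationMillis and returns how many got through. */
    public static int simulate(int maxPerSecond, long stepMillis, long durationMillis) {
        MaxRateSketch limiter = new MaxRateSketch(maxPerSecond, 0);
        int delivered = 0;
        for (long t = 0; t < durationMillis; t += stepMillis) {
            if (limiter.tryDeliver(t)) {
                delivered++;
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // 300 messages offered over 3 simulated seconds, capped at 5 per second.
        System.out.println(simulate(5, 10, 3000)); // prints 15
    }
}
```

With a working limit, the consumer's onMessage would be expected to fire about 15 times in 3 seconds in the scenario above, however fast messages are produced. When the offered rate is already below the cap, every message gets through.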
Relevant part of the CDI producer class that uses the ConnectionFactory
=======================================================================
@Resource(mappedName = "java:/ConnectionFactory")
private ConnectionFactory connectionFactory;

@Resource(mappedName = "java:jboss/jms/queue/coh/MtasQueue")
private Queue ccmpQueue;

@Produces @CcmpConnection
public Connection createCcmpConnection() throws JMSException {
    return connectionFactory.createConnection();
}

public void closeCcmpConnection(@Disposes @CcmpConnection Connection connection) throws JMSException {
    connection.close();
}

@Produces @CcmpSession
public Session createCcmpSession(@CcmpConnection Connection connection) throws JMSException {
    return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}

public void closeCcmpSession(@Disposes @CcmpSession Session session) throws JMSException {
    session.close();
}

@Produces @CcmpMessageProducer
public MessageProducer createCcmpMessageProducer(@CcmpSession Session session) throws JMSException {
    return session.createProducer(ccmpQueue);
}

public void closeCcmpMessageProducer(@Disposes @CcmpMessageProducer MessageProducer producer) throws JMSException {
    producer.close();
}
The message producer
====================
Note: I am not sure I am really following best practice here. I am trying to avoid the anti-pattern described in the
HornetQ User Guide (https://access.redhat.com/site/documentation/en-US/JBoss_Enterprise_Application_Platform/5/html/HornetQ_User_Guide/ch43s05.html),
that is, I am trying not to open and close a Session on every send. Is this the correct way to do it?
@Stateless
public class CCMPControlMessageSourceImpl implements CCMPControlMessageSource {

    @Inject @CcmpSession
    private Session session;

    @Inject @CcmpMessageProducer
    private MessageProducer producer;

    @Override
    public void send(CCMPControlMessage cm) {
        try {
            TextMessage message = session.createTextMessage();
            message.setStringProperty(CCMPControlMessage.OPERATION, cm.getOperation());
            message.setText(marshall(cm));
            producer.send(message, cm.getDeliveryMode(), cm.getPriority(), cm.getTimeToLive());
        } catch (JMSException jmse) {
            LOG.warn("Failed to send jms (jms error). Exception: {}", jmse.getMessage());
        } catch (JAXBException jaxbe) {
            LOG.warn("Failed to send jms (marshalling error). Exception: {}", jaxbe.getMessage());
        }
    }
}
The message consumer
====================
Note: I am using a message selector. Could this be having an effect?
@MessageDriven(name = "RetrieveConferenceControlMessageSink", activationConfig = {
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
    @ActivationConfigProperty(propertyName = "destination", propertyValue = "java:jboss/jms/queue/coh/MtasQueue"),
    @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
    @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "1"),
    @ActivationConfigProperty(propertyName = "messageSelector", propertyValue = ControlMessage.OPERATION + "=" + "'" + ControlMessage.CCMP_RETRIEVE + "'")
})
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class RetrieveConferenceControlMessageSink extends AbstractControlMessageSink implements MessageListener {

    @EJB
    private MtasReader mtasReader;

    @Override
    @Interceptors({PerformanceInterceptor.class})
    public void onMessage(Message msg) {
        try {
            TextMessage textMessage = (TextMessage) msg;
            RetrieveConference command = unmarshall(textMessage.getText(), RetrieveConference.class);
            mtasReader.updateCache(command.getConferenceId());
        } catch (JAXBException e) {
            throw new RuntimeException(e);
        } catch (JMSException e) {
            // swallow the failure
            LOG.warn("Failed " + ControlMessage.CCMP_RETRIEVE + " because of " + e.getMessage());
        }
    }
}
If you have read this far, all I can say is thanks.