3 Replies Latest reply on Nov 14, 2013 1:12 PM by Justin Bertram

    HornetQ throttling on Clustered JBoss. What am I doing wrong?

    Tom Murphy Newbie



      I cannot get consumer-max-rate throttling to work on JBoss/HornetQ. I have tried a number of approaches without success.

      Any help or pointers as to what I am doing wrong would be much appreciated. Sorry about the long post.
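To make the expectation concrete: consumer-max-rate is documented as a cap in messages per second, with -1 meaning disabled. The sketch below is plain Java, not HornetQ code. The class and method names are made up for illustration, and the even spacing between deliveries is an assumption, since the real client may pace messages differently. It only shows the delivery times I would expect for a given rate when all messages are already waiting on the queue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is not how HornetQ implements throttling.
class MaxRateSketch {

    /**
     * Returns the earliest expected delivery time (in ms) for each of `count`
     * messages that are all available at time 0, when at most
     * `maxRatePerSecond` messages may be consumed per second.
     * A non-positive rate (such as -1) is treated as "no limit" in this sketch.
     */
    static List<Long> deliveryTimesMillis(int count, int maxRatePerSecond) {
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            if (maxRatePerSecond <= 0) {
                times.add(0L); // unthrottled: everything is deliverable immediately
            } else {
                // assumed even spacing: message i is due i * (1000 / rate) ms after the first
                times.add(i * 1000L / maxRatePerSecond);
            }
        }
        return times;
    }

    public static void main(String[] args) {
        System.out.println(deliveryTimesMillis(5, 2));  // [0, 500, 1000, 1500, 2000]
        System.out.println(deliveryTimesMillis(3, -1)); // [0, 0, 0]
    }
}
```

With a rate of 2, five queued messages should take about two seconds to drain. What I actually observe is closer to the unthrottled case.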


      Relevant part of standalone.xml



      <connection-factory name="InVmConnectionFactory">
          <connectors>
              <connector-ref connector-name="in-vm"/>
          </connectors>
          <entries>
              <entry name="java:/ConnectionFactory"/>
          </entries>
      </connection-factory>







      Note: The Queue MtasQueue is non-persistent (could this be having any effect?).

      I need to set consumer-window-size to 0 because messages have priorities, so we do not want any pre-fetching (client-side buffering) in the system.
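A minimal sketch of why pre-fetching matters for priorities, in plain Java rather than HornetQ code. The class and method names are hypothetical, and the model is deliberately simplified: the buffer is filled once up front and then refilled one message at a time. Higher numbers mean higher priority, as in JMS.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical simulation of one consumer with a client-side buffer.
class PrefetchPrioritySketch {

    /**
     * `initial` priorities are queued on the "server" first. The consumer then
     * prefetches up to `bufferSize` messages into a local FIFO buffer
     * (bufferSize 0 = no prefetch, one message is fetched per receive).
     * After that, one `late` message arrives and everything is processed.
     * Returns the priorities in processing order.
     */
    static List<Integer> consume(List<Integer> initial, int late, int bufferSize) {
        // server side: highest priority is handed out first
        PriorityQueue<Integer> server = new PriorityQueue<>(Comparator.reverseOrder());
        server.addAll(initial);

        // client side: already-delivered messages wait here in arrival order
        Deque<Integer> buffer = new ArrayDeque<>();
        while (buffer.size() < bufferSize && !server.isEmpty()) {
            buffer.add(server.poll());
        }

        server.add(late); // arrives after the prefetch has happened

        List<Integer> processed = new ArrayList<>();
        while (!buffer.isEmpty() || !server.isEmpty()) {
            if (buffer.isEmpty()) {
                buffer.add(server.poll()); // fetch exactly one message on demand
            }
            processed.add(buffer.poll());
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(consume(List.of(1, 2, 3), 9, 3)); // [3, 2, 1, 9]
        System.out.println(consume(List.of(1, 2, 3), 9, 0)); // [9, 3, 2, 1]
    }
}
```

With a buffer of 3, the late priority-9 message is stuck behind messages that were already delivered to the client. With a buffer of 0 it is processed first, which is the behaviour we need.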


      Relevant part of the CDI producer class for the ConnectionFactory



      @Resource(mappedName = "java:/ConnectionFactory")
      private ConnectionFactory connectionFactory;

      @Resource(mappedName = "java:jboss/jms/queue/coh/MtasQueue")
      private Queue ccmpQueue;

      // One Connection per injection scope, created from the in-VM factory
      @Produces @CcmpConnection
      public Connection createCcmpConnection() throws JMSException {
          return connectionFactory.createConnection();
      }

      public void closeCcmpConnection(@Disposes @CcmpConnection Connection connection) throws JMSException {
          connection.close();
      }

      // Non-transacted session with automatic acknowledgement
      @Produces @CcmpSession
      public Session createCcmpSession(@CcmpConnection Connection connection) throws JMSException {
          return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      }

      public void closeCcmpSession(@Disposes @CcmpSession Session session) throws JMSException {
          session.close();
      }

      @Produces @CcmpMessageProducer
      public MessageProducer createCcmpMessageProducer(@CcmpSession Session session) throws JMSException {
          return session.createProducer(ccmpQueue);
      }

      public void closeCcmpMessageProducer(@Disposes @CcmpMessageProducer MessageProducer producer) throws JMSException {
          producer.close();
      }




      The message producer


      Note: I am not sure I am following best practice here. I am trying to avoid the anti-pattern described in the HornetQ User Guide

      (https://access.redhat.com/site/documentation/en-US/JBoss_Enterprise_Application_Platform/5/html/HornetQ_User_Guide/ch43s05.html),

      i.e. opening and closing a Session on every send. Is injecting a shared Session and MessageProducer the correct way to do that?


      public class CCMPControlMessageSourceImpl implements CCMPControlMessageSource {

          @Inject @CcmpSession
          private Session session;

          @Inject @CcmpMessageProducer
          private MessageProducer producer;

          public void send(CCMPControlMessage cm) {
              try {
                  TextMessage message = session.createTextMessage();
                  // The consumer's message selector filters on this property
                  message.setStringProperty(CCMPControlMessage.OPERATION, cm.getOperation());

                  producer.send(message, cm.getDeliveryMode(), cm.getPriority(), cm.getTimeToLive());
              } catch (JMSException jmse) {
                  LOG.warn("Failed to send jms (jms error). Exception: {}", jmse.getMessage());
              } catch (JAXBException jaxbe) {
                  LOG.warn("Failed to send jms (marshalling error). Exception: {}", jaxbe.getMessage());
              }
          }
      }




      The message consumer


      Note: I am using a message selector. Could this be having an effect?

      @MessageDriven(name = "RetrieveConferenceControlMessageSink", activationConfig = {
          @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
          @ActivationConfigProperty(propertyName = "destination", propertyValue = "java:jboss/jms/queue/coh/MtasQueue"),
          @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
          @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "1"),
          @ActivationConfigProperty(propertyName = "messageSelector", propertyValue = ControlMessage.OPERATION + "=" + "'" + ControlMessage.CCMP_RETRIEVE + "'")
      })
      public class RetrieveConferenceControlMessageSink extends AbstractControlMessageSink implements MessageListener {

          private MtasReader mtasReader;

          public void onMessage(Message msg) {
              try {
                  TextMessage textMessage = (TextMessage) msg;
                  RetrieveConference command = unmarshall(textMessage.getText(), RetrieveConference.class);

              } catch (JAXBException e) {
                  throw new RuntimeException(e);
              } catch (JMSException e) {
                  // swallow the failure
                  LOG.warn("Failed " + ControlMessage.CCMP_RETRIEVE + " because of " + e.getMessage());
              }
          }
      }
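For reference, a small plain-Java sketch of what the messageSelector above expands to and which messages it would accept. The constant values are hypothetical, since the real ControlMessage constants are not shown here, and the matcher only handles the simple name='value' form rather than the full JMS selector syntax.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the selector logic; not part of JMS or HornetQ.
class SelectorSketch {

    // Assumed values for illustration only
    static final String OPERATION = "operation";
    static final String CCMP_RETRIEVE = "retrieve";

    /** Builds the selector string the same way the activationConfig does. */
    static String selector() {
        return OPERATION + "=" + "'" + CCMP_RETRIEVE + "'";
    }

    /** Evaluates only the simple form name='value' against string properties. */
    static boolean matches(String selector, Map<String, String> properties) {
        int eq = selector.indexOf('=');
        String name = selector.substring(0, eq).trim();
        String quoted = selector.substring(eq + 1).trim();
        String value = quoted.substring(1, quoted.length() - 1); // strip the single quotes
        // a missing property never matches, as with JMS selectors
        return value.equals(properties.get(name));
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(OPERATION, "retrieve");
        System.out.println(selector());                 // operation='retrieve'
        System.out.println(matches(selector(), props)); // true
    }
}
```

One thing this makes visible: the selector only matches if the producer sets the property under exactly the same name. The sender uses CCMPControlMessage.OPERATION while the MDB uses ControlMessage.OPERATION, so both constants need to resolve to the same string.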




      If you have read this far, all I can say is thanks.