3 Replies Latest reply on Nov 14, 2013 1:12 PM by Justin Bertram

    HornetQ throttling on Clustered JBoss. What am I doing wrong?

    Tom Murphy Newbie

      Backround

      =========

      I cannot seem to get consumer-max-rate throttling working on JBoss/HornetQ. I have tried a number of solutions but cannot seem to

      get it to work. Any help or pointers as to what I am doing wrong would be a great help. Sorry about the long post.

       

      Interesting part from Standalone.xml

      =======================================

       

      <connection-factory name="InVmConnectionFactory">

          <connectors>

              <connector-ref connector-name="in-vm"/>

          </connectors>

              <entries>

                  <entry name="java:/ConnectionFactory"/>

              </entries>

              <consumer-window-size>0</consumer-window-size>                  

              <consumer-max-rate>5</consumer-max-rate>

      </connection-factory>

       

       

      Note: The Queue MtasQueue is non-persistent (could this be having any effect?).

      I need to set the consumer-window-size to 0 as messages have priority so we don't want any pre-fetching in the system.

       

      Interesting part from Producer of the ConnectionFactory

      =======================================================

       

      @Resource(mappedName = "java:/ConnectionFactory")

      private ConnectionFactory connectionFactory;  

      @Resource(mappedName = "java:jboss/jms/queue/coh/MtasQueue")

      private Queue ccmpQueue;

         

      @Produces @CcmpConnection

      public Connection createCcmpConnection() throws JMSException {

          return connectionFactory.createConnection();

      }

       

      public void closeCcmpConnection(@Disposes @CcmpConnection Connection connection) throws JMSException {

          connection.close();

      }

       

      @Produces @CcmpSession

      public Session createCcmpSession(@CcmpConnection Connection connection) throws JMSException {

          return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

      }

       

      public void closeCcmpSession(@Disposes @CcmpSession Session session) throws JMSException {

          session.close();

      }

       

      @Produces @CcmpMessageProducer

      public MessageProducer createCcmpMessageProducer(@CcmpSession Session session) throws JMSException {

          return session.createProducer(ccmpQueue);

      }

       

      public void closeCcmpMessageProducer(@Disposes @CcmpMessageProducer MessageProducer producer) throws JMSException {

          producer.close();

      }

       

      The message producer

      ====================

      Note: I wonder am I really following best practice here (I am trying to avoid the anti-pattern described in the

      HornetQ userGuide https://access.redhat.com/site/documentation/en-US/JBoss_Enterprise_Application_Platform/5/html/HornetQ_User_Guide/ch43s05.html)

      i.e. not opening and closing a Session on every send, is this the correct way to do it?

      @Stateless

      public class CCMPControlMessageSourceImpl implements CCMPControlMessageSource {

       

          @Inject @CcmpSession

          private Session session;

         

          @Inject @CcmpMessageProducer

          private MessageProducer producer;

         

          @Override  

          public void send(CCMPControlMessage cm) {

       

              try {

                  TextMessage message = session.createTextMessage();

                  message.setStringProperty(CCMPControlMessage.OPERATION, cm.getOperation());

                  message.setText(marshall(cm));

                  producer.send(message, cm.getDeliveryMode(), cm.getPriority(), cm.getTimeToLive());

              } catch (JMSException jmse) {

                  LOG.warn("Failed to send jms (jms error). Exception: {}", jmse.getMessage());

              } catch (JAXBException jaxbe) {

                  LOG.warn("Failed to send jms (marshalling error). Exception: {}", jaxbe.getMessage());

              }

          }

       

      The message consumer

      ====================

      Note: I am using a message selector could this be having an effect?

      @MessageDriven(name = "RetrieveConferenceControlMessageSink", activationConfig = {

          @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),

          @ActivationConfigProperty(propertyName = "destination", propertyValue = "java:jboss/jms/queue/coh/MtasQueue"),

          @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),

          @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "1"),

          @ActivationConfigProperty(propertyName = "messageSelector", propertyValue = ControlMessage.OPERATION + "=" + "'" + ControlMessage.CCMP_RETRIEVE + "'")

      })

      @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)

      public class RetrieveConferenceControlMessageSink extends AbstractControlMessageSink implements MessageListener {

       

          @EJB

          private MtasReader mtasReader;  

         

          @Override

          @Interceptors({PerformanceInterceptor.class})

          public void onMessage(Message msg) {

              try {

                  TextMessage textMessage = (TextMessage) msg;

                  RetrieveConference command = unmarshall(textMessage.getText(), RetrieveConference.class);

                  mtasReader.updateCache(command.getConferenceId());

       

              } catch (JAXBException e) {

                  throw new RuntimeException(e);

              } catch (JMSException e) {

              //swallow the failure

                  LOG.warn("Failed " + ControlMessage.CCMP_RETRIEVE + " because of " + e.getMessage());

              }

          }

       

      if you have read this far all i can say is thanks