Spring and HornetQ standalone integration

    I've been testing HornetQ 2.1.2.Final for a few days now, and I needed to finally integrate it with our Java SE symmetric cluster server. We're using Spring 3 IOC  to instantiate and configure a few components, and we wanted to do the same with HornetQ, as migrating our code to a JBoss AS solution is not an option.

     

    The following configuration embeds a symmetric clustered instance of HornetQ into our server:

     

     

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
      xmlns:util="http://www.springframework.org/schema/util"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd"
      default-init-method="init" default-destroy-method="destroy">
      
      <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">    
        <description>Loads properties from multiple sources, with fallback capabilities</description>
        <property name="locations">
          <list>
            <value>file:${basePath}/etc/hornetq.properties</value>
          </list>
        </property>
        <property name="ignoreUnresolvablePlaceholders" value="true" />
        <property name="ignoreResourceNotFound" value="true" />
        <!-- Fallback to these properties if not found in locations or system 
          properties -->
        <property name="systemPropertiesMode">
          <util:constant
            static-field="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer.SYSTEM_PROPERTIES_MODE_OVERRIDE" />
        </property>
        <property name="properties">
          <props>        
            <!-- Default values: will be used if not present in properties file, or properties
                 file is not found -->
            <prop key="hornetq.runpath">${basePath}/run/hornetq</prop>
            <prop key="data.dir">${hornetq.runpath}</prop>        
          </props>
        </property>
      </bean>
        
      <!-- JNDI Server Configuration ########################################## -->
      <bean name="Naming" class="org.jnp.server.NamingBeanImpl"
            init-method="start" destroy-method="stop">
        <!-- If true, it performs a java:/comp/env context lookup... and the server
             is not ready -->
        <property name="installJavaComp" value="false" />
      </bean>
                
      <bean name="JNDIServer" class="org.jnp.server.Main"
            init-method="start" destroy-method="stop">
        <property name="namingInfo" ref="Naming" />
        <property name="port" value="${conf.jndi.port:2099}" />
        <property name="bindAddress" value="${conf.jndi.bindingAddress:0.0.0.0}" />
        <property name="rmiPort" value="${conf.jndi.rmi.port:2098}" />
        <property name="rmiBindAddress" value="${conf.jndi.rmi.bindingAddress:0.0.0.0}" />
      </bean>
      
      <bean name="MBeanServer" class="java.lang.management.ManagementFactory"
            factory-method="getPlatformMBeanServer" />
            
      <!-- END JNDI Server Configuration ###################################### -->
      
      <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
        <description>HornetQ Core Server</description>
        <constructor-arg ref="Configuration" />
        <constructor-arg ref="MBeanServer" />
        <constructor-arg ref="HornetQSecurityManager" />
      </bean>
      
      <!-- Note init/destroy methods set as start/stop here, and not in Core Server, to
           avoid the latter starting before JMS Server -->
      <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl"
            init-method="start" destroy-method="stop">
        <description>HornetQ JMS Server</description>
        <constructor-arg ref="HornetQServer" />
        <constructor-arg ref="JMSConfiguration" />
      </bean>
      
      <!-- Security manager configuration: add user/roles here -->
      <!-- HornetQSecurityManagerBean just extends org.hornetq.spi.core.security.HornetQSecurityManagerImpl
           adding a constructor to ease user/roles configuration from Spring -->
      <bean name="HornetQSecurityManager" 
            class="es.tid.planb.hornetq.security.HornetQSecurityManagerBean">
        <constructor-arg>
          <!-- User/pass map -->
          <map>
             <entry key="guest" value="guest" />
          </map>
        </constructor-arg>
         <constructor-arg>
          <!-- User/roles map -->
          <map>
             <entry key="guest">
                <!-- Roles for user -->
                <list>
                   <value>guest</value>
                </list>
             </entry>
          </map>
        </constructor-arg>
        <property name="defaultUser" value="guest" />
      </bean>  
      
      <!-- CONNECTORS : shared between HornetQ core and JMS servers -->
      <bean name="NettyInternalConnector" class="org.hornetq.api.core.TransportConfiguration">
          <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
          <constructor-arg>
            <map>
               <entry key="host" value="${conf.hornetq.acceptor.internal.host:localhost}"/>
               <entry key="port" value="${conf.hornetq.acceptor.internal.port:5446}"/>
               <entry key="use-nio" value="${conf.hornetq.acceptor.internal.useNIO:true}"/>
            </map>
          </constructor-arg>
       </bean>
       
       <bean name="NettyExternalConnector" class="org.hornetq.api.core.TransportConfiguration">
          <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" />
          <constructor-arg>
            <map>
               <entry key="host" value="${conf.hornetq.acceptor.external.host:0.0.0.0}"/>
               <entry key="port" value="${conf.hornetq.acceptor.external.port:5445}"/>
               <entry key="use-nio" value="${conf.hornetq.acceptor.external.useNIO:true}"/>           
            </map>
          </constructor-arg>
       </bean>
       
       <bean name="NettyInVMConnector" class="org.hornetq.api.core.TransportConfiguration">
          <constructor-arg value="org.hornetq.core.remoting.impl.invm.InVMConnectorFactory" />
          <constructor-arg>
            <map>
               <entry key="server-id" value="${conf.hornetq.acceptor.invm.serverId:0}"/>
            </map>
          </constructor-arg>
       </bean>
       <!-- END CONNECTORS -->
      
      <!-- Common HornetQ configuration --> 
      <bean name="Configuration" class="org.hornetq.core.config.impl.ConfigurationImpl">        
        <property name="clustered" value="true" />
        <property name="persistenceEnabled" value="${conf.hornetq.persistenceEnabled:true}" />
        <property name="JMXManagementEnabled" value="${conf.hornetq.jmxEnabled:true}" />
        <property name="pagingDirectory" value="${hornetq.runpath}/paging" />
        <property name="bindingsDirectory" value="${hornetq.runpath}/bindings" />
        <property name="largeMessagesDirectory" value="${hornetq.runpath}/large-messages" />
        <property name="journalDirectory" value="${hornetq.runpath}/journal" />
        <property name="journalMinFiles" value="${conf.hornetq.journalMinFiles:10}" />
        <property name="journalType">
          <util:constant static-field="${conf.hornetq.journal.type:org.hornetq.core.server.JournalType.NIO}" />
        </property>
        <property name="clusterUser" value="HORNETQ-CLUSTER-USER" />
        <property name="clusterPassword" value="HORNETQ-CLUSTER-PASS" />
        <property name="threadPoolMaxSize" value="${conf.hornetq.threadPoolMaxSize:-1}" />
        
        <!-- Security settings -->
        <!-- Disable security to get a small performance boost -->
        <property name="securityEnabled" value="${conf.hornetq.securityEnabled:false}" />
        <property name="securityRoles">
          <map>
             <!-- Entry keys are the matching parameters -->
             <entry key="#">
                <set>
                   <bean class="org.hornetq.core.security.Role">
                      <!-- Name -->
                      <constructor-arg value="guest" />
                      <!-- send? -->
                      <constructor-arg value="true" />
                      <!-- consume? -->
                      <constructor-arg value="true" />
                      <!-- create durable queue? -->
                      <constructor-arg value="false" />
                      <!-- delete durable queue? -->
                      <constructor-arg value="false" />
                      <!-- create non-durable queue? -->
                      <constructor-arg value="true" />
                      <!-- delete non-durable queue? -->
                      <constructor-arg value="true" />
                      <!-- manage? -->
                      <constructor-arg value="false" />
                   </bean>
                </set>
             </entry>
          </map>
        </property>
        
        <!-- Addresses settings -->
        <property name="addressesSettings">
          <map>
             <!-- Entry keys are the matching parameters -->
             <entry key="#">
                <bean class="org.hornetq.core.settings.impl.AddressSettings">
                   <property name="deadLetterAddress" value="jms.queue.DLQ" />
                   <property name="expiryAddress" value="jms.queue.ExpiryQueue" />
                   <!-- In milliseconds -->
                   <property name="redeliveryDelay" value="${conf.hornetq.addresses.redeliveryDelay:0}" />
                   <property name="maxSizeBytes" value="${conf.hornetq.addresses.maxSizeBytes:20971520}" />
                   <property name="messageCounterHistoryDayLimit" value="${conf.hornetq.addresses.dayLimit:10}" />
                   <property name="addressFullMessagePolicy">
                      <util:constant static-field="org.hornetq.core.settings.impl.AddressFullMessagePolicy.PAGE" />
                   </property>
                </bean>
             </entry>
          </map>
        </property>
            
        <!-- Already defined as named beans (see above) -->
        <property name="connectorConfigurations">
          <map>
             <entry key="netty-internal"><ref bean="NettyInternalConnector" /></entry>
             <entry key="netty-external"><ref bean="NettyExternalConnector" /></entry>
             <entry key="netty-invm"><ref bean="NettyInVMConnector" /></entry>
          </map>
        </property>
            
        <!-- Acceptors: match ports with connectors -->
        <property name="acceptorConfigurations">
          <set>
             <bean class="org.hornetq.api.core.TransportConfiguration">
                <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory" />
                <constructor-arg>
                  <map>
                     <entry key="host" value="${conf.hornetq.acceptor.internal.host:localhost}"/>
                     <entry key="port" value="${conf.hornetq.acceptor.internal.port:5446}"/>
                     <entry key="use-nio" value="${conf.hornetq.acceptor.internal.useNIO:true}"/>
                     <entry key="direct-deliver" value="${conf.hornetq.acceptor.internal.directDeliver:false}" />
                  </map>
                </constructor-arg>
                <!-- Name -->
                <constructor-arg value="netty-internal" />
             </bean>
             <bean class="org.hornetq.api.core.TransportConfiguration">
                <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory" />
                <constructor-arg>
                  <map>
                     <entry key="host" value="${conf.hornetq.acceptor.external.host:0.0.0.0}"/>
                     <entry key="port" value="${conf.hornetq.acceptor.external.port:5445}"/>
                     <entry key="use-nio" value="${conf.hornetq.acceptor.external.useNIO:true}"/>
                     <entry key="direct-deliver" value="${conf.hornetq.acceptor.external.directDeliver:false}" />
                  </map>
                </constructor-arg>
                <!-- Name -->
                <constructor-arg value="netty-external" />
             </bean>
             <bean class="org.hornetq.api.core.TransportConfiguration">
                <constructor-arg value="org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory" />
                <constructor-arg>
                  <map>
                     <entry key="server-id" value="${conf.hornetq.acceptor.invm.serverId:0}"/>
                  </map>
                </constructor-arg>
                <!-- Name -->
                <constructor-arg value="netty-invm" />
             </bean>
          </set>
        </property>
        
        <!-- Broadcast groups -->
        <property name="broadcastGroupConfigurations">
          <list>
             <bean class="org.hornetq.core.config.BroadcastGroupConfiguration">
                <!-- Group name -->
                <constructor-arg value="${conf.hornetq.broadcastGroup.name:hornetq-bg}" />
                <!-- Local bind address -->
                <constructor-arg value="${conf.hornetq.broadcastGroup.bindingAddress:0.0.0.0}" />
                <!-- Local bind port -->
                <constructor-arg value="${conf.hornetq.broadcastGroup.bindingPort:9875}" />
                <!-- Group address -->
                <constructor-arg value="${conf.hornetq.broadcastGroup.groupAddress:231.7.7.7}" />
                <!-- Group port -->
                <constructor-arg value="${conf.hornetq.broadcastGroup.groupPort:9876}" />
                <!-- Broadcast period -->
                <constructor-arg value="${conf.hornetq.broadcastGroup.broadcastPeriod:5000}" />
                <!-- Connector infos -->
                <constructor-arg>
                  <list>
                    <!-- A list of connector-backup connector name pairs -->
                    <bean class="org.hornetq.api.core.Pair">
                      <constructor-arg value="netty-external" />
                      <constructor-arg><null/></constructor-arg>
                    </bean>
                  </list>
                </constructor-arg>
             </bean>
          </list>
        </property>
        
        <!-- Discovery groups -->
        <property name="discoveryGroupConfigurations">
          <map>
             <!-- Match map key with name constructor arg -->
             <entry key="${conf.hornetq.discoveryGroup.name}">
               <bean class="org.hornetq.core.config.DiscoveryGroupConfiguration">
                  <!-- Group name -->
                  <constructor-arg value="${conf.hornetq.discoveryGroup.name:hornetq-dg}" />
                  <!-- Local bind address -->
                  <constructor-arg value="${conf.hornetq.discoveryGroup.bindingAddress:0.0.0.0}" />
                  <!-- Group address -->
                  <constructor-arg value="${conf.hornetq.broadcastGroup.groupAddress:231.7.7.7}" />
                  <!-- Group port -->
                  <constructor-arg value="${conf.hornetq.broadcastGroup.groupPort:9876}" />
                  <!-- Refresh timeout -->
                  <constructor-arg value="${conf.hornetq.discoveryGroup.refreshTimeout:1000}" />
               </bean>
             </entry>
          </map>
        </property>
        
        <!-- Cluster connections -->
        <property name="clusterConfigurations">
          <list>      
            <bean class="org.hornetq.core.config.ClusterConnectionConfiguration">
              <constructor-arg value="${conf.hornetq.cluster.name:HORNETQ-CLUSTER}"/>
              <!-- Address : prefix of addresses being clustered-->
              <constructor-arg value="jms"/>
              <constructor-arg value="${conf.hornetq.cluster.retryInterval:500}"/>
              <constructor-arg value="${conf.hornetq.cluster.duplicateDetection:true}"/>
              <constructor-arg value="${conf.hornetq.cluster.forwardNoConsumers:false}"/>
              <constructor-arg value="${conf.hornetq.cluster.maxHops:1}"/>
              <constructor-arg value="${conf.hornetq.cluster.confirmationWindowSize:4096}" />
              <constructor-arg value="${conf.hornetq.discoveryGroup.name:hornetq-dg}"/>
            </bean>        
          </list>
        </property>
        
      </bean>
      <!-- End Core configuration -->
      
      <!-- JMS configuration -->
      <bean name="JMSConfiguration" class="org.hornetq.jms.server.config.impl.JMSConfigurationImpl">
        <!-- Connection factories -->
        <constructor-arg>
          <list>
                   
             <bean class="org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl">
                <description>Connection factory for internal use</description>
                <!-- Name -->
                <constructor-arg value="ConnectionFactory"/>
                <!-- Transport configuration -->
                <constructor-arg ref="NettyInternalConnector"/>
                <!-- Bindings (vararg, use String[]) -->             
                <constructor-arg type="java.lang.String[]">
                   <list>
                      <value>/ConnectionFactory</value>
                      <value>/connectionFactory</value>
                   </list>
                </constructor-arg>
                <!-- Confirmation windows size: how many bytes clients can send before waiting for server ACKs -->
                <property name="confirmationWindowSize" value="${conf.hornetq.client.cf.internal.confirmationWindowSize:8192}" />
                <!-- Consumer window size: how many bytes to pre-fetch from server when consuming -->
                <property name="consumerWindowSize" value="${conf.hornetq.client.cf.internal.consumerWindowSize:1048576}" />
             </bean>
             
             <!-- TODO: check failover settings -->
             <bean class="org.hornetq.jms.server.config.impl.ConnectionFactoryConfigurationImpl">
                <description>Connection factory for external clients</description>
                <constructor-arg value="ExternalConnectionFactory"/>
                <constructor-arg ref="NettyExternalConnector"/>
                <constructor-arg type="java.lang.String[]">
                   <list>
                      <value>/ExternalConnectionFactory</value>
                      <value>/externalConnectionFactory</value>
                      <value>/externalconnectionfactory</value>
                   </list>
                </constructor-arg>
                <!-- Client-side load balancing: add discovery group name -->
                <property name="discoveryGroupName" value="${conf.hornetq.discoveryGroup.name:hornetq-dg}" />
                <property name="confirmationWindowSize" value="${conf.hornetq.client.cf.external.confirmationWindowSize:8192}" />
                <property name="consumerWindowSize" value="${conf.hornetq.client.cf.external.consumerWindowSize:1048576}" />
                <!-- Retry/reconnect params -->
                <property name="reconnectAttempts" value="${conf.hornetq.client.cf.external.reconnectAttempts:-1}" />
                <property name="retryInterval" value="${conf.hornetq.client.cf.external.retryInterval:1000}" />
                <property name="maxRetryInterval" value="${conf.hornetq.client.cf.external.maxRetryInterval:20000}" />
                <property name="retryIntervalMultiplier" value="${conf.hornetq.client.cf.external.retryIntervalMultiplier:2.0}" />
                <property name="clientFailureCheckPeriod" value="${conf.hornetq.client.cf.external.clientFailureCheckPeriod:10000}" />
                <property name="failoverOnInitialConnection" value="${conf.hornetq.client.cf.external.failoverOnInitialConnection:false}" />
                <property name="failoverOnServerShutdown" value="${conf.hornetq.client.cf.external.failoverOnServerShutdown:true}" />
             </bean>
             
          </list>
        </constructor-arg>
        <!-- Queue configurations -->
        <constructor-arg>
          <list>
            <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
              <!-- Name -->
              <constructor-arg value="TaskManagerQueue" />
              <!-- Selector, if any -->
              <constructor-arg><null/></constructor-arg>
              <!-- Durable? -->
              <constructor-arg value="true" />
              <!-- Bindings (varargs) -->
              <constructor-arg value="/queue/TaskManagerQueue" />
            </bean>
            <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
              <constructor-arg value="AsyncWorkerQueue" />
              <constructor-arg><null/></constructor-arg>
              <constructor-arg value="true" />
              <constructor-arg value="/queue/AsyncWorkerQueue" />
            </bean>
            <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
              <constructor-arg value="taskIO" />
              <constructor-arg><null/></constructor-arg>
              <constructor-arg value="true" />
              <constructor-arg type="java.lang.String[]">
                <list>
                   <value>/queue/taskIO</value>
                   <value>taskIO</value>
                </list>
              </constructor-arg>
            </bean>
            <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
              <constructor-arg value="taskrecv" />
              <constructor-arg><null/></constructor-arg>
              <constructor-arg value="true" />
              <constructor-arg type="java.lang.String[]">
                <list>
                   <value>/queue/taskrecv</value>
                   <value>taskrecv</value>
                </list>
              </constructor-arg>
            </bean>
            <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
              <constructor-arg value="EventDispatcherQueue" />
              <constructor-arg><null/></constructor-arg>
              <constructor-arg value="true" />
              <constructor-arg value="/queue/EventDispatcherQueue" />
            </bean>
            <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
              <constructor-arg value="EventGateQueue" />
              <constructor-arg><null/></constructor-arg>
              <constructor-arg value="true" />
              <constructor-arg value="/queue/EventGateQueue" />
            </bean>
            <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
              <constructor-arg value="HTTPGateQueue" />
              <constructor-arg><null/></constructor-arg>
              <constructor-arg value="true" />
              <constructor-arg value="/queue/HTTPGateQueue" />
            </bean>
            <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
              <constructor-arg value="LoopbackGateQueue" />
              <constructor-arg><null/></constructor-arg>
              <constructor-arg value="true" />
              <constructor-arg value="/queue/LoopbackGateQueue" />
            </bean>
            <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
              <constructor-arg value="ExternalJMSGateQueue" />
              <constructor-arg><null/></constructor-arg>
              <constructor-arg value="true" />
              <constructor-arg value="/queue/ExternalJMSGateQueue" />
            </bean>
            <bean class="org.hornetq.jms.server.config.impl.JMSQueueConfigurationImpl">
              <constructor-arg value="SpringGateQueue" />
              <constructor-arg><null/></constructor-arg>
              <constructor-arg value="true" />
              <constructor-arg value="/queue/SpringGateQueue" />
            </bean>
          </list>
        </constructor-arg>
        <!-- Topic configurations -->
        <constructor-arg>
          <list>
          </list>
        </constructor-arg>
        
        <property name="context">
          <bean class="javax.naming.InitialContext" />
        </property>
      </bean>  
    
    </beans>

     

    A few notes about this configuration:

     

    • The JBoss JNP JNDI is also instantiated and configured. Our code relies on JNDI to perform the lookup of JMS connection factories and destinations.
    • I don't like the Spring p-namespace for property sets, but it can save a lot of space. Check it out here
    • 3 connectors/acceptors pairs are configured: one Netty-type for internal (within JVM) use, one Netty-type for external clients, and one InVM (configured, but not currently used)
    • No backup server or backup connectors are (currently) configured
    • No topics configured (should be pretty similar to queue configuration, but using TopicConfigurationImpl)
    • There are no proper set/get methods for user and roles in org.hornetq.spi.core.security.HornetQSecurityManagerImpl, so we created a simple extension of it to ease configuration from Spring:

     

     

    package es.tid.planb.hornetq.security;
    
    import java.util.List;
    import java.util.Map;
    
    import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
    
    /**
     * Bean adaptor for {@link HornetQSecurityManagerImpl}
     * 
     * @author Eduardo Corral
     * @since 1.2
     */
    public class HornetQSecurityManagerBean extends HornetQSecurityManagerImpl
    {
    
        public HornetQSecurityManagerBean() {
            super();
        }
        
        /**
         * @param users Map of user and passwords
         * @param roles Map of user and list of roles
         */
        public HornetQSecurityManagerBean(Map<String, String> users, Map<String, List<String>> roles) {
            super();
            if (users != null && !users.isEmpty()) {
                for (Map.Entry<String, String> userEntry : users.entrySet()) {
                    addUser(userEntry.getKey(), userEntry.getValue());
                    if (roles.containsKey(userEntry.getKey())) {
                        for (String role : roles.get(userEntry.getKey())) {
                            addRole(userEntry.getKey(), role);
                        }
                    }
                }
            }
        }
    }