5 Replies Latest reply on Feb 10, 2009 9:28 AM by Jeff Bride

    JMS / AMQP  gateway for JBoss ESB

    Jeff Bride Novice

      Hi,
      The customer needs enterprise-wide business messaging interoperability and has selected AMQP as the path forward, rather than other options such as a JMS bridge or anything WS-* might offer. Consequently, we need a JMS / AMQP gateway and notifier for the JBoss SOA-P / ESB. Using the Java client bindings from the QPid project and JBESB_4_4_GA_CP2_CR4, I have a prototype AMQP gateway working. In a nutshell, the data flow in this scenario is as follows: remote message providers send AMQP messages to an exchange hosted by the QPid C++ broker. The broker routes those messages to the appropriate queues, and the JMS/QPid gateway (subscribed to one of those queues) picks up each message as a JMS message and forwards it to the corresponding JMS listener on the ESB. Note: we continue to use JBoss Messaging as the default JMS broker for the JBoss ESB. Only this AMQP gateway subscribes to QPid; all other ESB services communicate with each other via InVM or JBoss Messaging.

      My purpose for this post is to discuss tweaks I had to make to some of the ESB code to get the gateway working with the QPid Java client bindings. It's possible that the changes actually belong in the QPid Java client code base and not in the ESB. I'm hoping that experts on this forum might offer their opinion. Thank you!

      The following is a snippet of the config for my QPid provider :

      <jms-provider name="JMS" connection-factory="connectionfactory.qpidConnectionFactory">
        <property name="jndi-prefixes" value="connectionfactory.,destination." />
        <property name="java.naming.factory.initial" value="org.apache.qpid.jndi.PropertiesFileInitialContextFactory"/>
        <property name="connectionfactory.qpidConnectionFactory" value="amqp://guest:guest@clientId/virtualHost?brokerlist='tcp://ratwater:5672'"/>
        <property name="destination.Alpha_Co" value="direct://usmc.tracks/Alpha_Co_queue?routingkey='Alpha_Co_key'"/>
        <jms-bus busid="quickstartGwChannel">
          <jms-message-filter dest-type="QUEUE" dest-name="destination.Alpha_Co" />
        </jms-bus>
      </jms-provider>


      The problem I encountered is that the QPid Java client strips the prefix from each key when it populates its InitialContext (i.e. the QPid jndiContext ends up with a key of 'qpidConnectionFactory' rather than 'connectionfactory.qpidConnectionFactory'). When the ESB then performs the lookup on the QPid InitialContext, it uses the full property name from the jboss-esb.xml config (i.e. connectionfactory.qpidConnectionFactory). As a result, a NamingException is always thrown and the ESB service fails. Below is a code snippet from org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java that shows the 'snipping' QPid is doing:

      protected void createConnectionFactories(Map data, Hashtable environment)
      {
          for (Iterator iter = environment.entrySet().iterator(); iter.hasNext();)
          {
              Map.Entry entry = (Map.Entry) iter.next();
              String key = entry.getKey().toString();
              if (key.startsWith(CONNECTION_FACTORY_PREFIX))
              {
                  String jndiName = key.substring(CONNECTION_FACTORY_PREFIX.length());
                  ConnectionFactory cf = createFactory(entry.getValue().toString());
                  if (cf != null)
                  {
                      _logger.info("createConnectionFactories() connectionFactory created at jndiName = "+jndiName+" w/ cFactory = "+entry.getValue().toString());
                      data.put(jndiName, cf);
                  } else {
                      _logger.warn("createConnectionFactories() connectionFactory not created for : "+entry.getValue().toString());
                  }
              }
          }
      }
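
The effect of that substring() call can be reproduced without QPid or the ESB on the classpath. The following is only a minimal sketch: the class PrefixStripDemo and the method bindFactories are hypothetical names, a plain Map stands in for the JNDI context, and the URL string stands in for the real ConnectionFactory object.

```java
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;

/** Stand-alone illustration; no QPid or ESB classes are used. */
final class PrefixStripDemo {
    // Same literal prefix that appears in the jboss-esb.xml property name.
    static final String CONNECTION_FACTORY_PREFIX = "connectionfactory.";

    /** Binds every "connectionfactory.<name>" entry under "<name>" only. */
    static Map<String, Object> bindFactories(Hashtable<String, String> environment) {
        Map<String, Object> bindings = new HashMap<>();
        for (Map.Entry<String, String> entry : environment.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(CONNECTION_FACTORY_PREFIX)) {
                // The prefix is dropped here, so the full key is never bound.
                String jndiName = key.substring(CONNECTION_FACTORY_PREFIX.length());
                // Placeholder: the URL string stands in for a real factory object.
                bindings.put(jndiName, entry.getValue());
            }
        }
        return bindings;
    }

    public static void main(String[] args) {
        Hashtable<String, String> env = new Hashtable<>();
        env.put("connectionfactory.qpidConnectionFactory",
                "amqp://guest:guest@clientId/virtualHost?brokerlist='tcp://ratwater:5672'");
        Map<String, Object> bindings = bindFactories(env);
        // Lookup by the full property name misses; lookup by the stripped name hits.
        System.out.println(bindings.containsKey("connectionfactory.qpidConnectionFactory")); // false
        System.out.println(bindings.containsKey("qpidConnectionFactory"));                   // true
    }
}
```

A lookup by the full property name misses for the same reason the ESB lookup throws a NamingException: only the stripped name was ever bound.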


      Thus, to accommodate the 'snipping', I had to make changes to the following files:

      [jbride@ratwater rosetta]$ svn status
      M src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java
      M src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java
      M src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java



      Before I get into the details of the ESB code changes: any thoughts as to who should accommodate whom? Do other JMS client libraries strip prefixes like this? I'll be asking the QPid folks the same questions.

      Thank you!

      Jeffrey Bride

        • 1. Re: JMS / AMQP  gateway for JBoss ESB
          Jeff Bride Novice

          As a follow-up, here are the diffs of source changes to JBESB_4_4_GA_CP2_CR4 :

          --- src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java (revision 25144)
          +++ src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPoolContainer.java (working copy)
          @@ -160,7 +160,8 @@
           */
          static Properties getJndiEnvironment(Map<String,String> poolKey) {
           Properties environment = new Properties();
          - final String[] jndiPrefixes = JMSEpr.getJndiPrefixes(environment.getProperty(JMSEpr.JNDI_PREFIXES)) ;
          + final String[] jndiPrefixes = JMSEpr.getJndiPrefixes((String)poolKey.get(JMSEpr.JNDI_PREFIXES)) ;
           for (String key: poolKey.keySet()) {
           for(String jndiPrefix: jndiPrefixes) {
           if (key.startsWith(jndiPrefix)) {
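
Why the one-line change above matters: in the original line, environment is a Properties object that was constructed on the previous line, so getProperty() on it cannot return anything but null, whereas the configured prefixes are carried in poolKey. A minimal sketch of the difference follows. The class and method names are hypothetical, and the constant value "jndi-prefixes" is an assumption based on the property name used in jboss-esb.xml, not something confirmed from the JMSEpr source.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class EmptyEnvironmentDemo {
    // Assumed value of JMSEpr.JNDI_PREFIXES, taken from the jboss-esb.xml property name.
    static final String JNDI_PREFIXES = "jndi-prefixes";

    /** Mirrors the original line: reads from a Properties object that was just created. */
    static String prefixesFromFreshEnvironment() {
        Properties environment = new Properties();
        return environment.getProperty(JNDI_PREFIXES); // nothing stored yet, so this is null
    }

    /** Mirrors the patched line: reads from the pool key that carries the configuration. */
    static String prefixesFromPoolKey(Map<String, String> poolKey) {
        return poolKey.get(JNDI_PREFIXES);
    }

    public static void main(String[] args) {
        Map<String, String> poolKey = new HashMap<>();
        poolKey.put(JNDI_PREFIXES, "connectionfactory.,destination.");
        System.out.println(prefixesFromFreshEnvironment());   // null
        System.out.println(prefixesFromPoolKey(poolKey));     // connectionfactory.,destination.
    }
}
```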


          --- src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java (revision 25144)
          +++ src/org/jboss/internal/soa/esb/rosetta/pooling/JmsConnectionPool.java (working copy)
          @@ -504,10 +504,27 @@
           {
           factoryConnection = jndiContext.lookup(connectionFactoryString);
           } catch (NamingException ne) {
          - logger.info("Received NamingException, refreshing context.");
          - jndiContext = NamingContextPool.replaceNamingContext(jndiContext, JmsConnectionPoolContainer.getJndiEnvironment(poolKey));
          - factoryConnection = jndiContext.lookup(connectionFactoryString);
          - }
          + try {
          + logger.info("Received NamingException, refreshing context.");
          + jndiContext = NamingContextPool.replaceNamingContext(jndiContext, JmsConnectionPoolContainer.getJndiEnvironment(poolKey));
          + factoryConnection = jndiContext.lookup(connectionFactoryString);
          + } catch (NamingException nne) {
          + /* Jeffrey Bride : 6 Feb 2009
          + * org.apache.qpid.jndi.PropertiesFileInitialContextFactory clips off 'connectionfactory' portion of jndiName
          + * determine whether connectionFactoryString includes jndi-prefix: if so, clip the prefix off as well & try again
          + */
          + final String[] jndiPrefixes = JMSEpr.getJndiPrefixes((String)poolKey.get(JMSEpr.JNDI_PREFIXES)) ;
          + for(String jndiPrefix: jndiPrefixes) {
          + if (connectionFactoryString.startsWith(jndiPrefix)) {
          + connectionFactoryString = connectionFactoryString.substring(jndiPrefix.length());
          + logger.info("attempt to lookup connectionFactory w/ jndiName = "+connectionFactoryString);
          + factoryConnection = jndiContext.lookup(connectionFactoryString);
          + break;
          + }
          + }
          +
          + }
          + }


          --- src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java (revision 25144)
          +++ src/org/jboss/soa/esb/listeners/gateway/JmsGatewayListener.java (working copy)
          @@ -420,19 +420,32 @@
           try {
           oJndiCtx = NamingContextPool.replaceNamingContext(oJndiCtx, environment);
           jmsDestination = (Destination) oJndiCtx.lookup(jmsDestinationName);
          - }
          - catch (NamingException ne) {
          - if(JMSEpr.QUEUE_TYPE.equals(destType)) {
          - jmsDestination = jmsSession.createQueue(jmsDestinationName);
          - } else {
          - jmsDestination = jmsSession.createTopic(jmsDestinationName);
          + } catch (NamingException ne) {
          + /* Jeffrey Bride : 6 Feb 2009
          + * org.apache.qpid.jndi.PropertiesFileInitialContextFactory clips off 'destination' portion of jndiName
          + * determine whether jmsDestinationName includes jndi-prefix ... if so, clip the prefix off as well & try again
          + */
          + try {
          + for(String jndiPrefix: jndiPrefixes) {
          + if (jmsDestinationName.startsWith(jndiPrefix)) {
          + jmsDestinationName = jmsDestinationName.substring(jndiPrefix.length());
          + _logger.info("attempt to lookup Destination w/ jndiName = "+jmsDestinationName);
          + jmsDestination = (Destination)oJndiCtx.lookup(jmsDestinationName);
          + break;
          + }
          + }
          + } catch (NamingException nne) {
          + if(JMSEpr.QUEUE_TYPE.equals(destType)) {
          + jmsDestination = jmsSession.createQueue(jmsDestinationName);
          + } else {
          + jmsDestination = jmsSession.createTopic(jmsDestinationName);
          + }
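
The retry idea shared by the JmsConnectionPool and JmsGatewayListener changes above (look up the full name first, then strip a configured prefix and try again) can be sketched in isolation. This is not the ESB code: PrefixFallbackLookup and its Lookup interface are hypothetical, introduced only so the example runs without a JNDI provider. Unlike the diff, this sketch rethrows the original exception when no prefix matches.

```java
import java.util.HashMap;
import java.util.Map;
import javax.naming.NameNotFoundException;
import javax.naming.NamingException;

final class PrefixFallbackLookup {
    /** Minimal lookup abstraction (hypothetical) so the sketch needs no JNDI provider. */
    interface Lookup {
        Object lookup(String name) throws NamingException;
    }

    /** Tries the full name, then the name with the first matching prefix removed. */
    static Object lookupWithFallback(Lookup context, String name, String[] jndiPrefixes)
            throws NamingException {
        try {
            return context.lookup(name);
        } catch (NamingException first) {
            for (String prefix : jndiPrefixes) {
                if (name.startsWith(prefix)) {
                    // Second attempt with the prefix clipped off.
                    return context.lookup(name.substring(prefix.length()));
                }
            }
            throw first; // no prefix matched, report the original failure
        }
    }

    public static void main(String[] args) throws NamingException {
        // Simulates a context that only knows the stripped name.
        Map<String, Object> bound = new HashMap<>();
        bound.put("Alpha_Co", "queue-object");
        Lookup context = n -> {
            Object value = bound.get(n);
            if (value == null) {
                throw new NameNotFoundException(n);
            }
            return value;
        };
        String[] prefixes = {"connectionfactory.", "destination."};
        System.out.println(lookupWithFallback(context, "destination.Alpha_Co", prefixes)); // queue-object
    }
}
```

With a real javax.naming.Context named ctx, the method reference ctx::lookup could be passed as the Lookup argument.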


          • 2. Re: JMS / AMQP  gateway for JBoss ESB
            Jeff Bride Novice

            For those who may be interested, Robert from the QPid team clarified what the jms-provider config should be, and why. In my case, it looks as follows:

            <jms-provider name="JMS" connection-factory="qpidConnectionFactory">
              <property name="jndi-prefixes" value="connectionfactory.,destination." />
              <property name="java.naming.factory.initial" value="org.apache.qpid.jndi.PropertiesFileInitialContextFactory"/>
              <property name="connectionfactory.qpidConnectionFactory" value="amqp://guest:guest@clientId/virtualHost?brokerlist='tcp://ratwater:5672'"/>
              <property name="destination.Alpha_Co" value="direct://usmc.tracks/Alpha_Co_queue?routingkey='Alpha_Co_key'"/>
              <jms-bus busid="quickstartGwChannel">
                <jms-message-filter dest-type="QUEUE" dest-name="Alpha_Co" />
              </jms-bus>
            </jms-provider>


            This configuration works with the ESB, apart from the one-line code change in JmsConnectionPoolContainer.java highlighted in yesterday's posting.

            Robert's explanation can be found here under the subject 'Java client bindings on JBoss ESB' : http://mail-archives.apache.org/mod_mbox/qpid-users/200902.mbox/browser


            jeff

            • 3. Re: JMS / AMQP  gateway for JBoss ESB
              Arnaud Simon Newbie


              Another solution would be for you to use the property java.naming.provider.url to specify the path to a qpid property file.

              <jms-provider name="JMS"
              connection-factory="connectionfactory.qpidConnectionFactory">

              <jms-bus busid="quickstartGwChannel">
              <jms-message-filter dest-type="QUEUE" dest-name="Alpha_Co" />
              </jms-bus>
              </jms-provider>

              The file /path/qpid.jndi would look like:

              java.naming.factory.initial=org.apache.qpid.jndi.PropertiesFileInitialContextFactory
              connectionfactory.qpidConnectionFactory=amqp://guest:guest@clientId/virtualHost?brokerlist='tcp://ratwater:5672'
              destination.Alpha_Co=direct://usmc.tracks/Alpha_Co_queue?routingkey='Alpha_Co_key'

              This should work with the existing ESB code, since the connection factory is defined in qpid.jndi and only its name is given in your ESB XML file.

              Arnaud




              • 4. Re: JMS / AMQP  gateway for JBoss ESB
                Jeff Bride Novice

                Hi Arnaud,
                Great suggestion, thank you!
                Do you think the issue found in JmsConnectionPoolContainer.java could be addressed, please?

                For those who may be interested: with respect to Arnaud's suggestion, the following jboss-esb.xml config snippet:

                <jms-provider name="JMS" connection-factory="qpidConnectionFactory">
                  <property name="jndi-prefixes" value="connectionfactory.,destination." />
                  <property name="java.naming.factory.initial" value="org.apache.qpid.jndi.PropertiesFileInitialContextFactory" />
                  <property name="java.naming.provider.url" value="/amqp.properties" />
                  <jms-bus busid="quickstartGwChannel">
                    <jms-message-filter dest-type="QUEUE" dest-name="Alpha_Co" />
                  </jms-bus>
                </jms-provider>



                ALMOST works when the following amqp.properties is dropped in the root of an *.esb archive :
                connectionfactory.qpidConnectionFactory=amqp://guest:guest@clientId/virtualHost?brokerlist='tcp://ratwater:5672'
                destination.Alpha_Co = direct://usmc.tracks/Alpha_Co_queue?routingkey='Alpha_Co_key'
                destination.Bravo_Co = direct://usmc.tracks/Bravo_Co_queue?routingkey='Bravo_Co_key'
                destination.Charlie_Co = direct://usmc.tracks/Charlie_Co_queue?routingkey='Charlie_Co_key'


                (The current problem is that QPid doesn't look in the JBoss classloader for the amqp.properties file.)
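
To illustrate the difference between a file-system path and a class-loader lookup, here is a stand-alone sketch that reads a properties resource through the thread context class loader and copies its entries into a JNDI-style environment table. It is not ESB or QPid code, and it is not confirmed that it would resolve the problem inside an *.esb deployment. ClasspathJndiProperties and both of its methods are hypothetical names.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Hashtable;
import java.util.Properties;

final class ClasspathJndiProperties {
    /** Reads a properties resource through the thread context class loader. */
    static Properties loadFromClasspath(String resourceName) throws IOException {
        // ClassLoader resource names are written without a leading slash.
        String name = resourceName.startsWith("/") ? resourceName.substring(1) : resourceName;
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClasspathJndiProperties.class.getClassLoader();
        }
        try (InputStream in = loader.getResourceAsStream(name)) {
            if (in == null) {
                throw new FileNotFoundException("Not found on classpath: " + name);
            }
            Properties props = new Properties();
            props.load(in);
            return props;
        }
    }

    /** Copies the loaded entries into the Hashtable form that JNDI environments use. */
    static Hashtable<String, Object> toJndiEnvironment(Properties props) {
        Hashtable<String, Object> env = new Hashtable<>();
        for (String key : props.stringPropertyNames()) {
            env.put(key, props.getProperty(key));
        }
        return env;
    }

    public static void main(String[] args) {
        try {
            Properties props = loadFromClasspath("/amqp.properties");
            System.out.println("Loaded keys: " + toJndiEnvironment(props).keySet());
        } catch (IOException e) {
            System.out.println("Could not load amqp.properties: " + e.getMessage());
        }
    }
}
```

The resulting table has the same shape as the inline connectionfactory.* and destination.* properties used earlier in this thread, so in stand-alone code it could be passed to new InitialContext(env) together with java.naming.factory.initial.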

                jeff