Version 11

    MRG-M is the "Messaging" implementation of Red Hat's Messaging/Realtime/Grid offering.  It is a supported and enterprise implementation of AMQP (Advanced Message Queuing Protocol).  AMQP is similar in concept to JMS in that it is used for asynchronous pub/sub messaging.  However, JMS is an API standard whereas AMQP is a wire protocol standard.

     

    MRG-M provides several client libraries for interacting with an AMQP broker such as MRG-M.  The Java version of these client libraries is actually a JMS compliant library.  Because of this, it is very easy to integrate MRG-M with SOA-P 4.3.  In fact, it is the same as configuring any JMS listener/gateway, only one needs to specify properties on the JMS listener/gateway in SOA-P that point it to the MRG-M JMS implementation.

     

    This article is an attempt to show how one can configure SOA-P to have an AMQP listener/gateway by utilizing the MRG-M Java JMS client libraries.

     

    When reading through the example below, you may wonder where the term "qpid" comes from.  That is the upstream (community supported) Apache project that feeds MRG-M.

     

    Prerequistes:

     

    1.)  MRG-M v1.1 is installed, including the Java client examples:  http://www.redhat.com/docs/en-US/Red_Hat_Enterprise_MRG/1.1/pdf/Messaging_Installation_Guide.pdf

     

    2.)  SOA-P 4.3 CP02 (or higher) is installed.  This does not work with SOA-P 4.3 releases below CP02.

     

    Summary:  By default, starting MRG-M creates a "message_queue" queue on the default message exchange.  We are going to deploy an ESB service that listens to that queue and prints out any messages that get placed on it.  We are going to use the MRG-M client examples to publish messages to the "message_queue" that will get picked up by the SOA-P 4.3 ESB service.

     

    1.)  Start MRG-M

    #>/usr/sbin/qpidd

     

    2.)  Start SOA-P 4.3 using the "default" server config

    #>$SOA-P_HOME/jboss-as/bin/run.sh -c default

     

    3.)  Copy the attached AmqpEsbGateway.esb archive to "$SOA-P_HOME/jboss-as/server/default/deploy".  We will look at the contents of this archive later.  You should see something like this printed below in the SOA-P 4.3 server console log with the esb archive gets deployed:

    23:44:00,165 INFO  [JBoss4ESBDeployer] create esb service, AmqpEsbGateway.esb
    23:44:00,226 INFO  [PropertiesFileInitialContextFactory] No Provider URL specified.
    23:44:00,238 INFO  [PropertiesFileInitialContextFactory] No Provider URL specified.
    23:44:00,263 INFO  [FailoverSingleServer] No delay between connect retries, use tcp://host:port?connectdelay='value' to enable.
    23:44:00,291 INFO  [AMQConnection] Connection:amqp://guest:********@clientId/virtualHost?brokerlist='tcp://localhost:5672'
    23:44:00,460 WARN  [ClientDelegate] Ignoring the idle timeout 0 set by the connection, using the brokers max value 120
    23:44:00,553 INFO  [PropertiesFileInitialContextFactory] No Provider URL specified.
    23:44:00,703 INFO  [AMQSession] Prefetching delayed existing messages will not flow until requested via receive*() or setML().
    23:44:00,803 INFO  [Dispatcher] Dispatcher-Channel-1 started
    23:44:00,803 INFO  [Dispatcher] Dispatcher-Channel-1 created

     

    4.)  Now we need to publish some AMQP messages to the queue that the SOA-P 4.3 ESB is listening to.  For this, we will use a sample client distributed with MRG-M.  Below is what you need to do:

    #>cd /usr/share/doc/rhm-0.5/java

    #>./runSample.sh org.apache.qpid.example.jmsexample.direct.Producer

    [output removed]

     

    5.)  Looking in the SOA-P 4.3 console log should show that we have received the messages as shown below:

    23:17:53,961 INFO  [STDOUT] print:
    23:17:53,961 INFO  [STDOUT] [Message 1].
    23:17:53,972 INFO  [STDOUT] print:
    23:17:53,973 INFO  [STDOUT] [Message 2].
    23:17:53,983 INFO  [STDOUT] print:
    23:17:53,983 INFO  [STDOUT] [Message 3].
    23:17:53,988 INFO  [STDOUT] print:
    23:17:53,988 INFO  [STDOUT] [Message 4].
    23:17:53,997 INFO  [STDOUT] print:
    23:17:53,998 INFO  [STDOUT] [Message 5].
    23:17:54,013 INFO  [STDOUT] print:
    23:17:54,013 INFO  [STDOUT] [Message 6].
    23:17:54,069 INFO  [STDOUT] print:
    23:17:54,069 INFO  [STDOUT] [Message 7].
    23:17:54,070 INFO  [STDOUT] print:
    23:17:54,070 INFO  [STDOUT] [Message 8].
    23:17:54,071 INFO  [STDOUT] print:
    23:17:54,071 INFO  [STDOUT] [Message 9].
    23:17:54,133 INFO  [STDOUT] print:
    23:17:54,133 INFO  [STDOUT] [Message 10].
    23:17:54,134 INFO  [STDOUT] print:
    23:17:54,134 INFO  [STDOUT] [That's all, folks!].

     

    Congratulations, you have now deployed a SOA-P 4.3 ESB service that listens to an AMQP queue!

     

    Now, let's take a closer look at the AmqpEsbGateway.esb archive to see what is really going on.  There's really not much in the archive as shown below:

    [apestel@localhost Desktop]$ jar tvf AmqpEsbGateway.esb
          0  Fri Oct 30 23:41:50 CDT 2009 META-INF/
         71  Fri Oct 30 23:41:50 CDT 2009 META-INF/MANIFEST.MF
          0  Fri Oct 30 23:40:42 CDT 2009 lib/

    492398  Fri Oct 30 23:10:44 CDT 2009 lib/qpid-client-M4.jar
    312957  Fri Oct 30 23:14:10 CDT 2009 lib/mina-core-1.0.1.jar
       7128  Fri Oct 30 23:12:54 CDT 2009 lib/slf4j-log4j12-1.4.0.jar
      13091  Fri Oct 30 23:11:56 CDT 2009 lib/slf4j-api-1.4.0.jar
    1192579  Fri Oct 30 23:10:44 CDT 2009 lib/qpid-common-M4.jar
       1417  Fri Oct 30 23:40:42 CDT 2009 META-INF/jboss-esb.xml
    [apestel@localhost Desktop]$

     

    Basically, there are five JAR libraries that are required to use the MRG-M JMS library.  The Client library also requires log4j and commons-collections, but these are already included with SOA-P 4.3, so don't need to be included in the .esb archive.

     

    The only other interesting file in the .esb archive is jboss-esb.xml, which is shown below:

    <?xml version="1.0"?>
    <jbossesb parameterReloadSecs="5"
    xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd">
    <providers>
      <jms-provider connection-factory="qpidConnectionFactory" name="JMS">
       <property name="jndi-prefixes" value="connectionfactory.,destination."/>
       <property name="jndi-context-factory" value="org.apache.qpid.jndi.PropertiesFileInitialContextFactory"/>
       <property name="connectionfactory.qpidConnectionFactory" value="amqp://guest:guest@clientId/virtualHost?brokerlist='tcp://localhost:5672'"/>

       <property name="destination.directQueue" value="direct://amq.direct//message_queue?routingkey='routing_key'"/>
       <jms-bus busid="amqpGateway">
        <jms-message-filter dest-name="directQueue" dest-type="QUEUE"/>
       </jms-bus>
      </jms-provider>
    </providers>
    <services>
      <service category="myCategory" description="myDescription" invmScope="GLOBAL" name="AmqpService">
       <listeners>
        <jms-listener busidref="amqpGateway" is-gateway="true" name="amqpGateway"/>
       </listeners>
       <actions mep="OneWay">
        <action class="org.jboss.soa.esb.actions.SystemPrintln" name="print">
         <property name="message" value="print"/>
        </action>
       </actions>
      </service>
    </services>
    </jbossesb>

     

    There's really not a whole lot in the jboss-esb.xml file that is interesting either.  It basically looks like a normal jboss-esb.xml file that has an ESB service listening to a JMS queue and printing the contents of the JMS messages.  In fact, that is all it is.  However, note the connection-factory and properties defined on the jms-provider.  These are what basically tell the JMS listener to use the MRG-M JMS client and specify where the AMQP queue exists.

     

    That's really all there is to it.

     

    The last thing I want to show is what is required to publish a message to AMQP via this same MRG-M client libraries - as one might want to do from a SOA-P 4.3 ESB custom action for example.  Below is example code that publishes to an AMQP queue (the same one that the SOA-P ESB is listening to above):

    package test;

    import java.util.Properties;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.ExceptionListener;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import javax.naming.InitialContext;

    public class MyProducer {
        public static void main(String[] args) throws Exception {

            new MyProducer().produce();
        }

        public void produce() throws Exception {

            String CLASS = "Producer";

            int numMessages = 10;
            short deliveryMode = DeliveryMode.NON_PERSISTENT;

            // Load JNDI properties
            Properties properties = new Properties();
            properties.setProperty("java.naming.factory.initial",
                    "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
            properties
                    .setProperty("connectionfactory.qpidConnectionfactory",
                            "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'");
            properties.setProperty("destination.directQueue",
                    "direct://amq.direct//message_queue?routingkey='routing_key'");

            // Create the initial context
            Context ctx = new InitialContext(properties);

            // look up destination
            Destination destination = (Destination) ctx.lookup("directQueue");

            // Lookup the connection factory
            ConnectionFactory conFac = (ConnectionFactory) ctx
                    .lookup("qpidConnectionfactory");
            // create the connection
            Connection connection = conFac.createConnection();

            connection.setExceptionListener(new ExceptionListener() {
                public void onException(JMSException e) {
                    e.printStackTrace();
                }
            });

            // Create a session on the connection
            // This session is a default choice of non-transacted and uses the auto
            // acknowledge feature of a session.
            System.out.println(CLASS
                    + ": Creating a non-transacted, auto-acknowledged session");
            Session session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);

            // lookup the queue
            // Queue destination = session.createQueue(_queueName);

            // Create a Message producer
            System.out.println(CLASS + ": Creating a Message Producer");
            MessageProducer messageProducer = session.createProducer(destination);

            // Create a Message
            TextMessage message;
            System.out.println(CLASS
                    + ": Creating a TestMessage to send to the destination");

            // Loop to publish the requested number of messages.
            for (int i = 1; i < numMessages + 1; i++) {
                // NOTE: We have NOT HAD TO START THE CONNECTION TO BEGIN SENDING
                // messages,
                // this is different to the consumer end as a CONSUMERS CONNECTIONS
                // MUST BE STARTED BEFORE RECEIVING.
                message = session.createTextMessage("Message " + i);
                System.out.println(CLASS + ": Sending message: " + i);
                messageProducer.send(message, deliveryMode,
                        Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
            }

            // And send a final message to indicate termination.
            message = session.createTextMessage("That's all, folks!");
            messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY,
                    Message.DEFAULT_TIME_TO_LIVE);

            // Close the connection to the broker
            System.out.println(CLASS + ": Closing connection");
            connection.close();

            // Close the JNDI reference
            System.out.println(CLASS + ": Closing JNDI context");
            ctx.close();

        }
    }

     

    Note that the code shown above is basically cut/paste from the producer example delivered with MRG-M at:

     

    /usr/share/doc/rhm-0.5/java/org/apache/qpid/example/jmsexample/direct/Producer.java