    Summary

     

    Apache ActiveMQ is a JMS 1.1-compliant, open-source messaging service. The JBoss SOA Platform (SOA-P) is an enterprise service bus (ESB) packaged with business rules and business process engines, built on the JBoss Enterprise Application Platform (EAP) 4.3. Although SOA-P ships with JBoss Messaging (as part of the EAP), it is not restricted to listening only to JBoss Messaging JMS services. This article describes how to create a prototype ActiveMQ JMS queue gateway/listener for JBoss SOA-P 4.3.

     

    Prerequisites

     

    1) You have downloaded and installed ActiveMQ. The latest version may be found at http://activemq.apache.org/. ActiveMQ 5.3.0 was used for this prototype.

     

    2) You have created an ActiveMQ queue destination called MYQUEUE. For instructions on creating destinations in ActiveMQ, see http://activemq.apache.org/configure-startup-destinations.html.

     

    3) You have downloaded and installed JBoss SOA-P 4.3. This prototype was tested on SOA-P 4.3CP2.

     

    4) This prototype was developed on Fedora 11 and may require slight modification to run on Windows or other non-UNIX systems, because the file gateway is configured to poll the /tmp directory for an input file.

     

    Executing the Prototype

     

    1) Start up ActiveMQ:

     

    {code}${amq_dir}/bin/activemq{code}

     

     

    2) Start up SOA-P:

     

    {code}${soa-p_dir}/jboss-as/bin/run.sh -c default{code}

     

    3) Unzip the attached ActiveMQEsbGateway.esb archive and copy it to the ${soa-p_dir}/jboss-as/server/default/deploy directory. You should see the following console output:

     

    {code}15:53:50,596 INFO  [JBoss4ESBDeployer] create esb service, ActiveMQDemo.esb
    15:53:50,751 INFO  [AbstractFileGateway] No value specified for: max-millis-for-response -  This will be an 'inbound-only' gateway
    15:53:51,152 INFO  [FailoverTransport] Successfully connected to tcp://localhost:61616
    15:53:51,273 INFO  [QuartzScheduler] Quartz Scheduler v.1.5.2 created.
    15:53:51,273 INFO  [RAMJobStore] RAMJobStore initialized.
    15:53:51,273 INFO  [StdSchedulerFactory] Quartz scheduler 'ESBScheduler:ActiveMQDemo.esb' initialized from an externally provided properties instance.
    15:53:51,273 INFO  [StdSchedulerFactory] Quartz scheduler version: 1.5.2
    15:53:51,273 INFO  [QuartzScheduler] Scheduler ESBScheduler:ActiveMQDemo.esb_$_NON_CLUSTERED started.{code}

     

    This archive includes a JMS gateway configured for ActiveMQ and a file gateway that watches the /tmp directory for any file whose name ends with the string "jmsdata". When such a file appears, the ESB reads the file contents, wraps them in a JMS message, and sends that message to the queue MYQUEUE. The ESB also listens on MYQUEUE: when a message arrives, it consumes the message through the ActiveMQ JMS gateway and prints the message contents.

     

    4) Using your editor of choice, create a simple text file named jmsdata and add the line "Hello world" (or whatever you want to write) to the file.

     

    5) Copy the file jmsdata to the /tmp directory.

     

    6) You should now see the following output to the console:

     

    {code}16:14:43,064 INFO  [STDOUT] Captured message from the Println action: 
    16:14:43,065 INFO  [STDOUT] [Hello world
    ].
    16:14:43,065 INFO  [STDOUT] Processing message: Hello world
    16:14:43,082 INFO  [FailoverTransport] Successfully connected to tcp://localhost:61616
    16:14:43,114 INFO  [STDOUT] Acknowledge receipt of ActiveMQ consumed message.:
    16:14:43,115 INFO  [STDOUT] [JMS Messsage: Hello world{code}

     

    Note the final two lines, which show the acknowledgment of the consumed ActiveMQ message followed by the JMS message contents.

     

    Congratulations! You have successfully deployed an ActiveMQ ESB gateway to allow SOA-P to consume JMS messages via ActiveMQ.
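Steps 4 and 5 above can also be scripted. The following is a minimal sketch, not part of the attached prototype: the class name `DropJmsData`, the base name `hello`, and the `.part` scratch suffix are all hypothetical, and the code assumes Java 7 or later for `java.nio.file`. It writes the text under a name the file gateway should ignore and then renames the file so that its name ends with "jmsdata", which reduces the chance that the gateway picks up a half-written file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class DropJmsData {

    /** Suffix the file gateway watches for (input-suffix in jboss-esb.xml). */
    static final String SUFFIX = "jmsdata";

    /**
     * Writes the text to a scratch file, then renames it so that the final
     * name ends with SUFFIX. Returns the path of the renamed file.
     */
    static Path drop(Path watchedDir, String baseName, String text) {
        try {
            // Hypothetical scratch name; it does not end with the watched suffix.
            Path scratch = watchedDir.resolve(baseName + ".part");
            Path target = watchedDir.resolve(baseName + "." + SUFFIX);
            Files.write(scratch, text.getBytes(StandardCharsets.UTF_8));
            return Files.move(scratch, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Defaults to /tmp, the directory configured in the prototype.
        Path dir = Paths.get(args.length > 0 ? args[0] : "/tmp");
        Path dropped = drop(dir, "hello", "Hello world\n");
        System.out.println("Dropped " + dropped);
    }
}
```

Passing a directory as the first argument makes the same sketch usable on systems without /tmp, provided the directory attribute of the file gateway in jboss-esb.xml is changed to match.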

    Artifacts

     

    1) This prototype contains one Java file, an ESB action named ActiveMQMessageAction.java:

     

    {code:java}package com.jboss.demo;

    import org.jboss.soa.esb.actions.AbstractActionLifecycle;
    import org.jboss.soa.esb.helpers.ConfigTree;
    import org.jboss.soa.esb.message.Message;

    import javax.jms.JMSException;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueSender;
    import javax.jms.QueueSession;
    import javax.jms.TextMessage;

    import javax.naming.Context;
    import javax.naming.InitialContext;
    import javax.naming.NamingException;

    import java.util.Properties;

    /**
     * Action that demonstrates delivering a string to ActiveMQ as a JMS text message.
     */
    public class ActiveMQMessageAction extends AbstractActionLifecycle {

        public ActiveMQMessageAction(ConfigTree config) {}

        public Message process(Message message) {
            // The body is cast to the byte array holding the file contents.
            String messageStr = new String((byte[]) message.getBody().get());
            System.out.println("Processing message: " + messageStr);
            try {
                processJMSMessage(messageStr);
            } catch (Exception e) {
                // Prototype only: log the failure and let the pipeline continue.
                e.printStackTrace();
            }
            return message;
        }

        /**
         * Sends the message string to the queue MYQUEUE as a JMS text message.
         * @param messageStr The message string.
         * @throws JMSException if the message cannot be sent.
         * @throws NamingException if the connection factory or queue cannot be looked up.
         */
        private void processJMSMessage(String messageStr) throws JMSException, NamingException {
            Context context = getContext();
            QueueConnectionFactory factory =
                (QueueConnectionFactory) context.lookup("ConnectionFactory");
            Queue queue = (Queue) context.lookup("MYQUEUE");

            QueueConnection conn = factory.createQueueConnection();
            try {
                conn.start();
                QueueSession session =
                    conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
                QueueSender sender = session.createSender(queue);
                TextMessage message = session.createTextMessage("JMS Messsage: " + messageStr);
                sender.send(message);
            } finally {
                // Closing the connection also closes its sessions and senders.
                conn.close();
            }
        }

        /**
         * Returns the ActiveMQ JNDI context.
         * @return a context that resolves the connection factory and MYQUEUE.
         * @throws NamingException if the context cannot be created.
         */
        private Context getContext() throws NamingException {
            Properties properties = new Properties();
            properties.setProperty(Context.INITIAL_CONTEXT_FACTORY,
                    "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
            // Registers the JNDI name MYQUEUE for the physical queue MYQUEUE.
            properties.setProperty("queue.MYQUEUE", "MYQUEUE");
            return new InitialContext(properties);
        }
    }{code}
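The action's getContext() method sets no broker URL, so the ActiveMQ JNDI factory presumably falls back to its default, which is consistent with the connection to tcp://localhost:61616 in the console log. If the broker runs elsewhere, the JNDI environment can carry the URL in the standard java.naming.provider.url property. The sketch below only builds that environment; the class name `ActiveMQJndiSettings` and the method `forBroker` are hypothetical helpers, not part of the prototype or of ActiveMQ.

```java
import java.util.Properties;
import javax.naming.Context;

public class ActiveMQJndiSettings {

    /**
     * Builds a JNDI environment for the ActiveMQ initial context factory.
     * Each queue name is registered under the same JNDI name, as the
     * prototype does for MYQUEUE.
     */
    static Properties forBroker(String brokerUrl, String... queueNames) {
        Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        props.setProperty(Context.PROVIDER_URL, brokerUrl);
        for (String name : queueNames) {
            // "queue.<jndiName>" maps a JNDI name to a physical queue name.
            props.setProperty("queue." + name, name);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = forBroker("tcp://localhost:61616", "MYQUEUE");
        for (String key : props.stringPropertyNames()) {
            System.out.println(key + " = " + props.getProperty(key));
        }
    }
}
```

The returned Properties object could be passed to new InitialContext(...) in place of the hand-built properties in getContext(), assuming the ActiveMQ client JAR is on the classpath.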



    2) Below is the jboss-esb.xml file:

     

    {code:xml}<?xml version="1.0"?>
    <jbossesb parameterReloadSecs="5"
    xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd">
    <providers>
      <fs-provider name="FilePollerProvider">
       <fs-bus busid="FilePollerChannel">
        <fs-message-filter directory="/tmp" input-suffix="jmsdata"/>
       </fs-bus>
      </fs-provider>
      <jms-provider connection-factory="ConnectionFactory" name="ActiveMQProvider">
       <property name="jndi-context-factory" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
       <property name="queue.MYQUEUE" value="MYQUEUE"/>
       <jms-bus busid="ActiveMQChannel">
        <jms-message-filter dest-name="MYQUEUE" dest-type="QUEUE"/>
       </jms-bus>
      </jms-provider>
    </providers>
    <services>
      <service category="File poller service"
       description="Capture data from a .jmsdata file" invmScope="GLOBAL" name="FilePollerService">
       <listeners>
        <fs-listener busidref="FilePollerChannel" is-gateway="true" name="FileServiceListener"/>
       </listeners>
       <actions>
        <action class="org.jboss.soa.esb.actions.SystemPrintln" name="PrintlnAction">
         <property name="message" value="Captured message from the Println action"/>
        </action>
        <action class="com.jboss.demo.ActiveMQMessageAction" name="ActiveMQMessageAction"/>
       </actions>
      </service>
      <service category="Service ActiveMQ Messages"
       description="Service ActiveMQ Messages" invmScope="GLOBAL" name="ActiveMQService">
       <listeners>
        <jms-listener busidref="ActiveMQChannel" is-gateway="true" name="ActiveMQListener">
         <jms-message-filter dest-name="MYQUEUE" dest-type="QUEUE"/>
        </jms-listener>
       </listeners>
       <actions>
        <action class="org.jboss.soa.esb.actions.SystemPrintln" name="ActiveMQAck">
         <property name="message" value="Acknowledge receipt of ActiveMQ consumed message."/>
        </action>
       </actions>
      </service>
    </services>
    </jbossesb>{code}
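The queue name MYQUEUE appears in several places in this configuration (the provider's message filter and the listener's message filter) as well as in the Java action, and a mismatch would leave the listener waiting on a queue that never receives anything. A rough way to check the XML side is sketched below, using only the JDK's DOM parser. The class name `QueueNameCheck` is hypothetical, and the inline sample is a trimmed stand-in for the real jboss-esb.xml rather than a valid ESB configuration.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.TreeSet;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class QueueNameCheck {

    /** Collects the distinct dest-name values of all jms-message-filter elements. */
    static Set<String> destNames(String esbXml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(esbXml.getBytes(StandardCharsets.UTF_8)));
            NodeList filters = doc.getElementsByTagName("jms-message-filter");
            Set<String> names = new TreeSet<String>();
            for (int i = 0; i < filters.getLength(); i++) {
                names.add(((Element) filters.item(i)).getAttribute("dest-name"));
            }
            return names;
        } catch (Exception e) {
            throw new IllegalStateException("Could not parse ESB configuration", e);
        }
    }

    public static void main(String[] args) {
        // Trimmed stand-in for jboss-esb.xml: one provider filter, one listener filter.
        String sample =
              "<jbossesb>"
            + "<providers><jms-provider name=\"p\"><jms-bus busid=\"b\">"
            + "<jms-message-filter dest-name=\"MYQUEUE\" dest-type=\"QUEUE\"/>"
            + "</jms-bus></jms-provider></providers>"
            + "<services><service name=\"s\"><listeners><jms-listener name=\"l\">"
            + "<jms-message-filter dest-name=\"MYQUEUE\" dest-type=\"QUEUE\"/>"
            + "</jms-listener></listeners></service></services>"
            + "</jbossesb>";
        // More than one name in the set would indicate a mismatch.
        System.out.println(destNames(sample)); // prints [MYQUEUE]
    }
}
```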