Version 2

    This guide builds on the article "Integration of JBoss AS 7 with ActiveMQ"; the settings for the resource adapter, connection factories and queues can be found there.

     

    For a successful integration we need:

    • JBoss AS 7.2 nightly (because of a bug in the last released version, 7.1.1, which prevents proper interoperability)
      • The server needs to be configured as described in the article mentioned above
    • An ActiveMQ broker running (externally or internally)
    • SwitchYard 0.5 installed in the JBoss AS 7
      • A good choice is to use switchyard-installer, which allows you to install SwitchYard into any JBoss AS 7 server
    • SwitchYard quickstarts, which will be created in the installer's directory after installation into JBoss AS 7
      • There is currently an issue in the cooperation of SwitchYard 0.5 and JBoss AS 7.2: the SwitchYard installer overwrites the module org.jboss.weld.core with older versions of the jars and, in particular, with a module.xml file that is corrupted. The solution is to restore the original jars and module.xml from the original JBoss AS 7.2

     

    How to configure it

    We tried the functionality with SwitchYard JCA inbound and outbound bindings; a great example for this purpose is in quickstarts/jca-outbound-hornetq

    • copy the directory quickstarts/jca-outbound-hornetq  to quickstarts/jca-outbound-activemq
    • edit file quickstarts/jca-outbound-activemq/pom.xml and change these two lines
    <artifactId>switchyard-quickstart-jca-outbound-hornetq</artifactId>
    <name>Quickstart : JCA Outbound Binding for HornetQ</name>
    

    to

    <artifactId>switchyard-quickstart-jca-outbound-activemq</artifactId>

    <name>Quickstart : JCA Outbound Binding for ActiveMQ</name>

     

    • modify the quickstarts/jca-outbound-activemq/src/main/resources/META-INF/switchyard.xml file, which describes the service, so that it looks like the listing below (modified values are in bold):

     

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
      SwitchYard descriptor for the JCA quickstart, wired to the ActiveMQ
      resource adapter (activemq-ra.rar). The application and composite names
      are the ones carried over from the jca-outbound-hornetq quickstart.
    -->
    <switchyard xmlns="urn:switchyard-config:switchyard:1.0"
                xmlns:sca="http://docs.oasis-open.org/ns/opencsa/sca/200912"
                xmlns:bean="urn:switchyard-component-bean:config:1.0"
                xmlns:jca="urn:switchyard-component-jca:config:1.0"
                name="switchyard-quickstart-jca-outbound-hornetq">

        <sca:composite name="jca-outbound-hornetq" targetNamespace="urn:switchyard-quickstart:jca-outbound-hornetq:0.1.0">

            <!-- Inbound JCA binding: listens on queue.queue_in and maps incoming
                 messages to the "process" operation of the promoted OrderService. -->
            <sca:service name="OrderService" promote="OrderComponent/OrderService">
                <jca:binding.jca>
                    <jca:inboundConnection>
                        <jca:resourceAdapter name="activemq-ra.rar"/>
                        <jca:activationSpec>
                            <jca:property name="destinationType" value="javax.jms.Queue"/>
                            <jca:property name="destination" value="queue.queue_in"/>
                        </jca:activationSpec>
                    </jca:inboundConnection>
                    <jca:inboundInteraction>
                        <jca:listener>javax.jms.MessageListener</jca:listener>
                        <jca:inboundOperation name="process" selectedOperation="process"/>
                        <jca:endpoint type="org.switchyard.component.jca.endpoint.JMSEndpoint"/>
                        <jca:transacted>true</jca:transacted>
                    </jca:inboundInteraction>
                </jca:binding.jca>
            </sca:service>

            <!-- Outbound JCA bindings: both references use the connection factory
                 java:/activemq/ConnectionFactory and the same destination,
                 java:/activemq/queue_out. -->
            <sca:reference name="ShippingReference" promote="OrderComponent/ShippingReference" multiplicity="1..1">
                <jca:binding.jca>
                    <jca:outboundConnection>
                        <jca:resourceAdapter name="activemq-ra.rar"/>
                        <jca:connection jndiName="java:/activemq/ConnectionFactory"/>
                    </jca:outboundConnection>
                    <jca:outboundInteraction>
                        <jca:processor type="org.switchyard.component.jca.processor.JMSProcessor">
                            <jca:property name="destination" value="java:/activemq/queue_out"/>
                        </jca:processor>
                    </jca:outboundInteraction>
                </jca:binding.jca>
            </sca:reference>
            <sca:reference name="FillingStockReference" promote="OrderComponent/FillingStockReference" multiplicity="1..1">
                <jca:binding.jca>
                    <jca:outboundConnection>
                        <jca:resourceAdapter name="activemq-ra.rar"/>
                        <jca:connection jndiName="java:/activemq/ConnectionFactory"/>
                    </jca:outboundConnection>
                    <jca:outboundInteraction>
                        <jca:processor type="org.switchyard.component.jca.processor.JMSProcessor">
                            <jca:property name="destination" value="java:/activemq/queue_out"/>
                        </jca:processor>
                    </jca:outboundInteraction>
                </jca:binding.jca>
            </sca:reference>

            <!-- Bean component implemented by OrderServiceBean; its service and both
                 of its references are declared with the OrderService Java interface. -->
            <sca:component name="OrderComponent">
                <bean:implementation.bean class="org.switchyard.quickstarts.jca.outbound.OrderServiceBean"/>
                <sca:service name="OrderService">
                    <sca:interface.java interface="org.switchyard.quickstarts.jca.outbound.OrderService"/>
                </sca:service>
                <sca:reference name="ShippingReference">
                    <sca:interface.java interface="org.switchyard.quickstarts.jca.outbound.OrderService"/>
                </sca:reference>
                <sca:reference name="FillingStockReference">
                    <sca:interface.java interface="org.switchyard.quickstarts.jca.outbound.OrderService"/>
                </sca:reference>
            </sca:component>

        </sca:composite>

    </switchyard>

     

    • modify the service implementation class quickstarts/jca-outbound-activemq/src/main/java/org/switchyard/quickstarts/jca/outbound/OrderServiceBean.java: add the following two lines at the beginning of the method process(). They let us know that the message was received by the service, and they change the message, so that we can tell that the message arriving on the output queue is the changed one

     

    // Log the incoming order, then append a marker so the changed message can be recognised on the output queue.
    System.out.println("Message received into the service: " + order);
    order = order + " # CHANGED";
    
    • in quickstarts/jca-outbound-activemq directory, run mvn clean install -DskipTests
    • start the fully configured server connected to the ActiveMQ broker (according to Integration of JBoss AS 7 with ActiveMQ)
    • copy quickstarts/jca-outbound-activemq/target/switchyard-quickstart-jca-outbound-activemq.jar to the JBoss AS 7 standalone/deployments directory; within a few seconds it should be successfully deployed.

     

    How to test that it is working

    The following simple class sends one message to the queue queue.queue_in and waits to receive a response from queue.queue_out. The libraries activemq-core-5.6.0.jar, geronimo-jms_1.1_spec-1.1.1.jar and geronimo-j2ee-management_1.1_spec-1.0.1.jar have to be on the classpath to run the class successfully.

     

     

    package simple;
    
    import javax.jms.BytesMessage;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;
    
    public class Client {
    
        public static void main(String[] args) {
            Connection con = null;
            try {
                ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
                con = cf.createConnection();
                con.start();
    
                Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);            
                Destination destIn = (Destination) new ActiveMQQueue("queue.queue_in");
    
                MessageProducer producer = session.createProducer(destIn);
                long time = System.currentTimeMillis();            
    
                BytesMessage message = session.createBytesMessage();
    
                message.writeBytes("BREAD".getBytes());
                producer.send(message);
                System.out.println("Message sent");
    
                System.out.println("It took " + (System.currentTimeMillis() - time) + " ms");
    
    
    
                Destination destOut = (Destination) new ActiveMQQueue("queue.queue_out");
                MessageConsumer consumer = session.createConsumer(destOut);
                BytesMessage receivedMessage = (BytesMessage) consumer.receive(20000);
    
                if (receivedMessage == null) {
                    System.out.println("Message was not received in 20 seconds");
                } else {
    
                    byte[] bytes = new byte[100];
                    receivedMessage.readBytes(bytes);
                    System.out.println("Received message: " + new String(bytes));
                }
    
                session.close();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (con != null) {
                    try {
                        con.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }     
            }
        }
    
    }
    

     

    FuseMQ integration with SwitchYard

    For FuseMQ, everything works in the same way; the only thing that has to be changed is the resource adapter (see Integration of JBoss AS 7 with ActiveMQ)