Summary
Apache ActiveMQ is a JMS 1.1-compliant, open-source messaging service. The JBoss SOA Platform (SOA-P) is an enterprise service bus (ESB) packaged with business rules and business process engines, built on the JBoss Enterprise Application Platform (EAP) 4.3. While SOA-P comes complete with JBoss Messaging (as part of the EAP), it is not restricted to listening to only JBoss Messaging JMS services. This article describes the process for creating a prototype ActiveMQ JMS queue gateway/listener for JBoss SOA-P 4.3.
Prerequisites
1) You have downloaded and installed ActiveMQ. The latest version may be found at http://activemq.apache.org/. ActiveMQ 5.3.0 was used for this prototype.
2) You have created an ActiveMQ queue destination called MYQUEUE. For instructions on creating destinations in ActiveMQ, see http://activemq.apache.org/configure-startup-destinations.html.
3) You have downloaded and installed JBoss SOA-P 4.3. This prototype was tested on SOA-P 4.3CP2.
4) This prototype was developed on Fedora 11, and may require slight modification to run on Windows or non-Linux/UNIX systems. (This is due to a file gateway configuration that is listening for the existence of a file on the /tmp directory.)
Executing the Prototype
1) Start up ActiveMQ:
{code}${amq_dir}/bin/activemq{code}
2) Start up SOA-P:
{code}${soa-p_dir}/jboss-as/bin/run.sh -c default{code}
3) Unzip the attached ActiveMQEsbGateway.esb archive and copy to the ${soa-p_dir}/jboss-as/server/default/deploy directory. You should see the following console output:
{code}15:53:50,596 INFO [JBoss4ESBDeployer] create esb service, ActiveMQDemo.esb
15:53:50,751 INFO [AbstractFileGateway] No value specified for: max-millis-for-response - This will be an 'inbound-only' gateway
15:53:51,152 INFO [FailoverTransport] Successfully connected to tcp://localhost:61616
15:53:51,273 INFO [QuartzScheduler] Quartz Scheduler v.1.5.2 created.
15:53:51,273 INFO [RAMJobStore] RAMJobStore initialized.
15:53:51,273 INFO [StdSchedulerFactory] Quartz scheduler 'ESBScheduler:ActiveMQDemo.esb' initialized from an externally provided properties instance.
15:53:51,273 INFO [StdSchedulerFactory] Quartz scheduler version: 1.5.2
15:53:51,273 INFO [QuartzScheduler] Scheduler ESBScheduler:ActiveMQDemo.esb_$_NON_CLUSTERED started.{code}
This archive inclues a JMS gateway configured for ActiveMQ, as well as a file gateway that will detect the existence of a file suffixed with the string "jmsdata". The deployed prototype configures the ESB to listen for the existence of a file named *jmsdata. Once such a file is detected, the ESB consumes the file contents, and then produces a JMS message comprised of the file contents and sends it to the queue MYQUEUE. The ESB is also configured to listen to messages queued to MYQUEUE. Once a queued message is detected, it consumes the message via the ActiveMQ JMS gateway and prints out the message contents.
4) Using your editor of choice, create a simple text file named jmsdata and add the line "Hello world" (or whatever you want to write) to the file.
5) Copy the file jmsfile to the /tmp directory.
6) You should now see the following output to the console:
{code}16:14:43,064 INFO [STDOUT] Captured message from the Println action:
16:14:43,065 INFO [STDOUT] [Hello world
].
16:14:43,065 INFO [STDOUT] Processing message: Hello world
16:14:43,082 INFO [FailoverTransport] Successfully connected to tcp://localhost:61616
16:14:43,114 INFO [STDOUT] Acknowledge receipt of ActiveMQ consumed message.:
16:14:43,115 INFO [STDOUT] [JMS Messsage: Hello world{code}
Note the final two lines which acknowledge receipt of the ActiveMQ message and the JMS message output.
Congratulations! You have successfully deployed an ActiveMQ ESB gateway to allow SOA-P to consume JMS messages via ActiveMQ.
Artifacts
1) This prototype contains one Java file, an ESB action named ActiveMQMessageAction.java:
package com.jboss.demo;
import org.jboss.soa.esb.actions.AbstractActionLifecycle;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.message.Message;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* Action to demonstrate the delivery of an ActiveMQ JMS message from a string.
*/public class ActiveMQMessageAction extends AbstractActionLifecycle {
public ActiveMQMessageAction(ConfigTree config) {}
public Message process(Message message) {
String messageStr = new String((byte[]) message.getBody().get());
System.out.println("Processing message: " + messageStr);
try {
processJMSMessage(messageStr);
} catch (Exception e) {
e.printStackTrace();
}
return message;
}
/**
* Process the message.
* @param messageStr The message string.
* @throws Exception
*/
private void processJMSMessage(String messageStr) throws JMSException, NamingException {
Context context = getContext();
QueueConnectionFactory factory =
(QueueConnectionFactory)context.lookup("ConnectionFactory");QueueConnection conn = factory.createQueueConnection();
conn.start();
Queue queue = (Queue)context.lookup("MYQUEUE");
QueueSession session =
conn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
QueueSender sender = session.createSender(queue);
TextMessage message = session.createTextMessage("JMS Messsage: " + messageStr);
sender.send(message);
sender.close();
conn.close();
}
/**
* Returns the ActiveMQ context.
* @return
* @throws NamingException
*/
private Context getContext() throws NamingException {
Context context = null;
try {
java.util.Properties properties = new java.util.Properties();
properties.setProperty("java.naming.factory.initial",
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
properties.setProperty("queue.MYQUEUE", "MYQUEUE");
context = new InitialContext(properties);
} catch (NamingException e) {
e.printStackTrace();
}
return context;
}
}
2) Below is the jboss-esb.xml file:
{code:xml}<?xml version="1.0"?>
<jbossesb parameterReloadSecs="5"
xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd">
<providers>
<fs-provider name="FilePollerProvider">
<fs-bus busid="FilePollerChannel">
<fs-message-filter directory="/tmp" input-suffix="jmsdata"/>
</fs-bus>
</fs-provider>
<jms-provider connection-factory="ConnectionFactory" name="ActiveMQProvider">
<property name="jndi-context-factory" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
<property name="queue.MYQUEUE" value="MYQUEUE"/>
<jms-bus busid="ActiveMQChannel">
<jms-message-filter dest-name="MYQUEUE" dest-type="QUEUE"/>
</jms-bus>
</jms-provider>
</providers>
<services>
<service category="File poller service"
description="Capture data from a .jmsdata file" invmScope="GLOBAL" name="FilePollerService">
<listeners>
<fs-listener busidref="FilePollerChannel" is-gateway="true" name="FileServiceListener"/>
</listeners>
<actions>
<action name="PrintlnAction">
<property name="message" value="Captured message from the Println action"/>
</action>
<action name="ActiveMQMessageAction"/>
</actions>
</service>
<service category="Service ActiveMQ Messages"
description="Service ActiveMQ Messages" invmScope="GLOBAL" name="ActiveMQService">
<listeners>
<jms-listener busidref="ActiveMQChannel" is-gateway="true" name="ActiveMQListener">
<jms-message-filter dest-name="MYQUEUE" dest-type="QUEUE"/>
</jms-listener>
</listeners>
<actions>
<action name="ActiveMQAck">
<property name="message" value="Acknowledge receipt of ActiveMQ consumed message."/>
</action>
</actions>
</service>
</services>
</jbossesb>{code}
Comments