Message Inflow Example
In this example we will use JCA 1.5 message inflow to monitor a directory for xml files and then forward them to an mdb. If the transaction commits, we delete the xml file to show the use of an XAResource.
WARNING: This is not production quality code.
Overview
We need two deployments for this example.
The inbound resource adapter that has the job of monitoring the directory and forwarding any "messages" to the application
The mdb deployment configured to receive those "messages"
The Resource Adapter
The resource adapter is made up of the following pieces:
The resource adapter bean for lifecycle and endpoint activation
The activation spec to define parameters configurable on the mdb deployment
The message listener class the mdb will implement.
The resource adapter bean
Its major responsibility in this application is keeping track of endpoints,
responding to events from the application about its own and mdb deployments.
/* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.example; import java.util.Iterator; import java.util.Map; import javax.resource.ResourceException; import javax.resource.spi.ActivationSpec; import javax.resource.spi.BootstrapContext; import javax.resource.spi.ResourceAdapter; import javax.resource.spi.ResourceAdapterInternalException; import javax.resource.spi.endpoint.MessageEndpoint; import javax.resource.spi.endpoint.MessageEndpointFactory; import javax.transaction.xa.XAResource; import org.jboss.logging.Logger; import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap; /** * A FileXMLResourceAdapter. * * @author <a href="adrian@jboss.com">Adrian Brock</a> * @version $Revision: 1.3 $ */ public class FileXMLResourceAdapter implements ResourceAdapter { /** The logger */ private static final Logger log = Logger.getLogger(FileXMLResourceAdapter.class); /** The bootstrap context */ private BootstrapContext ctx; /** The activations by activation spec */ private ConcurrentReaderHashMap activations = new ConcurrentReaderHashMap(); public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec spec) throws ResourceException { FileXMLActivation activation = new FileXMLActivation(ctx.createTimer(), (FileXMLActivationSpec) spec); MessageEndpoint endpoint = endpointFactory.createEndpoint(activation); activation.setEndpoint(endpoint); activations.put(spec, activation); try { activation.start(); } catch (ResourceException e) { endpoint.release(); throw e; } } public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec spec) { FileXMLActivation activation = (FileXMLActivation) activations.remove(spec); if (activation != null) activation.stop(); } public XAResource[] getXAResources(ActivationSpec[] specs) throws ResourceException { // TODO getXAResources return null; } public void start(BootstrapContext ctx) throws ResourceAdapterInternalException { this.ctx = ctx; } public void stop() { for (Iterator i = activations.values().iterator(); i.hasNext();) { try { FileXMLActivation activation = (FileXMLActivation) i.next(); activation.stop(); } catch (Exception ignored) { log.debug("Ignored", ignored); } i.remove(); } } }
The message listener interface
The MDB will implement this class do define what processing should be
done for each message.
/* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.example; import org.w3c.dom.Document; /** * An XMLMessageListener. * * @author <a href="adrian@jboss.com">Adrian Brock</a> * @version $Revision: 1.1 $ */ public interface XMLMessageListener { void processXML(Document document) throws Exception; }
The activation specification
This javabean is configured at deployment time for the mdb to specify parameters. In this case we have two parameters:
directory - where the xml files will be placed
period - the length of time between directory scans.
/* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.example; import javax.resource.ResourceException; import javax.resource.spi.ActivationSpec; import javax.resource.spi.InvalidPropertyException; import javax.resource.spi.ResourceAdapter; /** * A FileXMLActivationSpec. * * @author <a href="adrian@jboss.com">Adrian Brock</a> * @version $Revision: 1.1 $ */ public class FileXMLActivationSpec implements ActivationSpec { private ResourceAdapter ra; private String directory; private long period = 10000; public void validate() throws InvalidPropertyException { // TODO validate } public String getDirectory() { return directory; } public void setDirectory(String directory) { this.directory = directory; } public long getPeriodValue() { return period; } public String getPeriod() { return Long.toString(period); } public void setPeriod(String period) { this.period = Long.parseLong(period); } public ResourceAdapter getResourceAdapter() { return ra; } public void setResourceAdapter(ResourceAdapter ra) throws ResourceException { this.ra = ra; } public String toString() { return "FileXMLActivationSpec for directory " + directory; } }
A helper class for implementation
This class implements the endpoint activation. It is also a bogus (the example is really a bit contrived) use of an XAResource to take part in the transaction. In this case we remove the file at transaction commit.
/* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.example; import java.lang.reflect.Method; import java.io.File; import java.util.Timer; import java.util.TimerTask; import javax.resource.ResourceException; import javax.resource.spi.endpoint.MessageEndpoint; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import org.jboss.logging.Logger; import org.w3c.dom.Document; import org.xml.sax.InputSource; /** * A FileXMLActivation. * * @author <a href="adrian@jboss.com">Adrian Brock</a> * @version $Revision: 1.1 $ */ public class FileXMLActivation extends TimerTask implements XAResource { /** The log */ private static final Logger log = Logger.getLogger(FileXMLActivation.class); /** The timer */ private Timer timer; /** The activation spec */ private FileXMLActivationSpec spec; /** The message endpoint */ private MessageEndpoint endpoint; /** The directory */ private File directory; /** The current file */ private File currentFile; /** The document builder */ DocumentBuilder builder; /** The process xml method */ private static final Method PROCESSXML; static { try { PROCESSXML = XMLMessageListener.class.getMethod("processXML", new Class[] { Document.class }); } catch (Exception e) { throw new RuntimeException(e); } } public FileXMLActivation(Timer timer, FileXMLActivationSpec spec) { this.timer = timer; this.spec = spec; } public void setEndpoint(MessageEndpoint endpoint) { this.endpoint = endpoint; } public void start() throws ResourceException { directory = new File(spec.getDirectory()); if (!directory.exists()) throw new ResourceException(directory + " does not exist"); if (!directory.isDirectory()) throw new ResourceException(directory + " is not a directory"); DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); try { builder = dbf.newDocumentBuilder(); } catch (Exception e) { throw new ResourceException(e.toString()); } timer.schedule(this, 0l, spec.getPeriodValue()); } public void stop() { cancel(); timer.cancel(); endpoint.release(); } public void run() { File[] files = directory.listFiles(); for (int i = 0; i < files.length; ++i) { if (files[i].isFile()) { currentFile = files[i]; try { Document doc = parseFile(files[i]); if (doc != null) processXML(doc); } finally { currentFile = null; } } } } protected Document parseFile(File file) { try { InputSource is = new InputSource(file.toURL().toString()); return builder.parse(is); } catch (Throwable t) { log.error("Error parsing file " + file, t); return null; } } protected void processXML(Document doc) { try { endpoint.beforeDelivery(PROCESSXML); // At this point we are in the transaction and we have the mdb's classloader try { ((XMLMessageListener) endpoint).processXML(doc); } finally { // This must be invoked if beforeDelivery was invoked endpoint.afterDelivery(); } } catch (Throwable t) { log.error("Error in message listener", t); } } // XAResource implementation (a bad implementation) public void start(Xid xid, int flags) { } public void end(Xid xid, int flags) { } public int prepare(Xid xid) { return XAResource.XA_OK; } public void rollback(Xid xid) { } public void commit(Xid xid, boolean onePhase) throws XAException { currentFile.delete(); } public void forget(Xid xid) { } public Xid[] recover(int flag) { return new Xid[0]; } public int getTransactionTimeout() { return 0; } public boolean setTransactionTimeout(int seconds) { return false; } public boolean isSameRM(XAResource xares) { return (xares == this); } }
The deployment descriptor for the rar
<?xml version="1.0" encoding="UTF-8"?> <connector xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/connector_1_5.xsd" version="1.5"> <description>File xml resource adapter</description> <display-name>Inflow of file xml Resource Adapter</display-name> <vendor-name>JBoss, Inc</vendor-name> <eis-type>JBoss Example</eis-type> <resourceadapter-version>4.0</resourceadapter-version> <license> <description> COPYRIGHT AND PERMISSION NOTICE Copyright (c) 2004 JBoss, Inc This is released under the terms of the LGPL. See gnu.org for details. </description> <license-required>true</license-required> </license> <resourceadapter> <resourceadapter-class>org.jboss.example.FileXMLResourceAdapter</resourceadapter-class> <inbound-resourceadapter> <messageadapter> <messagelistener> <messagelistener-type>org.jboss.example.XMLMessageListener</messagelistener-type> <activationspec> <activationspec-class>org.jboss.example.FileXMLActivationSpec</activationspec-class> <required-config-property> <config-property-name>directory</config-property-name> </required-config-property> </activationspec> </messagelistener> </messageadapter> </inbound-resourceadapter> </resourceadapter> </connector>
The message driven bean
This is a separate ejb deployment. You can have as many of these as you like for a given resource adapter. You just specify different activation spec
properties.
This is a simple mdb that just dumps the xml to the console.
Notice it implements our message listener not the jms message listener.
/* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.example; import java.io.ByteArrayOutputStream; import javax.ejb.EJBException; import javax.ejb.MessageDrivenBean; import javax.ejb.MessageDrivenContext; import javax.xml.transform.Transformer; import javax.xml.transform.TransformerFactory; import javax.xml.transform.dom.DOMSource; import javax.xml.transform.stream.StreamResult; import org.jboss.example.XMLMessageListener; import org.jboss.logging.Logger; import org.w3c.dom.Document; /** * Prints the xml. * * @author <a href="adrian@jboss.com">Adrian Brock</a> * @version $Revision: 1.1 $ */ public class EchoXMLMessageListener implements MessageDrivenBean, XMLMessageListener { private static final Logger log = Logger.getLogger(EchoXMLMessageListener.class); private MessageDrivenContext ctx; private Transformer transformer; public void processXML(Document document) throws Exception { DOMSource source = new DOMSource(document); ByteArrayOutputStream baos = new ByteArrayOutputStream(); StreamResult result = new StreamResult(baos); transformer.transform(source, result); log.info(baos.toString()); } public void ejbCreate() { TransformerFactory tf = TransformerFactory.newInstance(); try { transformer = tf.newTransformer(); } catch (Exception e) { throw new EJBException(e); } } public void ejbRemove() { } public void setMessageDrivenContext(MessageDrivenContext ctx) { this.ctx = ctx; } }
The mdb deployment descriptors
ejb-jar.xml - besides the normal deployment information you specify <activation-config-property> values to configure the activation spec.
<?xml version="1.0" encoding="UTF-8"?> <ejb-jar xmlns="http://java.sun.com/xml/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/ejb-jar_2_1.xsd" version="2.1"> <enterprise-beans> <message-driven> <ejb-name>EchoXMLMDB</ejb-name> <ejb-class>org.jboss.example.EchoXMLMessageListener</ejb-class> <messaging-type>org.jboss.example.XMLMessageListener</messaging-type> <activation-config> <activation-config-property> <activation-config-property-name>directory</activation-config-property-name> <activation-config-property-value>c:\testfilexml</activation-config-property-value> </activation-config-property> <activation-config-property> <activation-config-property-name>period</activation-config-property-name> <activation-config-property-value>5000</activation-config-property-value> </activation-config-property> </activation-config> <transaction-type>Container</transaction-type> </message-driven> </enterprise-beans> <assembly-descriptor> <container-transaction> <method> <ejb-name>EchoXMLMDB</ejb-name> <method-name>*</method-name> </method> <trans-attribute>Required</trans-attribute> </container-transaction> </assembly-descriptor> </ejb-jar>
jboss.xml - the only thing required in here is the identity of the resource adapter
<?xml version="1.0" encoding="UTF-8"?> <jboss> <enterprise-beans> <message-driven> <ejb-name>EchoXMLMDB</ejb-name> <destination-jndi-name>dummy</destination-jndi-name> <resource-adapter-name>filexmlrar.rar</resource-adapter-name> </message-driven> </enterprise-beans> </jboss>
Building the example
Two ant projects are attached
filexmlrar - the resource adapter
filexmlmdb - the test mdb
You need to change the location of your jboss deployment in build.properties
You also need to change the location of the directory that is being
scanned in jboss.xml (the example uses c:\testfilexml)
Obviously the rar must be deployed before the mdb
Output
Deploy the rar
14:02:41,403 INFO [RARDeployment] Required license terms exist view the META-INF/ra.xml: file:/C:/jboss-head/workspace/ build/output/jboss-4.0.0RC2/server/default/deploy/filexmlrar.rar
Deploy the mdb
14:03:02,414 INFO [EjbModule] Deploying EchoXMLMDB 14:03:03,655 INFO [EJBDeployer] Deployed: file:/C:/jboss-head/workspace/build/output/jboss-4.0.0RC2/server/default/depl oy/filexmlmdb.jar
Finally copy an xml file into the scanned directory (c:\testfilexml)
14:03:13,730 INFO [EchoXMLMessageListener] <?xml version="1.0" encoding="UTF-8"?><ejb-jar xmlns="http://java.sun.com/xm l/ns/j2ee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="2.1" xsi:schemaLocation="http://java.sun.com/x ml/ns/j2ee http://java.sun.com/xml/ns/j2ee/ejb-jar_2_1.xsd"> <enterprise-beans> <message-driven> <ejb-name>EchoXMLMDB</ejb-name> <ejb-class>org.jboss.example.EchoXMLMessageListener</ejb-class> <messaging-type>org.jboss.example.XMLMessageListener</messaging-type> <activation-config> <activation-config-property> <activation-config-property-name>directory</activation-config-property-name> <activation-config-property-value>c:\testfilexml</activation-config-property-value> </activation-config-property> <activation-config-property> <activation-config-property-name>period</activation-config-property-name> <activation-config-property-value>5000</activation-config-property-value> </activation-config-property> </activation-config> <transaction-type>Container</transaction-type> </message-driven> </enterprise-beans> <assembly-descriptor> <container-transaction> <method> <ejb-name>EchoXMLMDB</ejb-name> <method-name>*</method-name> </method> <trans-attribute>Required</trans-attribute> </container-transaction> </assembly-descriptor> </ejb-jar>
Comments