Writing Custom Listeners for JBoss ESB 4.x

Version 21

    Overview

    This document outlines 3 methods of constructing custom listeners for JBoss ESB v4.x:

     

    1. AbstractThreadedManagedLifecycle/AbstractManagedLifecycle Listener: A Listener that uses the base Listener API to create a Threaded/Non-Threaded listener, executing based on the managed lifecycle.
    2. Schedule Driven Listener:  A listener that uses the ScheduledEventListener to "wake up" based on a configured schedule and generate a message for the Service Action Pipeline.
    3. Groovy Scripted Event Driven Listener: A listener that "wakes up" based on an event triggered by an external process (e.g. a message received on a JMS Queue) and generates a message for the Service Action Pipeline.

    AbstractThreadedManagedLifecycle/AbstractManagedLifecycle Listener

    All Listeners in JBoss ESB are implemented using either the AbstractThreadedManagedLifecycle or AbstractManagedLifecycle classes.  Extending one of these classes is trivial enough:

    public class MyCustomGateway extends AbstractThreadedManagedLifecycle {
    
        private ConfigTree listenerConfig;
        private Service service;
        private ServiceInvoker serviceInvoker;
    
        public MyCustomGateway(final ConfigTree config) throws ConfigurationException {
            super(config);
            this.listenerConfig = config;
    
            String serviceCategory = listenerConfig.getRequiredAttribute(ListenerTagNames.TARGET_SERVICE_CATEGORY_TAG);
            String serviceName = listenerConfig.getRequiredAttribute(ListenerTagNames.TARGET_SERVICE_NAME_TAG);
    
            service = new Service(serviceCategory, serviceName);
        }
    
        protected void doInitialise() throws ManagedLifecycleException {
            // Create the ServiceInvoker instance for the target service....
            try {
                serviceInvoker = new ServiceInvoker(service);
            } catch (MessageDeliverException e) {
                // Pass the original exception along as the cause so the stack trace is not lost...
                throw new ManagedLifecycleException("Failed to create ServiceInvoker for Service '" + service + "'.", e);
            }
        }
    
        protected void doRun() {
            while(isRunning()) {
                // Wait for a message....
                Object payloadObject = waitForPayload();
    
                // Send the message to the target service's Action Pipeline via
                // the ServiceInvoker...
                try {
                    Message esbMessage = MessageFactory.getInstance().getMessage();
    
                    esbMessage.getBody().add(payloadObject);
                    serviceInvoker.deliverAsync(esbMessage);
                } catch (MessageDeliverException e) {
                    e.printStackTrace();
                }
            }
        }
    
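        private Object waitForPayload() {
            // TODO: Wait for a message from the external source and return its payload.
            // Placeholder return so that the class compiles...
            return null;
        }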
    }
    

    This Gateway extends the AbstractThreadedManagedLifecycle class and implements the doRun method (the Thread method). Note how the doRun method checks the running state of the listener on each iteration of its run loop.

     

    Your listener should extend AbstractManagedLifecycle if you don't require a threaded listener.

     

    To configure a custom gateway such as this, you need to use the base configuration types <bus-provider>, <bus> and <listener>, as follows:

    <?xml version = "1.0" encoding = "UTF-8"?>
    <jbossesb xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd"
              parameterReloadSecs="5">
    
        <providers>
            <bus-provider name="CustomProvider">
                <property name="provider-property" value="provider-prop-value" />
    
                <bus busid="custom-bus">
                    <property name="bus-property" value="bus-prop-value" />
                </bus>
            </bus-provider>
        </providers>
    
        <services>
            <service category="Custom" name="Listener" description="" invmScope="GLOBAL">
                <listeners>
                    <listener name="custom-listener" busidref="custom-bus" is-gateway="true">
                        <property name="gatewayClass" value="com.acme.listeners.MyCustomGateway" />
                        <property name="listener-property" value="listener-prop-value" />
                    </listener>
                </listeners>
                <actions mep="OneWay">
                    ...
                </actions>
            </service>
        </services>
    
    </jbossesb>
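
    The doRun() loop in MyCustomGateway above re-checks isRunning() on every iteration.  One consequence: if waitForPayload() blocks indefinitely, the loop never returns to that check and the listener cannot stop cleanly.  The following is a minimal, ESB-free sketch of one way to avoid that, assuming payloads arrive on an in-memory queue.  The PollingLoopSketch class, its queue and its "running" flag are hypothetical stand-ins for the gateway and for isRunning(); none of them are part of the JBoss ESB API.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a threaded gateway; no JBoss ESB classes are used.
public class PollingLoopSketch implements Runnable {

    private final BlockingQueue<Object> source = new LinkedBlockingQueue<>();
    private final List<Object> delivered = new CopyOnWriteArrayList<>();
    private volatile boolean running = true; // plays the role of isRunning()

    public void offer(Object payload) { source.add(payload); }
    public void stop() { running = false; }
    public List<Object> getDelivered() { return delivered; }

    // Same shape as doRun(): re-check the running state on every iteration.
    @Override
    public void run() {
        while (running) {
            Object payload = waitForPayload();
            if (payload != null) {
                // A real gateway would build an ESB message and deliver it here.
                delivered.add(payload);
            }
        }
    }

    // Wait at most 100ms so the loop can notice a stop request promptly.
    private Object waitForPayload() {
        try {
            return source.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            running = false; // treat interruption as a request to stop
            return null;
        }
    }
}
```

    The timeout value is arbitrary.  A shorter timeout makes the listener stop sooner, at the cost of more wake-ups while idle.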
    

    Schedule Driven Listener

    <scheduled-listener> can be used to perform scheduled tasks based on <simple-schedule> or <cron-schedule> provider configurations.


    <scheduled-listener> is configured with an “event-processor” class, which can be an implementation of one of the following interfaces:

    • ScheduledEventListener: Event Processors that implement this interface are simply triggered through the “onSchedule” method.  No action processing pipeline is executed.
    • ScheduledEventMessageComposer:  Event Processors that implement this interface are capable of “composing” a message for the action processing pipeline associated with the listener.

     

    The attributes of this listener are:

    1. “name”:  The name of the listener instance.
    2. “event-processor”:  The event processor class that's called on every schedule trigger.  See above for implementation details.
    3. One of:
      • “scheduleidref”:  The scheduleid of the schedule to use for triggering this listener (configured in the providers).
      • “schedule-frequency”:  Schedule frequency (in seconds).  A convenient way of specifying a simple schedule directly on the listener.

    Example

    In this example, let's suppose you need to process "order" files and the File Listener component that ships with JBoss ESB doesn't quite meet your requirements.  In this situation, you can write your own custom file listener by implementing the ScheduledEventMessageComposer interface:

    public class OrderFileListener implements ScheduledEventMessageComposer {
    
        public void initialize(ConfigTree config) throws ConfigurationException {
            // TODO: Initialise File filters etc, using the config to access properties configured on the listener...
        }
    
        public Message composeMessage() throws SchedulingException {
            Message message = MessageFactory.getInstance().getMessage();
    
            // TODO: Read one or more order files and populate the data into the ESB message for pipeline processing...
    
            return message;
        }
    
        public Message onProcessingComplete(Message message) throws SchedulingException {
            // TODO: Post pipeline processing...
            return message;
        }
    
        public void uninitialize() {
            // TODO: Any relevant cleanup tasks...
        }
    }
    

    And then, to configure this into an ESB Service:

    <?xml version = "1.0" encoding = "UTF-8"?>
    <jbossesb xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd">
    
        <providers>
            <schedule-provider name="schedules">
                <simple-schedule scheduleid="ordersPole" frequency="5" frequencyUnits="seconds" />
            </schedule-provider>
        </providers>
    
        <services>
            <service category="OrderManagement" name="OrderProcessing" description="Order Processing Service">
    
                <listeners>
                    <scheduled-listener name="orderFileListener" scheduleidref="ordersPole" event-processor="com.acme.OrderFileListener"/>
                </listeners>
    
                <actions>
                    <action name="action1" class="..."/>
                    <action name="action2" class="..."/>
                    <action name="action3" class="..."/>
                </actions>
            </service>
        </services>
    
    </jbossesb>
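
    The composeMessage() TODO in OrderFileListener above is where the order files would be read.  As a rough sketch of that step, using only the JDK, the helper below collects the contents of all pending order files in a directory.  The OrderFileReader class name, the "*.order" naming convention and the choice of UTF-8 are assumptions made for illustration; they do not come from JBoss ESB.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; the class name and the "*.order" convention are assumptions.
public class OrderFileReader {

    private final Path inputDir;

    public OrderFileReader(Path inputDir) {
        this.inputDir = inputDir;
    }

    // Returns the contents of every pending order file, in file name order.
    public List<String> readPendingOrders() {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(inputDir, "*.order")) {
            for (Path file : stream) {
                files.add(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(files); // directory iteration order is unspecified

        List<String> orders = new ArrayList<>();
        for (Path file : files) {
            try {
                orders.add(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return orders;
    }
}
```

    Inside composeMessage(), the returned list could then be placed on the ESB message with message.getBody().add(...), and a read failure could be translated into the SchedulingException that the method declares.  Removing or archiving the processed files would fit naturally in onProcessingComplete().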
    

    Groovy Scripted Event Driven Listener

    One of the easier ways of implementing an Event Driven Listener in JBoss ESB is by hooking in the listener via a Groovy script and the <groovy-listener> configuration.

     

    The <groovy-listener> is very easy to configure.  It takes the following configuration attributes:

    1. “name”:  The name of the listener instance.
    2. “script”:  The path (on the classpath) to the Groovy Script.

     

    The Groovy script becomes the effective "gateway".

     

    The script has access to the following script variable bindings:

    1. "config": The listener configuration (type ConfigTree), i.e. the nested <property> element values.  This is also required for construction of the Action Pipeline.
    2. "gateway": A reference to the underlying GroovyGateway listener (Java).  This provides access to the listener lifecycle.  More on the lifecycle later.

    Example

    Suppose the JMS Listener implementation provided with JBoss ESB does not meet your requirements (as with the File Listener in the Scheduling example above).  You could hook in your own custom JMS Listener via a Groovy script and the <groovy-listener>.

     

    Taking a twist on the "Order" example used in the Schedule Listener example (above), we could start by implementing a javax.jms.MessageListener:

    public class OrderListener implements MessageListener {
        
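        // The JMS and ESB Message types share a simple name, so both are fully qualified here...
        public void onMessage(final javax.jms.Message message) {
            if(message instanceof ObjectMessage) {
                try {
                    // getObject() returns a Serializable, so cast it to the Order type...
                    Order order = (Order) ((ObjectMessage) message).getObject();

                    // Create and populate an ESB message with the order....
                    org.jboss.soa.esb.message.Message esbMessage = MessageFactory.getInstance().getMessage();
                    esbMessage.getBody().add(order);

                    // TODO: Add code to forward the ESB message to the Action Pipeline...
                } catch (JMSException e) {
                    // TODO: Handle error....
                }
            }
        }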
    }
    

    Now we need to add the hooks for making this work in the ESB.  We need to create "start" and "stop" methods for managing the listener's lifecycle, creating the Action Pipeline instance (and other resources):

    public class OrderListener implements MessageListener {
    
        private ActionProcessingPipeline pipeline;
    
        public void start(ConfigTree config) throws ConfigurationException {
            // Create and initialize the pipeline..
            pipeline = new ActionProcessingPipeline(config);
            pipeline.initialise();
    
            // TODO: Add JMS code for connecting this JMS MessageListener to the JMS Queue... 
        }
    
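        // The JMS and ESB Message types share a simple name, so both are fully qualified here...
        public void onMessage(final javax.jms.Message message) {
            if(message instanceof ObjectMessage) {
                try {
                    // getObject() returns a Serializable, so cast it to the Order type...
                    Order order = (Order) ((ObjectMessage) message).getObject();

                    // Create and populate an ESB message with the order....
                    org.jboss.soa.esb.message.Message esbMessage = MessageFactory.getInstance().getMessage();
                    esbMessage.getBody().add(order);

                    // Forward the ESB message (not the JMS message) to the Action Pipeline...
                    boolean success = pipeline.process(esbMessage);
                    if(!success) {
                        // TODO: Handle error....
                    }
                } catch (JMSException e) {
                    // TODO: Handle error....
                }
            }
        }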
    
        public void stop() {
            try {
                // TODO: Add JMS code for disconnecting from JMS Queue....
            } finally {
                if(pipeline != null) {
                    pipeline.destroy() ;
                }            
            }
        }
    }
    

    Note: You could also use the ServiceInvoker in place of executing the Action Pipeline directly via the ActionProcessingPipeline class, just as with the AbstractThreadedManagedLifecycle example earlier in this document.  This would require a message-aware listener to be installed on the service (an InVM listener would be fine) and would also mean that the pipeline is executed asynchronously from the implemented listener's thread of execution.

     

    Now we need to implement the Groovy Script that will hook the OrderListener into the ESB, as well as manage its lifecycle (starting and stopping):

    import com.acme.OrderListener;
    
    OrderListener orderListener = new OrderListener();
    
    // Start the listener (passing the config)...
    orderListener.start(config);
    
    // Wait until the Groovy Gateway is signaled to stop...
    def stopped = false;
    while(!stopped) {
        stopped = gateway.waitUntilStopping(200);
    }
    
    // Now stop the listener...
    orderListener.stop();
    

    And finally, to configure the Groovy script into your ESB Service:

    <?xml version = "1.0" encoding = "UTF-8"?>
    <jbossesb xmlns="http://anonsvn.labs.jboss.com/labs/jbossesb/trunk/product/etc/schemas/xml/jbossesb-1.0.1.xsd">
    
        <services>
            <service category="OrderManagement" name="OrderProcessing" description="Order Processing Service">
    
                <listeners>
                    <groovy-listener name="orderJmsListener" script="/com/acme/OrderListener.groovy">
                        <property name="queueName" value="..."/>
                    </groovy-listener>
                </listeners>
    
                <actions>
                    <action name="action1" class="..."/>
                    <action name="action2" class="..."/>
                    <action name="action3" class="..."/>
                </actions>
            </service>
        </services>
    
    </jbossesb>
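
    The wait loop in the Groovy script above relies on a timed wait that reports whether a stop has been requested.  The sketch below shows that general pattern in plain Java, using a CountDownLatch.  StopSignal is a hypothetical stand-in for the "gateway" binding, written only to illustrate the idea; it is not how GroovyGateway is implemented.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the "gateway" binding; not the GroovyGateway implementation.
public class StopSignal {

    private final CountDownLatch stopping = new CountDownLatch(1);

    // Called by whoever manages the lifecycle when the listener should shut down.
    public void signalStop() {
        stopping.countDown();
    }

    // Waits up to the given number of milliseconds; returns true once stop has been signaled.
    public boolean waitUntilStopping(long millis) {
        try {
            return stopping.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return true; // treat interruption as a request to stop
        }
    }
}
```

    With a signal like this, the script's thread sleeps for most of each interval instead of spinning, yet still reacts to a stop request within roughly one timeout period.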
    

    ESB Component Lifecycle Management

    All ESB components are managed via a well-defined lifecycle.

     

    Managed Lifecycle

    managedlifecycle.png

    TODO: Explain...

    Managed Lifecycle Thread

    managedlifecyclethread.png

    TODO: Explain...