/* * JBoss, Home of Professional Open Source * Copyright 2006, JBoss Inc., and individual contributors as indicated * by the @authors tag. See the copyright.txt in the distribution for a * full listing of individual contributors. * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this software; if not, write to the Free * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ /** * Routes the Message argument to a fixed list of services ([category,name]) * @author schifest@heuristica.com.ar * @since Version 4.0 */ package org.jboss.soa.esb.actions; import java.net.URISyntaxException; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.log4j.Logger; import org.jboss.soa.esb.ConfigurationException; import org.jboss.soa.esb.addressing.Call; import org.jboss.soa.esb.addressing.EPR; import org.jboss.soa.esb.addressing.MalformedEPRException; import org.jboss.soa.esb.addressing.eprs.LogicalEPR; import org.jboss.soa.esb.client.ServiceInvoker; import org.jboss.soa.esb.couriers.CourierException; import org.jboss.soa.esb.couriers.CourierFactory; import org.jboss.soa.esb.couriers.TwoWayCourier; import org.jboss.soa.esb.helpers.ConfigTree; import org.jboss.soa.esb.listeners.message.MessageDeliverException; import org.jboss.soa.esb.message.Message; import 
org.jboss.soa.esb.message.format.MessageFactory;
import org.jboss.soa.esb.services.registry.RegistryException;

/**
 * Simple transport agnostic event handler. Endpoints can subscribe to be
 * informed on the arrival of a specific message/event. The messages are not
 * stored durably. If the message is suitably instrumented (the
 * {@link #MESSAGE_IS_EVENT} body attribute is "true") then it will be sent to
 * all registered event handlers. Otherwise, a new message will be sent that
 * contains just the event name and a copy of the original message header.
 *
 * Subscriptions are keyed by event name; endpoints subscribed to
 * {@link #WILD_CARD} are triggered for every event.
 *
 * TODO maybe make it persistent (or at least certain topics).
 */
public class EventManager extends AbstractActionPipelineProcessor {
    /** Opcode value: register the sender for one or more events. */
    public static final String SUBSCRIBE_MESSAGE = "SUBSCRIBE";

    /** Opcode value: remove the sender's registration for one or more events. */
    public static final String UNSUBSCRIBE_MESSAGE = "UNSUBSCRIBE";

    /** Opcode value: the message announces an event to be forwarded. */
    public static final String EVENT_MESSAGE = "EVENT";

    /** Event name whose subscribers are triggered for every event. */
    public static final String WILD_CARD = "*";

    /** Body attribute holding the opcode of a control/event message. */
    public static final String OPCODE = "org.jboss.soa.esb.actions.event.opcode";

    /**
     * Body attribute holding the event name. Subscribe/unsubscribe messages
     * may carry several attributes whose names start with this prefix.
     */
    public static final String EVENT_TYPE = "org.jboss.soa.esb.actions.event.type";

    /** Body attribute: "true" means forward the triggering message itself. */
    public static final String MESSAGE_IS_EVENT = "org.jboss.soa.esb.actions.event.message";

    private static final Logger logger = Logger.getLogger(EventManager.class);

    /**
     * Event name -> subscribers. Compound updates (lookup-then-create,
     * remove-when-empty) are performed while holding this map's monitor.
     */
    private final ConcurrentHashMap<String, EventMap> _events;

    public EventManager() {
        _events = new ConcurrentHashMap<String, EventMap>();
    }

    /**
     * No real configuration options at this stage. Maybe consider sending
     * triggers periodically rather than as soon as they occur? Maybe add a
     * handler that gets called whenever a failure occurs on deliver?
     *
     * @param config
     *            the action configuration (currently unused).
     * @throws ConfigurationException
     * @throws RegistryException
     */
    public EventManager(ConfigTree config) throws ConfigurationException, RegistryException {
        _events = new ConcurrentHashMap<String, EventMap>();
    }

    /**
     * Initialise the action instance.
     * <p>
     * This method is called after the action instance has been instantiated
     * so that configuration options can be validated. Nothing to do here.
     *
     * @throws ActionLifecycleException
     *             for errors during initialisation.
     */
    public void initialise() throws ActionLifecycleException {
    }

    /**
     * Destroy the action instance.
     * <p>
     * This method is called prior to the release of the action instance. All
     * resources associated with this action instance should be released as
     * the instance will no longer be used. Nothing to do here.
     */
    public void destroy() throws ActionLifecycleException {
    }

    /**
     * Processes an incoming message. If the message is a control type
     * (subscribe or unsubscribe) then deal with it as such.
     *
     * If the message is anything other than that then we check to see if
     * it's an event message. If it is, the action iterates over all of the
     * registered endpoints for that event and sends the message on.
     *
     * A message with no opcode, or with an opcode that is not one of the
     * known values, is ignored: maybe it's for another action in the chain.
     *
     * @param message
     *            the message to inspect.
     * @return the original message
     * @throws ActionProcessingException
     *             if a control or event message is malformed.
     */
    public Message process(Message message) throws ActionProcessingException {
        // Compare as Object: String.equals() is null-safe and type-safe, so a
        // missing or non-String opcode simply matches nothing.
        final Object opcode = message.getBody().get(OPCODE);

        if (SUBSCRIBE_MESSAGE.equals(opcode)) {
            subscribe(message);
        } else if (UNSUBSCRIBE_MESSAGE.equals(opcode)) {
            unsubscribe(message);
        } else if (EVENT_MESSAGE.equals(opcode)) {
            trigger(message);
        }

        return message;
    }

    /**
     * Registers the sender of the message for every event named in the
     * message body.
     *
     * @param message
     *            the subscribe message.
     * @throws ActionProcessingFaultException
     *             if the message has neither a From nor a ReplyTo EPR.
     */
    private void subscribe(final Message message) throws ActionProcessingFaultException {
        final EPR toEpr = subscriberEpr(message, "subscribe");

        /*
         * Each event is placed in the body under an attribute whose name
         * starts with EVENT_TYPE; the attribute value is the event name.
         * Duplicates are allowed.
         */
        for (String attribute : message.getBody().getNames()) {
            if (!attribute.startsWith(EVENT_TYPE)) {
                continue;
            }

            final String eventName = eventNameOf(message, attribute);
            if (eventName == null) {
                continue;
            }

            /*
             * Create-if-absent and the add happen under one lock so that a
             * concurrent unsubscribe cannot discard the (still empty) map
             * between its creation and the registration of the endpoint.
             */
            synchronized (_events) {
                EventMap theMap = _events.get(eventName);
                if (theMap == null) {
                    theMap = new EventMap(eventName);
                    _events.put(eventName, theMap);
                }
                theMap.addEPR(toEpr);
            }
        }
    }

    /**
     * Removes the sender of the message from every event named in the
     * message body.
     *
     * @param message
     *            the unsubscribe message.
     * @throws ActionProcessingFaultException
     *             if the message has neither a From nor a ReplyTo EPR.
     */
    private void unsubscribe(final Message message) throws ActionProcessingFaultException {
        final EPR toEpr = subscriberEpr(message, "unsubscribe");

        for (String attribute : message.getBody().getNames()) {
            if (!attribute.startsWith(EVENT_TYPE)) {
                continue;
            }

            // Subscriptions are keyed by the attribute VALUE (the event
            // name), exactly as in subscribe(), not by the attribute name.
            final String eventName = eventNameOf(message, attribute);
            if (eventName == null) {
                continue;
            }

            /*
             * Since messages may be lost, assume unsubscribe from an empty
             * event list is valid and refers to something that no longer
             * exists.
             */
            synchronized (_events) {
                final EventMap theMap = _events.get(eventName);
                if (theMap != null) {
                    theMap.removeEPR(toEpr);
                    if (theMap.size() == 0) {
                        _events.remove(eventName);
                    }
                }
            }
        }
    }

    /**
     * Resolves the endpoint a (un)subscribe message refers to: the From EPR
     * if present, otherwise the ReplyTo EPR.
     *
     * @param message
     *            the control message.
     * @param operation
     *            "subscribe" or "unsubscribe"; used in the log and fault text.
     * @return the endpoint, never null.
     * @throws ActionProcessingFaultException
     *             if neither EPR is set.
     */
    private EPR subscriberEpr(final Message message, final String operation)
            throws ActionProcessingFaultException {
        final Call call = message.getHeader().getCall();
        EPR epr = call.getFrom();

        if (epr == null) {
            epr = call.getReplyTo();
        }

        if (epr == null) {
            logger.error("Event." + operation + " - no From or ReplyTo EPR specified!");
            throw new ActionProcessingFaultException(
                    "No From or ReplyTo EPR specified for " + operation + "!");
        }

        return epr;
    }

    /**
     * Reads the event name stored under the given body attribute.
     *
     * @param message
     *            the control message.
     * @param attribute
     *            name of the body attribute to read.
     * @return the event name, or null (after logging a warning) when the
     *         value is missing or not a String; such a value cannot be used
     *         as a key of the subscription map.
     */
    private String eventNameOf(final Message message, final String attribute) {
        final Object value = message.getBody().get(attribute);

        if (value instanceof String) {
            return (String) value;
        }

        logger.warn("Ignoring attribute " + attribute + " - value is not an event name: " + value);
        return null;
    }

    /**
     * Sends the event carried by the message to the subscribers of that
     * event and to the wildcard subscribers. Does nothing when there are no
     * subscribers of either kind.
     *
     * @param message
     *            the event message.
     * @throws ActionProcessingFaultException
     *             if the message carries no event name.
     */
    private void trigger(final Message message) throws ActionProcessingFaultException {
        final Object eventName = message.getBody().get(EVENT_TYPE);

        if (!(eventName instanceof String) || "".equals(eventName)) {
            throw new ActionProcessingFaultException("No event name specified!");
        }

        final EventMap handler = _events.get((String) eventName);
        final EventMap all = _events.get(WILD_CARD);

        if ((handler == null) && (all == null)) {
            return;
        }

        /*
         * Now see if we want to forward the message as-is, or create a new
         * one. String.valueOf() turns a missing attribute into "null", which
         * does not match, and lets a Boolean flag work as well as a String.
         */
        final Object content = message.getBody().get(MESSAGE_IS_EVENT);
        final Message event = "true".equalsIgnoreCase(String.valueOf(content))
                ? message
                : createMessage(message);

        if (handler != null) {
            handler.trigger(event);
        }
        if (all != null) {
            all.trigger(event);
        }
    }

    /**
     * Builds the notification sent when the triggering message itself is
     * not forwarded: a new message of the same type carrying only the event
     * name and a copy of the original call (routing) information.
     *
     * @param message
     *            the triggering message.
     * @return the new notification message.
     * @throws ActionProcessingFaultException
     *             if the call information cannot be copied.
     */
    private Message createMessage(final Message message) throws ActionProcessingFaultException {
        // make sure it's the same type!
        final Message msg = MessageFactory.getInstance().getMessage(message.getType());

        msg.getBody().add(EVENT_TYPE, message.getBody().get(EVENT_TYPE));

        final Call call;
        try {
            call = new Call(message.getHeader().getCall());
        } catch (URISyntaxException ex) {
            // Log the cause here: the fault thrown below carries only text.
            logger.error("Could not create header routing information for event message!", ex);
            throw new ActionProcessingFaultException(
                    "Could not create header routing information for event message!");
        }

        msg.getHeader().setCall(call);

        return msg;
    }

    /**
     * The endpoints subscribed to one named event.
     */
    class EventMap {
        private final String _name;

        private final ConcurrentLinkedQueue<EPR> _endpoints;

        public EventMap(String name) {
            _name = name;
            _endpoints = new ConcurrentLinkedQueue<EPR>();
        }

        /**
         * Send the trigger message to the list of registered recipients. This
         * is NOT an atomic operation: any failures will be logged, but the
         * trigger will continue.
         *
         * Note that the To address of the message is overwritten for each
         * recipient in turn.
         *
         * @param msg
         *            The message to send.
         */
        public void trigger(final Message msg) {
            for (EPR to : _endpoints) {
                TwoWayCourier courier = null;
                ServiceInvoker service = null;

                try {
                    /*
                     * Use ServiceInvoker if the EPR is a LogicalEPR.
                     */
                    if (to instanceof LogicalEPR) {
                        service = ((LogicalEPR) to).getServiceInvoker();
                    } else {
                        courier = CourierFactory.getInstance().getMessageCourier(to);
                    }
                } catch (MessageDeliverException ex) {
                    logger.warn("Problem with LogicalEPR", ex);
                } catch (CourierException ex) {
                    logger.warn("Courier exception", ex);
                } catch (MalformedEPRException ex) {
                    logger.warn("getCourier problem", ex);
                }

                if ((courier == null) && (service == null)) {
                    logger.warn("Could not get Courier or ServiceInvoker for " + to);
                    continue;
                }

                msg.getHeader().getCall().setTo(to);

                try {
                    if (courier != null) {
                        try {
                            courier.deliver(msg);
                        } finally {
                            // Release the courier even when deliver() fails,
                            // otherwise every failed delivery leaks one.
                            CourierFactory.deregisterCourier(courier);
                        }
                    } else {
                        service.deliverAsync(msg);
                    }
                } catch (MessageDeliverException ex) {
                    logger.warn("Problem delivering message through ServiceInvoker!", ex);
                } catch (CourierException ex) {
                    logger.warn("Exception during deliver!", ex);
                } catch (MalformedEPRException ex) {
                    logger.warn("Addressing problem during deliver!", ex);
                }
            }
        }

        /** @return the number of registered endpoints (duplicates counted). */
        public int size() {
            return _endpoints.size();
        }

        /** @return the event name this map was created for. */
        public String name() {
            return _name;
        }

        /** Registers an endpoint; the same endpoint may be added repeatedly. */
        public void addEPR(EPR endpoint) {
            _endpoints.add(endpoint);
        }

        /** Removes a single registration of the endpoint, if there is one. */
        public void removeEPR(EPR endpoint) {
            _endpoints.remove(endpoint);
        }

        public String toString() {
            return "Trigger name: " + _name + ", recipient: " + _endpoints;
        }
    }
}