/*
* JBoss, Home of Professional Open Source
* Copyright 2006, JBoss Inc., and individual contributors as indicated
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
/**
* Publish/subscribe event action: delivers event messages to the endpoints that subscribed to them.
* @author schifest@heuristica.com.ar
* @since Version 4.0
*/
package org.jboss.soa.esb.actions;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.log4j.Logger;
import org.jboss.soa.esb.ConfigurationException;
import org.jboss.soa.esb.addressing.Call;
import org.jboss.soa.esb.addressing.EPR;
import org.jboss.soa.esb.addressing.MalformedEPRException;
import org.jboss.soa.esb.addressing.eprs.LogicalEPR;
import org.jboss.soa.esb.client.ServiceInvoker;
import org.jboss.soa.esb.couriers.CourierException;
import org.jboss.soa.esb.couriers.CourierFactory;
import org.jboss.soa.esb.couriers.TwoWayCourier;
import org.jboss.soa.esb.helpers.ConfigTree;
import org.jboss.soa.esb.listeners.message.MessageDeliverException;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.format.MessageFactory;
import org.jboss.soa.esb.services.registry.RegistryException;
/**
* Simple transport agnostic event handler. Endpoints can subscribe to be
* informed on the arrival of a specific message/event. The messages are not
* stored durably. If the message is suitably instrumented (with the
* MESSAGE_IS_EVENT body property set to "true") then it will be sent as-is to
* all registered event handlers. Otherwise, a new message will be sent that
* contains just the event name and a copy of the original message's call
* (addressing) header information.
*
* TODO maybe make it persistent (or at least certain topics).
*/
public class EventManager extends AbstractActionPipelineProcessor
{
    /** Value of the {@link #OPCODE} body attribute that requests a subscription. */
    public static final String SUBSCRIBE_MESSAGE = "SUBSCRIBE";

    /** Value of the {@link #OPCODE} body attribute that cancels a subscription. */
    public static final String UNSUBSCRIBE_MESSAGE = "UNSUBSCRIBE";

    /** Value of the {@link #OPCODE} body attribute that marks the message as an event. */
    public static final String EVENT_MESSAGE = "EVENT";

    /** Event name under which an endpoint subscribes to every event. */
    public static final String WILD_CARD = "*";

    /** Name of the body attribute that carries the operation code. */
    public static final String OPCODE = "org.jboss.soa.esb.actions.event.opcode";

    /**
     * Name of the body attribute that carries the event name of an event
     * message. For subscribe/unsubscribe messages every body attribute whose
     * name starts with this prefix is read as one event name.
     */
    public static final String EVENT_TYPE = "org.jboss.soa.esb.actions.event.type";

    /**
     * Name of the body attribute that, when set to "true" (ignoring case),
     * causes the incoming event message itself to be forwarded instead of a
     * newly created one.
     */
    public static final String MESSAGE_IS_EVENT = "org.jboss.soa.esb.actions.event.message";

    /**
     * Create an event manager with no subscriptions.
     */
    public EventManager ()
    {
        _events = new ConcurrentHashMap<String, EventMap>();
    }

    /**
     * No real configuration options at this stage. Maybe consider sending
     * triggers periodically rather than as soon as they occur? Maybe add a
     * handler that gets called whenever a failure occurs on deliver?
     *
     * @param config
     *            the action configuration; currently not read.
     * @throws ConfigurationException
     *             declared for the configuration contract; not thrown here.
     * @throws RegistryException
     *             declared for the configuration contract; not thrown here.
     */
    public EventManager(ConfigTree config) throws ConfigurationException,
            RegistryException
    {
        _events = new ConcurrentHashMap<String, EventMap>();
    }

    /**
     * Initialise the action instance. This method is called after the
     * action instance has been instantiated so that configuration options
     * can be validated. Nothing needs to be done for this action.
     *
     * @throws ActionLifecycleException
     *             for errors during initialisation.
     */
    public void initialise () throws ActionLifecycleException
    {
    }

    /**
     * Destroy the action instance. This method is called prior to the
     * release of the action instance. Nothing needs to be done for this
     * action.
     *
     * @throws ActionLifecycleException
     *             for errors during destruction.
     */
    public void destroy () throws ActionLifecycleException
    {
    }

    /**
     * Processes an incoming message according to the {@link #OPCODE} body
     * attribute: {@link #SUBSCRIBE_MESSAGE} registers the sender for the
     * listed events, {@link #UNSUBSCRIBE_MESSAGE} removes it again and
     * {@link #EVENT_MESSAGE} delivers the event to every endpoint registered
     * for it. No response is sent for control messages.
     *
     * A message without an opcode, or with an unknown one, is ignored: maybe
     * it is meant for another action in the chain.
     *
     * @param message
     *            the incoming message.
     * @return the original message
     * @throws ActionProcessingException
     *             if a control message carries no From/ReplyTo EPR, or an
     *             event message carries no event name.
     */
    public Message process (Message message) throws ActionProcessingException
    {
        final String opcode = (String) message.getBody().get(OPCODE);

        /*
         * Constant-first comparisons make a missing opcode fall straight
         * through to the return.
         */
        if (SUBSCRIBE_MESSAGE.equals(opcode))
        {
            subscribe(message);
        }
        else if (UNSUBSCRIBE_MESSAGE.equals(opcode))
        {
            unsubscribe(message);
        }
        else if (EVENT_MESSAGE.equals(opcode))
        {
            trigger(message);
        }

        return message;
    }

    /**
     * Register the sender of the message for every event named in the body.
     *
     * @param message
     *            the subscribe message.
     * @throws ActionProcessingFaultException
     *             if the message has neither a From nor a ReplyTo EPR.
     */
    private void subscribe (final Message message)
            throws ActionProcessingFaultException
    {
        final EPR toEpr = subscriberEPR(message, "subscribe");

        /*
         * Pull out the list of events on which to be triggered. Each event is
         * uniquely identified and placed in the body with an attribute name
         * starting with EVENT_TYPE. Duplicates are allowed.
         */
        for (String attribute : message.getBody().getNames())
        {
            if (!attribute.startsWith(EVENT_TYPE))
                continue;

            final String eventName = eventNameFor(message, attribute);

            if (eventName == null)
                continue;

            /*
             * The endpoint is added while the lock is held: otherwise a
             * concurrent unsubscribe could see the map as empty and drop it
             * from _events before the new endpoint is added, losing the
             * subscription.
             */
            synchronized (_events)
            {
                EventMap theMap = _events.get(eventName);

                if (theMap == null)
                {
                    theMap = new EventMap(eventName);
                    _events.put(eventName, theMap);
                }

                theMap.addEPR(toEpr);
            }
        }
    }

    /**
     * Remove the sender of the message from every event named in the body.
     *
     * @param message
     *            the unsubscribe message.
     * @throws ActionProcessingFaultException
     *             if the message has neither a From nor a ReplyTo EPR.
     */
    private void unsubscribe (final Message message)
            throws ActionProcessingFaultException
    {
        final EPR toEpr = subscriberEPR(message, "unsubscribe");

        for (String attribute : message.getBody().getNames())
        {
            if (!attribute.startsWith(EVENT_TYPE))
                continue;

            /*
             * Subscriptions are keyed by the attribute VALUE (the event
             * name), exactly as subscribe stores them, not by the attribute
             * name.
             */
            final String eventName = eventNameFor(message, attribute);

            if (eventName == null)
                continue;

            /*
             * Since messages may be lost, assume unsubscribe from an empty
             * event list is valid and refers to something that no longer
             * exists.
             */
            synchronized (_events)
            {
                final EventMap theMap = _events.get(eventName);

                if (theMap != null)
                {
                    theMap.removeEPR(toEpr);

                    if (theMap.size() == 0)
                        _events.remove(eventName);
                }
            }
        }
    }

    /**
     * Deliver an event to the endpoints registered for its name and to the
     * endpoints registered for {@link #WILD_CARD}.
     *
     * @param message
     *            the event message.
     * @throws ActionProcessingFaultException
     *             if no event name is present, or the outgoing message could
     *             not be created.
     */
    private void trigger (final Message message)
            throws ActionProcessingFaultException
    {
        final String eventName = (String) message.getBody().get(EVENT_TYPE);

        if ((eventName == null) || (eventName.equals("")))
            throw new ActionProcessingFaultException("No event name specified!");

        final EventMap handler = _events.get(eventName);
        final EventMap all = _events.get(WILD_CARD);

        if ((handler == null) && (all == null))
            return;

        /*
         * Now see if we want to forward the message as-is, or create a new
         * one.
         */
        final String content = (String) message.getBody().get(MESSAGE_IS_EVENT);
        final Message event = "true".equalsIgnoreCase(content) ? message
                : createMessage(message);

        if (handler != null)
            handler.trigger(event);

        /*
         * When the event name is the wild card itself both lookups return the
         * same map; trigger it only once.
         */
        if ((all != null) && (all != handler))
            all.trigger(event);
    }

    /**
     * Build the message sent to subscribers when the incoming message is not
     * forwarded as-is: same message type, the event name in the body and a
     * copy of the incoming call (addressing) information.
     *
     * @param message
     *            the incoming event message.
     * @return the newly created event message.
     * @throws ActionProcessingFaultException
     *             if the call information could not be copied.
     */
    private Message createMessage (final Message message)
            throws ActionProcessingFaultException
    {
        // make sure it's the same type!
        final Message msg = MessageFactory.getInstance().getMessage(
                message.getType());

        msg.getBody().add(EVENT_TYPE, message.getBody().get(EVENT_TYPE));

        final Call call;

        try
        {
            call = new Call(message.getHeader().getCall());
        }
        catch (URISyntaxException ex)
        {
            // Log the underlying cause so it is not lost with the fault.
            logger.error(
                    "Could not create header routing information for event message!",
                    ex);

            throw new ActionProcessingFaultException(
                    "Could not create header routing information for event message!");
        }

        msg.getHeader().setCall(call);

        return msg;
    }

    /**
     * Pull out the EPR used for event triggering: the From EPR, or the
     * ReplyTo EPR when From is not set.
     *
     * @param message
     *            the control message.
     * @param operation
     *            "subscribe" or "unsubscribe"; only used in the error text.
     * @return the EPR of the subscriber, never null.
     * @throws ActionProcessingFaultException
     *             if neither EPR is set.
     */
    private EPR subscriberEPR (final Message message, final String operation)
            throws ActionProcessingFaultException
    {
        EPR epr = message.getHeader().getCall().getFrom();

        if (epr == null)
            epr = message.getHeader().getCall().getReplyTo();

        if (epr == null)
        {
            logger.error("Event." + operation
                    + " - no From or ReplyTo EPR specified!");

            throw new ActionProcessingFaultException(
                    "No From or ReplyTo EPR specified for " + operation + "!");
        }

        return epr;
    }

    /**
     * Read the event name stored under the given body attribute.
     *
     * @param message
     *            the control message.
     * @param attribute
     *            the name of the body attribute.
     * @return the event name, or null (after logging a warning) when the
     *         value is missing, empty or not a String. Such a value cannot
     *         be used as a key of the subscription table.
     */
    private static String eventNameFor (final Message message,
            final String attribute)
    {
        final Object value = message.getBody().get(attribute);

        if ((value instanceof String) && (((String) value).length() > 0))
            return (String) value;

        logger.warn("Ignoring event attribute " + attribute
                + " - value is not a non-empty String: " + value);

        return null;
    }

    /**
     * Subscriptions, keyed by event name. Compound check-then-act updates
     * are additionally guarded by synchronizing on this map.
     */
    private final ConcurrentHashMap<String, EventMap> _events;

    private final static Logger logger = Logger.getLogger(EventManager.class);

    /**
     * The endpoints registered for one event name.
     */
    class EventMap
    {
        /**
         * @param name
         *            the event name this instance holds the endpoints for.
         */
        public EventMap(String name)
        {
            _name = name;
            _endpoints = new ConcurrentLinkedQueue<EPR>();
        }

        /**
         * Send the trigger message to the list of registered recipients. This
         * is NOT an atomic operation: any failures will be logged, but the
         * trigger will continue.
         *
         * Note that the To address of the message header is overwritten for
         * each recipient.
         *
         * @param msg
         *            The message to send.
         */
        public void trigger (final Message msg)
        {
            for (EPR to : _endpoints)
            {
                TwoWayCourier courier = null;
                ServiceInvoker service = null;

                try
                {
                    /*
                     * Use ServiceInvoker if the EPR is a LogicalEPR.
                     */
                    if (to instanceof LogicalEPR)
                        service = ((LogicalEPR) to).getServiceInvoker();
                    else
                        courier = CourierFactory.getInstance().getMessageCourier(to);
                }
                catch (MessageDeliverException ex)
                {
                    logger.warn("Problem with LogicalEPR", ex);
                }
                catch (CourierException ex)
                {
                    logger.warn("Courier exception", ex);
                }
                catch (MalformedEPRException ex)
                {
                    logger.warn("getCourier problem", ex);
                }

                if ((courier == null) && (service == null))
                {
                    logger.warn("Could not get Courier or ServiceInvoker for "
                            + to);
                    continue;
                }

                msg.getHeader().getCall().setTo(to);

                try
                {
                    if (courier != null)
                    {
                        try
                        {
                            courier.deliver(msg);
                        }
                        finally
                        {
                            /*
                             * Hand the courier back even when the delivery
                             * fails, so that it is not leaked.
                             */
                            CourierFactory.deregisterCourier(courier);
                        }
                    }
                    else
                        service.deliverAsync(msg);
                }
                catch (MessageDeliverException ex)
                {
                    logger.warn("Problem delivering message through ServiceInvoker!", ex);
                }
                catch (CourierException ex)
                {
                    logger.warn("Exception during deliver!", ex);
                }
                catch (MalformedEPRException ex)
                {
                    logger.warn("Addressing problem during deliver!", ex);
                }
            }
        }

        /**
         * @return the number of registered endpoints (duplicates count).
         */
        public int size ()
        {
            return _endpoints.size();
        }

        /**
         * @return the event name this instance holds the endpoints for.
         */
        public String name ()
        {
            return _name;
        }

        /**
         * Register an endpoint. The same endpoint may be added repeatedly.
         *
         * @param endpoint
         *            the endpoint to add.
         */
        public void addEPR (EPR endpoint)
        {
            _endpoints.add(endpoint);
        }

        /**
         * Remove a single registration of the endpoint, if there is one.
         *
         * @param endpoint
         *            the endpoint to remove.
         */
        public void removeEPR (EPR endpoint)
        {
            _endpoints.remove(endpoint);
        }

        public String toString ()
        {
            return "Trigger name: " + _name + ", recipient: " + _endpoints;
        }

        private final String _name;

        private final ConcurrentLinkedQueue<EPR> _endpoints;
    }
}