Message creator interface
This interface allows clients to build JMS messages easily.
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
/**
 * Callback that produces the JMS message to be sent.
 *
 * <p>The interface extends {@link Serializable}, so every implementation
 * must itself be serializable.</p>
 */
public interface MessageCreator extends Serializable {

    /**
     * Builds a message with the help of the given session.
     *
     * @param session the JMS session available for creating the message
     * @return the message produced by this creator
     * @throws JMSException if the message cannot be created
     */
    Message getMessage(Session session) throws JMSException;
}
Code
import java.io.Serializable;
import java.util.Date;
import javax.ejb.EJBException;
import javax.ejb.SessionBean;
import javax.ejb.SessionContext;
import javax.ejb.CreateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.Connection;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* Helper sessionbean to send JMS messages.
*
* @ejb.bean name="JmsSender"
* type="Stateless"
* display-name="Jms Sender helpers"
* jndi-name="ejb/JmsSender"
* local-jndi-name="ejb/JmsSenderLocal"
* view-type="both"
*
* @ejb.transaction type = "Required"
*
* @ejb.util generate = "physical"
*
* @ejb.resource-ref res-ref-name = "jms/QueueConnectionFactory"
* res-type = "javax.jms.QueueConnectionFactory"
* res-auth = "Container"
* @jboss.resource-ref res-ref-name = "jms/QueueConnectionFactory"
* jndi-name = "java:/JmsXA"
* @ejb.resource-ref res-ref-name = "jms/TopicConnectionFactory"
* res-type = "javax.jms.TopicConnectionFactory"
* res-auth = "Container"
* @jboss.resource-ref res-ref-name = "jms/TopicConnectionFactory"
* jndi-name = "java:/JmsXA"
*
*/
public class JmsSenderBean implements SessionBean {

    private static final org.apache.log4j.Logger logger =
            org.apache.log4j.Logger.getLogger(JmsSenderBean.class.getName());

    /**
     * Name of the long message property that carries the requested delivery
     * time in epoch milliseconds. NOTE(review): presumably interpreted by the
     * JBoss JMS provider -- confirm against the provider documentation.
     */
    private static final String SCHEDULED_DELIVERY_PROPERTY = "JMS_JBOSS_SCHEDULED_DELIVERY";

    private SessionContext _ctx;

    /* The factories are transient and are looked up again by init(). */
    private transient QueueConnectionFactory queueConnectionFactory;
    private transient TopicConnectionFactory topicConnectionFactory;

    /**
     * Creates a new instance.
     *
     * @throws javax.ejb.CreateException if the bean couldn't be created
     * @ejb.create-method
     */
    public void ejbCreate() throws CreateException {
        try {
            init();
        } catch (NamingException e) {
            // CreateException has no (message, cause) constructor, so attach
            // the cause explicitly instead of losing the original stack trace.
            CreateException failure =
                    new CreateException("Component not found [" + e.getMessage() + "]");
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Publish a message on a topic.
     *
     * @ejb.interface-method
     * @param messageCreator MessageCreator to create the message with.
     * @param topic The topic to publish on.
     * @throws JMSException if the message cannot be created or published
     */
    public void publish(MessageCreator messageCreator, Topic topic) throws JMSException {
        if (logger.isDebugEnabled()) {
            logger.debug("Publishing message [" + messageCreator + "] on topic [" + topic.getTopicName() + "]");
        }
        TopicConnection connection = null;
        TopicSession session = null;
        TopicPublisher publisher = null;
        try {
            connection = openTopicConnection();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE);
            publisher = session.createPublisher(topic);
            publisher.publish(messageCreator.getMessage(session));
        } finally {
            // Release in reverse order of acquisition; each helper is null-safe
            // and only logs on failure.
            closeTopicPublisher(publisher);
            closeSession(session);
            closeConnection(connection);
        }
    }

    /**
     * Publish text on a topic.
     *
     * @ejb.interface-method
     * @param message The message to publish.
     * @param topic The topic to publish on.
     * @throws JMSException if the message cannot be created or published
     */
    public void publish(String message, Topic topic) throws JMSException {
        if (logger.isDebugEnabled()) {
            logger.debug("Publishing text [" + message + "] on topic [" + topic.getTopicName() + "]");
        }
        publish(new TextMessageCreator(message), topic);
    }

    /**
     * Publish an object on a topic.
     *
     * @ejb.interface-method
     * @param message The message to publish; must implement {@link Serializable}.
     * @param topic The topic to publish on.
     * @throws JMSException if the object is not serializable, or the message
     *         cannot be created or published
     */
    public void publish(Object message, Topic topic) throws JMSException {
        if (logger.isDebugEnabled()) {
            logger.debug("Publishing object [" + message + "] on topic [" + topic.getTopicName() + "]");
        }
        publish(new ObjectMessageCreator(message), topic);
    }

    /**
     * Send a Message to a queue with a scheduled delivery and returns the ID assigned by
     * the JMS provider.
     *
     * @ejb.interface-method
     * @param messageCreator MessageCreator to create the message with.
     * @param queue The queue to send to.
     * @param deliveryDate the date at which the message should be delivered. If null, no delivery date is set
     * @return the JMSMessageID assigned to this message.
     * @throws JMSException if the message cannot be created or sent
     */
    public String send(MessageCreator messageCreator, Queue queue, Date deliveryDate) throws JMSException {
        if (logger.isDebugEnabled()) {
            logger.debug("Sending message [" + messageCreator + "] to queue [" + queue.getQueueName() + "]");
        }
        QueueConnection connection = null;
        QueueSession session = null;
        QueueSender sender = null;
        try {
            connection = openQueueConnection();
            // Non-transacted session with automatic acknowledgement.
            session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
            sender = session.createSender(queue);
            Message msg = messageCreator.getMessage(session);
            if (deliveryDate != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Message will be delivered on ["+deliveryDate+"]");
                }
                msg.setLongProperty(SCHEDULED_DELIVERY_PROPERTY, deliveryDate.getTime());
            }
            sender.send(msg);
            // The ID is read only after send(), once the message has been handed to the sender.
            return msg.getJMSMessageID();
        } finally {
            closeQueueSender(sender);
            closeSession(session);
            closeConnection(connection);
        }
    }

    /**
     * Send a Message to a queue and returns the ID assigned by the JMS provider.
     *
     * @ejb.interface-method
     * @param messageCreator MessageCreator to create the message with.
     * @param queue The queue to send to.
     * @return the JMSMessageID assigned to this message.
     * @throws JMSException if the message cannot be created or sent
     */
    public String send(MessageCreator messageCreator, Queue queue) throws JMSException {
        return send(messageCreator, queue, null);
    }

    /**
     * Send text to a queue and returns the ID assigned by the JMS provider.
     *
     * @ejb.interface-method
     * @param message The message to publish.
     * @param queue The queue to send to.
     * @return the JMSMessageID assigned to this message.
     * @throws JMSException if the message cannot be created or sent
     */
    public String send(String message, Queue queue) throws JMSException {
        return send(new TextMessageCreator(message), queue);
    }

    /**
     * Send an Object to a queue and returns its ID assigned by the JMS provider.
     *
     * @ejb.interface-method
     * @param message The message to publish; must implement {@link Serializable}.
     * @param queue The queue to send to.
     * @return the JMSMessageID assigned to this message.
     * @throws JMSException if the object is not serializable, or the message
     *         cannot be created or sent
     */
    public String send(Object message, Queue queue) throws JMSException {
        return send(new ObjectMessageCreator(message), queue);
    }

    /**
     * Looks the connection factories up again on activation.
     *
     * @throws EJBException if the JNDI lookup fails
     */
    public void ejbActivate() {
        try {
            init();
        } catch (NamingException e) {
            throw new EJBException("Could not activate", e);
        }
    }

    /** No state to release on passivation. */
    public void ejbPassivate() {
    }

    /** Nothing to clean up: connections are opened and closed per call. */
    public void ejbRemove() {
    }

    /**
     * Stores the session context supplied by the container.
     *
     * @param context the session context of this bean
     */
    public void setSessionContext(SessionContext context) {
        this._ctx = context;
    }

    /**
     * Initializes the bean by looking up both connection factories in the
     * component environment (<code>java:comp/env</code>).
     *
     * @throws NamingException if either factory cannot be found
     */
    private void init() throws NamingException {
        Context jndi = new InitialContext();
        queueConnectionFactory = (QueueConnectionFactory) jndi.lookup("java:comp/env/jms/QueueConnectionFactory");
        topicConnectionFactory = (TopicConnectionFactory) jndi.lookup("java:comp/env/jms/TopicConnectionFactory");
    }

    /**
     * Retrieves a new JMS queue connection from the queue connection factory.
     *
     * @return a <code>QueueConnection</code>
     * @throws JMSException if the connection could not be retrieved
     */
    private QueueConnection openQueueConnection() throws JMSException {
        // start() is deliberately not called: per the original author the
        // connection comes from a pool and does not need to be started.
        return queueConnectionFactory.createQueueConnection();
    }

    /**
     * Retrieves a new JMS topic connection from the topic connection factory.
     *
     * @return a <code>TopicConnection</code>
     * @throws JMSException if the connection could not be retrieved
     */
    private TopicConnection openTopicConnection() throws JMSException {
        return topicConnectionFactory.createTopicConnection();
    }

    /**
     * Closes the JMS connection. A null argument is ignored and a failure is
     * only logged, so this is safe to call from a finally block.
     */
    private void closeConnection(Connection connection) {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (JMSException e) {
            logger.warn("Could not close JMS connection", e);
        }
    }

    /**
     * Closes the JMS session. A null argument is ignored and a failure is
     * only logged, so this is safe to call from a finally block.
     */
    private void closeSession(Session session) {
        try {
            if (session != null) {
                session.close();
            }
        } catch (JMSException e) {
            logger.warn("Could not close JMS session", e);
        }
    }

    /**
     * Closes the queue sender. A null argument is ignored and a failure is
     * only logged, so this is safe to call from a finally block.
     */
    private void closeQueueSender(QueueSender queueSender) {
        try {
            if (queueSender != null) {
                queueSender.close();
            }
        } catch (JMSException e) {
            logger.warn("Could not close queue sender", e);
        }
    }

    /**
     * Closes the topic publisher. A null argument is ignored and a failure is
     * only logged, so this is safe to call from a finally block.
     */
    private void closeTopicPublisher(TopicPublisher topicPublisher) {
        try {
            if (topicPublisher != null) {
                topicPublisher.close();
            }
        } catch (JMSException e) {
            logger.warn("Could not close topic publisher", e);
        }
    }

    /**
     * Creates a text message from a string.
     *
     * <p>Declared static so that the (serializable) creator does not carry a
     * hidden reference to the enclosing bean instance.</p>
     */
    private static class TextMessageCreator implements MessageCreator {
        private String message;

        public TextMessageCreator(String message) {
            this.message = message;
        }

        public Message getMessage(Session session) throws JMSException {
            return session.createTextMessage(message);
        }

        public String toString() {
            return message;
        }
    }

    /**
     * Creates an object message from a serializable payload.
     *
     * <p>Declared static so that the (serializable) creator does not carry a
     * hidden reference to the enclosing bean instance.</p>
     */
    private static class ObjectMessageCreator implements MessageCreator {
        private Serializable message;

        /**
         * @param message the payload; rejected unless it implements
         *        {@link Serializable} (a null payload is rejected as well,
         *        because <code>instanceof</code> is false for null)
         * @throws JMSException if the payload is not serializable
         */
        public ObjectMessageCreator(Object message) throws JMSException {
            if (!(message instanceof Serializable)) {
                throw new JMSException("Object ["+message+"] is not serializable");
            }
            this.message = (Serializable) message;
        }

        public Message getMessage(Session session) throws JMSException {
            return session.createObjectMessage(message);
        }

        public String toString() {
            // String.valueOf yields "null" for a null payload, otherwise message.toString().
            return String.valueOf(message);
        }
    }
}
Comments