Message creator interface
This interface allows client to build JMS message easily
import java.io.Serializable; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; public interface MessageCreator extends Serializable { public Message getMessage(Session session) throws JMSException; }
Code
import java.io.Serializable; import java.util.Date; import javax.ejb.EJBException; import javax.ejb.SessionBean; import javax.ejb.SessionContext; import javax.ejb.CreateException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import javax.jms.Connection; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; /** * Helper sessionbean to send JMS messages. * * @ejb.bean name="JmsSender" * type="Stateless" * display-name="Jms Sender helpers" * jndi-name="ejb/JmsSender" * local-jndi-name="ejb/JmsSenderLocal" * view-type="both" * * @ejb.transaction type = "Required" * * @ejb.util generate = "physical" * * @ejb.resource-ref res-ref-name = "jms/QueueConnectionFactory" * res-type = "javax.jms.QueueConnectionFactory" * res-auth = "Container" * @jboss.resource-ref res-ref-name = "jms/QueueConnectionFactory" * jndi-name = "java:/JmsXA" * @ejb.resource-ref res-ref-name = "jms/TopicConnectionFactory" * res-type = "javax.jms.TopicConnectionFactory" * res-auth = "Container" * @jboss.resource-ref res-ref-name = "jms/TopicConnectionFactory" * jndi-name = "java:/JmsXA" * */ public class JmsSenderBean implements SessionBean { private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(JmsSenderBean.class.getName()); private SessionContext _ctx; private transient QueueConnectionFactory queueConnectionFactory; private transient TopicConnectionFactory topicConnectionFactory; /** * Creates a new instance. * * @throws javax.ejb.CreateException if the bean couldn't be created * @ejb.create-method */ public void ejbCreate() throws CreateException { try { init(); } catch (NamingException e) { throw new CreateException("Component not found [" + e.getMessage() + "]"); } } /** * Publish a message on a topic. * * @ejb.interface-method * @param messageCreator MessageCreator to create the message with. * @param topic The topic to publish on. */ public void publish(MessageCreator messageCreator, Topic topic) throws JMSException { if (logger.isDebugEnabled()) { logger.debug("Publishing message [" + messageCreator + "] on topic [" + topic.getTopicName() + "]"); } TopicConnection connection = null; TopicSession session = null; TopicPublisher publisher = null; try { connection = openTopicConnection(); session = connection.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); publisher = session.createPublisher(topic); publisher.publish(messageCreator.getMessage(session)); } finally { closeTopicPublisher(publisher); closeSession(session); closeConnection(connection); } } /** * Publish text on a topic. * * @ejb.interface-method * @param message The message to publish. * @param topic The topic to publish on. */ public void publish(String message, Topic topic) throws JMSException { if (logger.isDebugEnabled()) { logger.debug("Publishing text [" + message + "] on topic [" + topic.getTopicName() + "]"); } publish(new TextMessageCreator(message), topic); } /** * Publish an object on a topic. * * @ejb.interface-method * @param message The message to publish. * @param topic The topic to publish on. */ public void publish(Object message, Topic topic) throws JMSException { if (logger.isDebugEnabled()) { logger.debug("Publishing object [" + message + "] on topic [" + topic.getTopicName() + "]"); } publish(new ObjectMessageCreator(message), topic); } /** * Send a Message to a queue with a scheduled delivery and returns the ID assigned by * the JMS provider. * * @ejb.interface-method * @param messageCreator MessageCreator to create the message with. * @param queue The queue to send to. * @param deliveryDate the date at which the message should be delivered. If null, no delivery date is set * @return the JMSMessageID asssigned to this message. */ public String send(MessageCreator messageCreator, Queue queue, Date deliveryDate) throws JMSException { if (logger.isDebugEnabled()) { logger.debug("Sending message [" + messageCreator + "] to queue [" + queue.getQueueName() + "]"); } QueueConnection connection = null; QueueSession session = null; QueueSender sender = null; try { connection = openQueueConnection(); session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); sender = session.createSender(queue); Message msg = messageCreator.getMessage(session); if (deliveryDate != null) { logger.debug("Message will be delivered on ["+deliveryDate+"]"); msg.setLongProperty("JMS_JBOSS_SCHEDULED_DELIVERY", deliveryDate.getTime()); } sender.send(msg); return msg.getJMSMessageID(); } finally { closeQueueSender(sender); closeSession(session); closeConnection(connection); } } /** * Send a Message to a queue and returns the ID assigned by the JMS provider. * * @ejb.interface-method * @param messageCreator MessageCreator to create the message with. * @param queue The queue to send to. * @return the JMSMessageID asssigned to this message. */ public String send(MessageCreator messageCreator, Queue queue) throws JMSException { return send(messageCreator, queue, null); } /** * Send text to a queue and returns the ID assigned by the JMS provider. * * @ejb.interface-method * @param message The message to publish. * @param queue The queue to send to. * @return the JMSMessageID asssigned to this message. */ public String send(String message, Queue queue) throws JMSException { return send(new TextMessageCreator(message), queue); } /** * Send an Object to a queue and returns its ID assigned by the JMS provider. * * @ejb.interface-method * @param message The message to publish. * @param queue The queue to send to. * @return the JMSMessageID asssigned to this message. */ public String send(Object message, Queue queue) throws JMSException { return send(new ObjectMessageCreator(message), queue); } public void ejbActivate() { try { init(); } catch (NamingException e) { throw new EJBException("Could not activate", e); } } public void ejbPassivate() { } public void ejbRemove() { } public void setSessionContext(SessionContext context) { this._ctx = context; } /** * Initializes the bean. */ private void init() throws NamingException { Context jndi = new InitialContext(); queueConnectionFactory = (QueueConnectionFactory) jndi.lookup("java:comp/env/jms/QueueConnectionFactory"); topicConnectionFactory = (TopicConnectionFactory) jndi.lookup("java:comp/env/jms/TopicConnectionFactory"); } /** * Retrieves a new JMS Connection from the pool * @return a <code>QueueConnection</code> * @throws JMSException if the connection could not be retrieved */ private QueueConnection openQueueConnection() throws JMSException { return queueConnectionFactory.createQueueConnection(); // queueConnection.start(); this is a pool we don't need to start the connection } /** * Retrieves a new JMS Connection from the pool * @return a <code>QueueConnection</code> * @throws JMSException if the connection could not be retrieved */ private TopicConnection openTopicConnection() throws JMSException { return topicConnectionFactory.createTopicConnection(); } /** * Closes the JMS connection. */ private void closeConnection(Connection connection) { try { if (connection != null) connection.close(); } catch (JMSException e) { logger.warn("Could not close JMS connection", e); } } /** * Closes the JMS session. */ private void closeSession(Session session) { try { if (session != null) session.close(); } catch (JMSException e) { logger.warn("Could not close JMS session", e); } } /** * Closes the JMS session. */ private void closeQueueSender(QueueSender queueSender) { try { if (queueSender!= null) queueSender.close(); } catch (JMSException e) { logger.warn("Could not close queue sender", e); } } /** * Closes the JMS session. */ private void closeTopicPublisher(TopicPublisher topicPublisher) { try { if (topicPublisher != null) topicPublisher.close(); } catch (JMSException e) { logger.warn("Could not close queue sender", e); } } private class TextMessageCreator implements MessageCreator { private String message; public TextMessageCreator(String message) { this.message = message; } public Message getMessage(Session session) throws JMSException { return session.createTextMessage(message); } public String toString() { return message; } } private class ObjectMessageCreator implements MessageCreator { private Serializable message; public ObjectMessageCreator(Object message) throws JMSException { if (!(message instanceof Serializable)) { throw new JMSException("Object ["+message+"] is not serializable"); } this.message = (Serializable) message; } public Message getMessage(Session session) throws JMSException { return session.createObjectMessage(message); } public String toString() { if (message != null) return message.toString(); else return "null"; } } }
Comments