Code
import java.util.ArrayList; import javax.ejb.EJBException; import javax.ejb.SessionBean; import javax.ejb.SessionContext; import javax.ejb.CreateException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Connection; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; /** * Helper sessionbean to receive JMS messages. * * @ejb.bean name="JmsReceiver" * type="Stateless" * display-name="Jms Receiver helpers" * jndi-name="ejb/JmsReceiver" * local-jndi-name="ejb/JmsReceiverLocal" * view-type="both" * * @ejb.transaction type = "Required" * * @ejb.util generate = "physical" * * @ejb.resource-ref res-ref-name = "jms/QueueConnectionFactory" * res-type = "javax.jms.QueueConnectionFactory" * res-auth = "Container" * @jboss.resource-ref res-ref-name = "jms/QueueConnectionFactory" * jndi-name = "java:/JmsXA" * */ public class JmsReceiverBean implements SessionBean { private static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(JmsReceiverBean.class.getName()); private SessionContext _ctx; private transient QueueConnectionFactory queueConnectionFactory; /** * Creates a new instance. * * @throws CreateException if the bean couldn't be created * @ejb.create-method */ public void ejbCreate() throws CreateException { try { init(); } catch (NamingException e) { throw new CreateException("Component not found [" + e.getMessage() + "]"); } } /** * Gets a batch of messages on the specified queue mapping the specified selector * @param queue the queue to connect to * @param selector only messages with properties matching the message selector * expression are delivered. A value of null or an empty string * indicates that there is no message selector for the message * consumer. * @param batchSize the maximum number of <code>Message</code>s to return. If queue size is <= 0, then * all messages matching the selector will be returned. * @return an array of max <code>batchSize javax.jms.Message</code> matching * the specified selector. An empty array is returned if there is no * (more) messages * @throws JMSException if an error occured while retrieving the messages * * @ejb.interface-method */ public Message[] receive(Queue queue, String selector, int batchSize) throws JMSException { QueueConnection connection = null; QueueSession session = null; QueueReceiver receiver = null; try { queueConnectionFactory.createQueueConnection(); session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); receiver = session.createReceiver(queue, selector); // As from 3.2.4, you need to start the connection to receive messages with java:/JmsXA connection.start(); // Comment the line above if you are running Jboss < 3.2.4 ArrayList result = new ArrayList(); Message msg = null; boolean unLimitedBatchSize = (batchSize <= 0); while ( ( (result.size() < batchSize) || unLimitedBatchSize) && ((msg = receiver.receiveNoWait()) != null)) { result.add(msg); } return (Message[]) result.toArray(new Message[result.size()]); } finally { closeReceiver(receiver); closeSession(session); closeConnection(connection); } } /** * Gets a batch of messages on the specified queue. * @param queue the queue to connect to * @param batchSize the maximum number of <code>Message</code>s to return. If batchSize <= 0, then * all messages will be returned. * @return an array of max <code>batchSize javax.jms.Message</code>. An * empty array is returned if there is no (more) messages * @throws JMSException if an error occured while retrieving the messages * * @ejb.interface-method */ public Message[] receive(Queue queue, int batchSize) throws JMSException { return receive(queue, null, batchSize); } /** * Gets the next message on the specified queue mapping the specified selector * @param queue the queue to connect to * @param selector only messages with properties matching the message selector * expression are delivered. A value of null or an empty string * indicates that there is no message selector for the message * consumer. * @return a <code>javax.jms.Message</code> matching the specified selector or * <code>null</code> if there is no (more) messages * @throws JMSException if an error occured while retrieving the message * * @ejb.interface-method */ public Message receive(Queue queue, String selector) throws JMSException { Message[] msg = receive(queue, selector, 1); if (msg.length > 0) { return msg[0]; } else { return null; } } /** * Gets the next message on the specified queue. * @param queue the queue to connect to * @return a <code>javax.jms.Message</code> or <code>null</code> if there * is no (more) messages * @throws JMSException if an error occured while retrieving the message * * @ejb.interface-method */ public Message receive(Queue queue) throws JMSException { return receive(queue, null); } public void ejbActivate() { try { init(); } catch (NamingException e) { throw new EJBException("Could not activate", e); } } public void ejbPassivate() { } public void ejbRemove() { } public void setSessionContext(SessionContext context) { this._ctx = context; } /** * Initializes the bean. */ private void init() throws NamingException { Context jndi = new InitialContext(); queueConnectionFactory = (QueueConnectionFactory) jndi.lookup("java:comp/env/jms/QueueConnectionFactory"); } /** * Closes the JMS connection. */ private void closeConnection(Connection connection) { try { if (connection != null) connection.close(); } catch (JMSException e) { logger.warn("Could not close JMS connection", e); } } /** * Closes the JMS session. */ private void closeSession(Session session) { try { if (session != null) session.close(); } catch (JMSException e) { logger.warn("Could not close JMS session", e); } } /** * Closes the JMS session. */ private void closeReceiver(QueueReceiver queueReceiver) { try { if (queueReceiver != null) queueReceiver.close(); } catch (JMSException e) { logger.warn("Could not close queue receiver", e); } } }
Comments