Code
import java.util.ArrayList;
import javax.ejb.EJBException;
import javax.ejb.SessionBean;
import javax.ejb.SessionContext;
import javax.ejb.CreateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Connection;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
* Helper sessionbean to receive JMS messages.
*
* @ejb.bean name="JmsReceiver"
* type="Stateless"
* display-name="Jms Receiver helpers"
* jndi-name="ejb/JmsReceiver"
* local-jndi-name="ejb/JmsReceiverLocal"
* view-type="both"
*
* @ejb.transaction type = "Required"
*
* @ejb.util generate = "physical"
*
* @ejb.resource-ref res-ref-name = "jms/QueueConnectionFactory"
* res-type = "javax.jms.QueueConnectionFactory"
* res-auth = "Container"
* @jboss.resource-ref res-ref-name = "jms/QueueConnectionFactory"
* jndi-name = "java:/JmsXA"
*
*/
public class JmsReceiverBean implements SessionBean {
private static org.apache.log4j.Logger logger =
org.apache.log4j.Logger.getLogger(JmsReceiverBean.class.getName());
private SessionContext _ctx;
private transient QueueConnectionFactory queueConnectionFactory;
/**
* Creates a new instance.
*
* @throws CreateException if the bean couldn't be created
* @ejb.create-method
*/
public void ejbCreate() throws CreateException {
try {
init();
} catch (NamingException e) {
throw new CreateException("Component not found [" + e.getMessage() + "]");
}
}
/**
* Gets a batch of messages on the specified queue mapping the specified selector
* @param queue the queue to connect to
* @param selector only messages with properties matching the message selector
* expression are delivered. A value of null or an empty string
* indicates that there is no message selector for the message
* consumer.
* @param batchSize the maximum number of <code>Message</code>s to return. If queue size is <= 0, then
* all messages matching the selector will be returned.
* @return an array of max <code>batchSize javax.jms.Message</code> matching
* the specified selector. An empty array is returned if there is no
* (more) messages
* @throws JMSException if an error occured while retrieving the messages
*
* @ejb.interface-method
*/
public Message[] receive(Queue queue, String selector, int batchSize) throws JMSException {
QueueConnection connection = null;
QueueSession session = null;
QueueReceiver receiver = null;
try {
queueConnectionFactory.createQueueConnection();
session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
receiver = session.createReceiver(queue, selector);
// As from 3.2.4, you need to start the connection to receive messages with java:/JmsXA
connection.start();
// Comment the line above if you are running Jboss < 3.2.4
ArrayList result = new ArrayList();
Message msg = null;
boolean unLimitedBatchSize = (batchSize <= 0);
while ( ( (result.size() < batchSize) || unLimitedBatchSize) && ((msg = receiver.receiveNoWait()) != null)) {
result.add(msg);
}
return (Message[]) result.toArray(new Message[result.size()]);
} finally {
closeReceiver(receiver);
closeSession(session);
closeConnection(connection);
}
}
/**
* Gets a batch of messages on the specified queue.
* @param queue the queue to connect to
* @param batchSize the maximum number of <code>Message</code>s to return. If batchSize <= 0, then
* all messages will be returned.
* @return an array of max <code>batchSize javax.jms.Message</code>. An
* empty array is returned if there is no (more) messages
* @throws JMSException if an error occured while retrieving the messages
*
* @ejb.interface-method
*/
public Message[] receive(Queue queue, int batchSize) throws JMSException {
return receive(queue, null, batchSize);
}
/**
* Gets the next message on the specified queue mapping the specified selector
* @param queue the queue to connect to
* @param selector only messages with properties matching the message selector
* expression are delivered. A value of null or an empty string
* indicates that there is no message selector for the message
* consumer.
* @return a <code>javax.jms.Message</code> matching the specified selector or
* <code>null</code> if there is no (more) messages
* @throws JMSException if an error occured while retrieving the message
*
* @ejb.interface-method
*/
public Message receive(Queue queue, String selector) throws JMSException {
Message[] msg = receive(queue, selector, 1);
if (msg.length > 0) {
return msg[0];
} else {
return null;
}
}
/**
* Gets the next message on the specified queue.
* @param queue the queue to connect to
* @return a <code>javax.jms.Message</code> or <code>null</code> if there
* is no (more) messages
* @throws JMSException if an error occured while retrieving the message
*
* @ejb.interface-method
*/
public Message receive(Queue queue) throws JMSException {
return receive(queue, null);
}
public void ejbActivate() {
try {
init();
} catch (NamingException e) {
throw new EJBException("Could not activate", e);
}
}
public void ejbPassivate() {
}
public void ejbRemove() {
}
public void setSessionContext(SessionContext context) {
this._ctx = context;
}
/**
* Initializes the bean.
*/
private void init() throws NamingException {
Context jndi = new InitialContext();
queueConnectionFactory = (QueueConnectionFactory) jndi.lookup("java:comp/env/jms/QueueConnectionFactory");
}
/**
* Closes the JMS connection.
*/
private void closeConnection(Connection connection) {
try {
if (connection != null)
connection.close();
} catch (JMSException e) {
logger.warn("Could not close JMS connection", e);
}
}
/**
* Closes the JMS session.
*/
private void closeSession(Session session) {
try {
if (session != null)
session.close();
} catch (JMSException e) {
logger.warn("Could not close JMS session", e);
}
}
/**
* Closes the JMS session.
*/
private void closeReceiver(QueueReceiver queueReceiver) {
try {
if (queueReceiver != null)
queueReceiver.close();
} catch (JMSException e) {
logger.warn("Could not close queue receiver", e);
}
}
}
Comments