This example uses bean-managed transactions and sends the response within the same transaction.
package ch.logobject.test.mdb;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.EJB;
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJBAccessException;
import javax.ejb.EJBTransactionRolledbackException;
import javax.ejb.MessageDriven;
import javax.ejb.MessageDrivenContext;
import javax.ejb.TransactionManagement;
import javax.ejb.TransactionManagementType;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.transaction.Status;
import javax.transaction.UserTransaction;
@MessageDriven(activateConfig= {
@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Topic"),
@ActivationConfigProperty(propertyName="destination", propertyValue="topic/test"),
@ActivationConfigProperty(propertyName="messageSelector", propertyValue="JMSType = 'request' AND myProperty = 'myValue'"),
@ActivationConfigProperty(propertyName="acknowledgeMode", propertyValue="AUTO_ACKNOWLEDGE"),
@ActivationConfigProperty(propertyName="subscriptionDurability", propertyValue="Durable"),
@ActivationConfigProperty(propertyName="subscriptionName", propertyValue="TestMdbSubscription"),
// always set a client ID if using durable subscriptions
@ActivationConfigProperty(propertyName="clientId", propertyValue="TestMdb")
})
@TransactionManagement(value=TransactionManagementType.BEAN)
public class TestMdb implements MessageListener {
@EJB
IMySessionBean bean;
@Resource
MessageDrivenContext mc;
@Resource(mappedName="java:/JmsXA")
ConnectionFactory txJmsCF;
// by using the HA-JNDI port this works in a cluster where the queue may be remote!
@Resource(mappedName="jnp://localhost:1100/queue/test");
Queue testQ;
private UserTransaction tx;
public void onMessage(Message msg) {
try {
tx = mc.getUserTransaction();
tx.begin();
bean.myBusinessMethod();
sendResponse(msg, "response", "hello world", 4, 30000L);
tx.commit();
} catch(MyBusinessException e) {
} catch(EJBTransactionRolledbackException e) {
Exception uncheckedException = e.getCausedByException();
} catch(EJBAccessException e) {
} catch(JMSException e) {
} catch(Exception e) {
e.printStackTrace();
}
}
private void sendResponse(Message orig, String type, String body, int priority, long ttl) throws JMSException {
Connection conn = txJmsCF.createConnection();
Session session = conn.createSession(true, 0);
TextMessage answer = session.createTextMessage();
answer.setJMSType(type);
answer.setText(body);
answer.setJMSCorrelationID(orig.getJMSMessageID());
MessageProducer sender = session.createProducer(orig.getJMSReplyTo());
sender.setDeliveryMode(DeliveryMode.PERSISTENT);
sender.setPriority(priority);
sender.setTimeToLive(ttl);
sender.send(answer);
sender.close();
session.close();
conn.close();
log.debug("response sent");
}
}
Comments