This example uses bean managed transactions and sends a response in the same transaction.
package ch.logobject.test.mdb; import javax.annotation.EJB; import javax.annotation.Resource; import javax.ejb.ActivationConfigProperty; import javax.ejb.MessageDriven; import javax.ejb.TransactionManagement; import javax.ejb.TransactionManagementType; import javax.jms.Message; import javax.jms.MessageListener; @MessageDriven(activateConfig= { @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Topic"), @ActivationConfigProperty(propertyName="destination", propertyValue="topic/test"), @ActivationConfigProperty(propertyName="messageSelector", propertyValue="JMSType = 'request' AND myProperty = 'myValue'"), @ActivationConfigProperty(propertyName="acknowledgeMode", propertyValue="AUTO_ACKNOWLEDGE"), @ActivationConfigProperty(propertyName="subscriptionDurability", propertyValue="Durable"), @ActivationConfigProperty(propertyName="subscriptionName", propertyValue="TestMdbSubscription"), // always set a client ID if using durable subscriptions @ActivationConfigProperty(propertyName="clientId", propertyValue="TestMdb") }) @TransactionManagement(value=TransactionManagementType.BEAN) public class TestMdb implements MessageListener { @EJB IMySessionBean bean; @Resource MessageDrivenContext mc; @Resource(mappedName="java:/JmsXA") ConnectionFactory txJmsCF; // by using the HA-JNDI port this works in a cluster where the queue may be remote! @Resource(mappedName="jnp://localhost:1100/queue/test"); Queue testQ; private UserTransaction tx; public void onMessage(Message msg) { try { tx = mc.getUserTransaction(); tx.begin(); bean.myBusinessMethod(); sendResponse(msg, "response", "hello world", 4, 30000L); tx.commit(); } catch(MyBusinessException e) { } catch(EJBTransactionRolledbackException e) { Exception uncheckedException = e.getCausedByException(); } catch(EJBAccessException e) { } catch(JMSException e) { } catch(Exception e) { e.printStackTrace(); } } private void sendResponse(Message orig, String type, String body, int priority, long ttl) throws JMSException { Connection conn = txJmsCF.createConnection(); Session session = conn.createSession(true, 0); TextMessage answer = session.createTextMessage(); answer.setJMSType(type); answer.setText(body); answer.setJMSCorrelationID(orig.getJMSMessageID()); MessageProducer sender = session.createProducer(orig.getJMSReplyTo()); sender.setDeliveryMode(DeliveryMode.PERSISTENT); sender.setPriority(priority); sender.setTimeToLive(ttl); sender.send(answer); sender.close(); session.close(); conn.close(); log.debug("response sent"); } }
Comments