XA transaction
relgames Jul 6, 2007 1:28 PM
Hi!
We have run into a strange issue.
We have:
1) MDB1, which sends JMS messages
2) MDB2, which receives those messages
The scenario is:
1. MDB1 persists an entity and sends a JMS message containing the ID of the persisted entity.
2. MDB2 receives the message and calls EntityManager.find() with the received ID.
Sometimes step 2 fails: the entity manager does not contain the entity!
It seems that MDB2 receives the message before the entity manager has fully persisted the entity.
We use JmsXA as the connection factory.
Here is the code:
 // MDB 1
 /**
  * MDB 1 message handler (excerpt; parts of the body are elided in this post).
  * Creates a Domain, persists it through {@code manager}, adds it to {@code list}
  * and hands {@code list} to MessageHelper.sendMessages for three approver queues.
  * A JMSException is logged and rethrown wrapped in an EJBException.
  */
 public void onMessage(Message message) {
 try {
 ... some code...
 Domain d = new Domain("some name");
 manager.persist(d);
 // NOTE(review): `list` is declared in the elided code. It receives the Domain itself,
 // while MessageHelper.sendMessages shown in this post takes a List<Long> of ids -
 // presumably the real code adds the id or uses another overload; confirm.
 list.add(d);
 .... some code
 // The same list is sent to each of the three queues named below.
 // NOTE(review): the send happens inside this method, before it returns; whether the
 // message only becomes visible once the persist is committed depends on the JMS
 // session taking part in the same transaction - confirm.
 MessageHelper.sendMessages(list, new String[]{
 Constants.JmsQueueNames.DOMAIN_OWNER_APPROVER,
 Constants.JmsQueueNames.TRADEMARK_APPROVER,
 Constants.JmsQueueNames.ADULT_APPROVER});
 } catch (JMSException e) {
 log.error(e);
 throw new EJBException(e);
 }
 // Reached only when no JMSException was rethrown above.
 log.debug("finished");
 }
/// MDB 2
 /**
  * MDB 2 message handler (excerpt; parts of the body are elided in this post).
  * Resolves the Domain referenced by the incoming message via MessageHelper.getObject.
  */
 public void onMessage(Message message) {
 .. some code ...
 // MessageHelper.getObject, as shown in this post, logs a warning and returns null
 // when em.find() finds no entity, so `domain` may be null here.
 Domain domain = MessageHelper.getObject(message, manager, Domain.class);
 .. some code ...
}
public class MessageHelper {
 // Logger shared by all static helpers of this class.
 private static Log log = LogFactory.getLog(MessageHelper.class);
 // Name of the JMS long property that carries the entity's primary key
 // (written by sendMessages, read by getObject).
 private static final String ID_KEY = "ENTITY_ID";
 /**
  * Loads the entity whose primary key is carried in the message's
  * {@code ENTITY_ID} long property.
  *
  * @param message  incoming JMS message expected to carry the key property
  * @param em       entity manager used for the lookup
  * @param objClass entity class to load
  * @return the entity, or {@code null} when the message carries no key or the
  *         entity manager finds no entity for it (a warning is logged in both cases)
  * @throws JMSException if the provider fails to read the message properties
  */
 public static <T> T getObject(Message message, EntityManager em, Class<T> objClass) throws JMSException {
  // getLongProperty() returns a primitive long, so a boxed result can never be null;
  // for an absent property it throws NumberFormatException instead. Test for the
  // property explicitly so the diagnostic below is actually reachable.
  if (!message.propertyExists(ID_KEY)) {
   StringBuilder str = new StringBuilder("PK is null!\n");
   Enumeration<?> props = message.getPropertyNames();
   while (props.hasMoreElements()) {
    String prop = (String) props.nextElement();
    // append(Object) is null-safe, unlike calling toString() on the value.
    str.append(prop).append("=").append(message.getObjectProperty(prop)).append('\n');
   }
   log.warn(str.toString());
   return null;
  }
  Long pk = Long.valueOf(message.getLongProperty(ID_KEY));
  T obj = em.find(objClass, pk);
  if (obj == null) {
   log.warn("Object is null!, pk = " + pk);
  }
  return obj;
 }
 /**
  * Sends a single entity id to a single queue by delegating to
  * {@code sendMessages(List, String[])}.
  *
  * @param objectId id to send
  * @param queue    name of the target queue
  */
 public static void sendMessage(Long objectId, String queue) {
  final List<Long> singleId = new ArrayList<Long>();
  final String[] targets = {queue};
  singleId.add(objectId);
  sendMessages(singleId, targets);
 }
 /**
  * Sends one message per id to every named queue. Each message carries the id
  * in the {@code ENTITY_ID} long property.
  * <p>
  * Nothing is opened or sent when there are no ids or no queues.
  * <p>
  * NOTE(review): the session is created with {@code (false, AUTO_ACKNOWLEDGE)}; whether
  * the send takes part in the caller's transaction depends on the connection factory
  * bound under {@code Constants.JMS_CONNECTION_FACTORY_NAME} - confirm.
  *
  * @param objectIds ids to send; must not be null or contain null (the id is unboxed)
  * @param queues    names of the target queues; must not be null
  * @throws EJBException wrapping the JMSException or NamingException on failure
  */
 public static void sendMessages(List<Long> objectIds, String[] queues) {
  // Avoid acquiring a JMS connection when there is nothing to send.
  if (objectIds.isEmpty() || queues.length == 0) {
   return;
  }
  Connection connection = null;
  Session session = null;
  try {
   connection = getConnectionFactory().createConnection();
   connection.start();
   session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   List<Message> messages = new ArrayList<Message>();
   for (Long id : objectIds) {
    Message message = session.createMessage();
    message.setLongProperty(ID_KEY, id);
    messages.add(message);
   }
   sendJMSMessages(queues, session, messages);
  } catch (JMSException ex) {
   // Pass the exception as the throwable argument, as closeConnection/closeSession do.
   log.error("error sending message", ex);
   throw new EJBException("error sending message", ex);
  } catch (NamingException e) {
   log.error("error sending message", e);
   throw new EJBException("error sending message", e);
  } finally {
   closeSession(session);
   closeConnection(connection);
  }
 }
 /**
  * Sends every element of {@code objects} as an ObjectMessage to every named queue.
  * <p>
  * Nothing is opened or sent when there are no objects or no queues.
  *
  * @param objects payloads to send; each element is cast to Serializable, so a
  *                non-serializable element fails with ClassCastException
  * @param queues  names of the target queues; must not be null
  * @throws EJBException wrapping the JMSException or NamingException on failure
  */
 private static void sendListObjectMessage(List<?> objects, String[] queues) {
  // Avoid acquiring a JMS connection when there is nothing to send.
  if (objects.isEmpty() || queues.length == 0) {
   return;
  }
  Connection connection = null;
  Session session = null;
  try {
   connection = getConnectionFactory().createConnection();
   connection.start();
   session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   List<Message> messages = new ArrayList<Message>();
   for (Object object : objects) {
    ObjectMessage message = session.createObjectMessage();
    message.setObject((Serializable) object);
    messages.add(message);
   }
   sendJMSMessages(queues, session, messages);
  } catch (JMSException ex) {
   // Pass the exception as the throwable argument, as closeConnection/closeSession do.
   log.error("error sending message", ex);
   throw new EJBException("error sending message", ex);
  } catch (NamingException e) {
   log.error("error sending message", e);
   throw new EJBException("error sending message", e);
  } finally {
   closeSession(session);
   closeConnection(connection);
  }
 }
 /**
  * Sends every message to every queue through one producer created without a
  * fixed destination. Each queue name is looked up once, then all messages are
  * sent to it. The producer is closed when sending finishes or fails.
  *
  * @param queues   names of the target queues
  * @param session  session used to create the producer
  * @param messages messages to send
  * @throws JMSException    if creating the producer, sending or closing fails
  * @throws NamingException if a queue name cannot be looked up
  */
 private static void sendJMSMessages(String[] queues, Session session, List<Message> messages) throws JMSException, NamingException {
  final MessageProducer sender = session.createProducer(null);
  try {
   for (int i = 0; i < queues.length; i++) {
    final Destination target = getQueue(queues[i]);
    for (Message outgoing : messages) {
     sender.send(target, outgoing);
    }
   }
  } finally {
   if (sender != null) {
    sender.close();
   }
  }
 }
 /**
  * Sends a single serializable object to several queues by wrapping it in a
  * one-element list and delegating to {@code sendListObjectMessage}.
  *
  * @param object payload to send
  * @param queues names of the target queues
  */
 private static void sendObjectMessage(Serializable object, String[] queues) {
  final ArrayList<Serializable> payloads = new ArrayList<Serializable>();
  payloads.add(object);
  sendListObjectMessage(payloads, queues);
 }
 /**
  * Sends a single serializable object to a single queue.
  *
  * @param object payload to send
  * @param queue  name of the target queue
  */
 public static void sendObjectMessage(Serializable object, String queue) {
  final String[] targets = {queue};
  sendObjectMessage(object, targets);
 }
 /**
  * Looks up the object bound under {@code name} in the shared context and
  * returns it as a Queue.
  *
  * @param name name the queue is bound under
  * @throws NamingException if the lookup fails
  */
 private static Queue getQueue(String name) throws NamingException {
  final Object bound = getContext().lookup(name);
  return (Queue) bound;
 }
 // Context created on first use by getContext() and reused afterwards.
 private static Context _ctx;
 /**
  * Returns the shared context, creating a default InitialContext on the first call.
  * A partition-aware variant is kept below, commented out; see
  * http://wiki.jboss.org/wiki/Wiki.jsp?page=JBossHAJNDIUseCluster
  * <p>
  * NOTE(review): the field is assigned without synchronization, so concurrent first
  * calls may each create an InitialContext.
  *
  * @throws NamingException if the InitialContext cannot be created
  */
 private static Context getContext() throws NamingException {
  if (_ctx != null) {
   return _ctx;
  }
  /*
  String partitionName = System.getProperty("jboss.partition.name");
  if (null != partitionName && partitionName.length() > 0) {
  Properties p = new Properties();
  p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
  p.put(Context.URL_PKG_PREFIXES, "jboss.naming:org.jnp.interfaces");
  p.put("jnp.partitionName", partitionName);
  _ctx = new InitialContext(p);
  } else {
  log.warn("can't find cluster partition name");
  _ctx = new InitialContext();
  }
  */
  _ctx = new InitialContext();
  return _ctx;
 }
/**
 * Closes the given JMS connection if there is one. A failure to close is
 * logged as a warning and not propagated.
 */
 private static void closeConnection(Connection connection) {
  if (connection == null) {
   return;
  }
  try {
   connection.close();
  } catch (JMSException e) {
   log.warn("Could not close JMS connection", e);
  }
 }
 /**
  * Closes the given JMS session if there is one. A failure to close is
  * logged as a warning and not propagated.
  */
 private static void closeSession(Session session) {
  if (session == null) {
   return;
  }
  try {
   session.close();
  } catch (JMSException e) {
   log.warn("Could not close JMS session", e);
  }
 }
 /**
  * Looks up the connection factory bound under
  * {@code Constants.JMS_CONNECTION_FACTORY_NAME} in the shared context.
  *
  * @throws NamingException if the lookup fails
  */
 private static ConnectionFactory getConnectionFactory() throws NamingException {
  final Object bound = getContext().lookup(Constants.JMS_CONNECTION_FACTORY_NAME);
  return (ConnectionFactory) bound;
 }
 /**
  * Overload that accepts a connection factory. The {@code factory} argument is
  * not used; the call is forwarded to {@code sendMessage(Long, String)}.
  *
  * @param id      id to send
  * @param factory ignored
  * @param queue   name of the target queue
  */
 public static void sendMessage(Long id, QueueConnectionFactory factory, String queue) {
  MessageHelper.sendMessage(id, queue);
 }
 /**
  * Overload that accepts a connection factory. The {@code factory} argument is
  * not used; the call is forwarded to {@code sendMessages(List, String[])}.
  *
  * @param list    ids to send
  * @param factory ignored
  * @param strings names of the target queues
  */
 public static void sendMessages(List<Long> list, QueueConnectionFactory factory, String[] strings) {
  MessageHelper.sendMessages(list, strings);
 }
 /**
  * Overload that accepts a connection factory. The {@code factory} argument is
  * not used; the call is forwarded to {@code sendObjectMessage(Serializable, String)}.
  *
  * @param obj     payload to send
  * @param factory ignored
  * @param queue   name of the target queue
  */
 public static void sendObjectMessage(Serializable obj, QueueConnectionFactory factory, String queue) {
  MessageHelper.sendObjectMessage(obj, queue);
 }
The debug log is very big, so I've put it here: http://tvpayrev.belhard.com/feeds/t.txt
You can see that MDB1 finished (first line), and the last line tells us that MDB2 can't load the entity.
 
    