XA transaction
relgames Jul 6, 2007 1:28 PM
Hi!
We have run into a strange issue.
We have:
1) MDB1, which sends JMS messages
2) MDB2, which receives those messages
The scenario is:
1. MDB1 persists an entity and sends a JMS message containing the ID of the persisted entity.
2. MDB2 receives the message and calls EntityManager.find() with the received ID.
Sometimes step 2 fails: the entity manager does not contain the entity!
It seems that MDB2 receives the message before the entity manager has fully persisted the entity.
We use JmsXA as the connection factory.
Here is the code:
// MDB 1 public void onMessage(Message message) { try { ... some code... Domain d = new Domain("some name"); manager.persist(d); list.add(d); .... some code MessageHelper.sendMessages(list, new String[]{ Constants.JmsQueueNames.DOMAIN_OWNER_APPROVER, Constants.JmsQueueNames.TRADEMARK_APPROVER, Constants.JmsQueueNames.ADULT_APPROVER}); } catch (JMSException e) { log.error(e); throw new EJBException(e); } log.debug("finished"); } /// MDB 2 public void onMessage(Message message) { .. some code ... Domain domain = MessageHelper.getObject(message, manager, Domain.class); .. some code ... } public class MessageHelper { private static Log log = LogFactory.getLog(MessageHelper.class); private static final String ID_KEY = "ENTITY_ID"; public static <T> T getObject(Message message, EntityManager em, Class<T> objClass) throws JMSException { Long pk = message.getLongProperty(ID_KEY); if (pk==null) { StringBuffer str = new StringBuffer(); str.append("PK is null!\n"); Enumeration props = message.getPropertyNames(); while (props.hasMoreElements()) { String prop = (String)props.nextElement(); str.append(prop).append("=").append(message.getObjectProperty(prop).toString()).append('\n'); } log.warn(str.toString()); } T obj = em.find(objClass, pk); if (obj==null) { log.warn("Object is null!, pk = " + pk); } return obj; } public static void sendMessage(Long objectId, String queue) { List<Long> list = new ArrayList<Long>(); list.add(objectId); sendMessages(list, new String[]{queue}); } public static void sendMessages(List<Long> objectIds, String[] queues) { Connection connection = null; Session session = null; try { connection = getConnectionFactory().createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); List<Message> messages = new ArrayList<Message>(); for (Long id : objectIds) { Message message = session.createMessage(); message.setLongProperty(ID_KEY, id); messages.add(message); } sendJMSMessages(queues, session, messages); } 
catch (JMSException ex) { log.error(ex); throw new EJBException("error sending message", ex); } catch (NamingException e) { log.error(e); throw new EJBException("error sending message", e); } finally { closeSession(session); closeConnection(connection); } } private static void sendListObjectMessage(List objects, String[] queues) { Connection connection = null; Session session = null; try { connection = getConnectionFactory().createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); List<Message> messages = new ArrayList<Message>(); for (Object object : objects) { ObjectMessage message = session.createObjectMessage(); message.setObject((Serializable) object); messages.add(message); } sendJMSMessages(queues, session, messages); } catch (JMSException ex) { log.error(ex); throw new EJBException("error sending message", ex); } catch (NamingException e) { log.error(e); throw new EJBException("error sending message", e); } finally { closeSession(session); closeConnection(connection); } } private static void sendJMSMessages(String[] queues, Session session, List<Message> messages) throws JMSException, NamingException { MessageProducer producer = session.createProducer(null); try { for (String q : queues) { Destination dest = getQueue(q); for (Message message : messages) { producer.send(dest, message); } } } finally { if (producer!=null) { producer.close(); } } } private static void sendObjectMessage(Serializable object, String[] queues) { ArrayList<Serializable> list = new ArrayList<Serializable>(); list.add(object); sendListObjectMessage(list, queues); } public static void sendObjectMessage(Serializable object, String queue) { sendObjectMessage(object, new String[]{queue}); } private static Queue getQueue(String name) throws NamingException { return (Queue) getContext().lookup(name); } private static Context _ctx; /** * http://wiki.jboss.org/wiki/Wiki.jsp?page=JBossHAJNDIUseCluster */ private static Context getContext() 
throws NamingException { if (null == _ctx) { /* String partitionName = System.getProperty("jboss.partition.name"); if (null != partitionName && partitionName.length() > 0) { Properties p = new Properties(); p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory"); p.put(Context.URL_PKG_PREFIXES, "jboss.naming:org.jnp.interfaces"); p.put("jnp.partitionName", partitionName); _ctx = new InitialContext(p); } else { log.warn("can't find cluster partition name"); _ctx = new InitialContext(); } */ _ctx = new InitialContext(); } return _ctx; } /** * Closes the JMS connection. */ private static void closeConnection(Connection connection) { try { if (connection != null) connection.close(); } catch (JMSException e) { log.warn("Could not close JMS connection", e); } } /** * Closes the JMS session. */ private static void closeSession(Session session) { try { if (session != null) session.close(); } catch (JMSException e) { log.warn("Could not close JMS session", e); } } private static ConnectionFactory getConnectionFactory() throws NamingException { return (ConnectionFactory) getContext().lookup(Constants.JMS_CONNECTION_FACTORY_NAME); } public static void sendMessage(Long id, QueueConnectionFactory factory, String queue) { sendMessage(id, queue); } public static void sendMessages(List<Long> list, QueueConnectionFactory factory, String[] strings) { sendMessages(list, strings); } public static void sendObjectMessage(Serializable obj, QueueConnectionFactory factory, String queue) { sendObjectMessage(obj, queue); }
The debug log is very large, so I've put it here: http://tvpayrev.belhard.com/feeds/t.txt
You can see that MDB1 finished (first line), and the last line shows that MDB2 can't load the entity.