2 Replies Latest reply on Jul 6, 2007 1:39 PM by Tim Fox

    XA transaction

    Oleg Poleshuk Newbie

      Hi!

      We have run into a strange issue.

      We have:
      1) MDB1, which sends JMS messages
      2) MDB2, which receives the messages

      The scenario is:
      1. MDB1 persists an entity and sends a JMS message containing the ID of the persisted entity.
      2. MDB2 receives the message and calls EntityManager.find() with the ID received.

      Sometimes step 2 fails: the entity manager can't find the entity!
      It seems that MDB2 receives the message before the entity has been fully persisted.

      We use JmsXA as the connection factory.

      Here is the code:

      // MDB 1
      public void onMessage(Message message) {
          try {

              ... some code...

              Domain d = new Domain("some name");
              manager.persist(d);

              list.add(d);

              .... some code

              MessageHelper.sendMessages(list, new String[]{
                      Constants.JmsQueueNames.DOMAIN_OWNER_APPROVER,
                      Constants.JmsQueueNames.TRADEMARK_APPROVER,
                      Constants.JmsQueueNames.ADULT_APPROVER});
          } catch (JMSException e) {
              log.error(e);
              throw new EJBException(e);
          }
          log.debug("finished");
      }

      // MDB 2
      public void onMessage(Message message) {
          .. some code ...

          Domain domain = MessageHelper.getObject(message, manager, Domain.class);

          .. some code ...

      }
      
      
      public class MessageHelper {

          private static Log log = LogFactory.getLog(MessageHelper.class);

          private static final String ID_KEY = "ENTITY_ID";

          public static <T> T getObject(Message message, EntityManager em, Class<T> objClass) throws JMSException {
              // getLongProperty() returns a primitive long, so a "pk == null" test can
              // never be true; check that the property is present before reading it.
              if (!message.propertyExists(ID_KEY)) {
                  StringBuilder str = new StringBuilder();
                  str.append("PK is null!\n");
                  Enumeration<?> props = message.getPropertyNames();
                  while (props.hasMoreElements()) {
                      String prop = (String) props.nextElement();
                      str.append(prop).append("=").append(message.getObjectProperty(prop)).append('\n');
                  }
                  log.warn(str.toString());
                  return null; // nothing to look up without an ID
              }
              long pk = message.getLongProperty(ID_KEY);
              T obj = em.find(objClass, pk);
              if (obj == null) {
                  log.warn("Object is null!, pk = " + pk);
              }
              return obj;
          }
      
          public static void sendMessage(Long objectId, String queue) {
              List<Long> list = new ArrayList<Long>();
              list.add(objectId);
              sendMessages(list, new String[]{queue});
          }

          public static void sendMessages(List<Long> objectIds, String[] queues) {
              Connection connection = null;
              Session session = null;
              try {
                  connection = getConnectionFactory().createConnection();
                  connection.start();

                  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                  List<Message> messages = new ArrayList<Message>();

                  for (Long id : objectIds) {
                      Message message = session.createMessage();
                      message.setLongProperty(ID_KEY, id);
                      messages.add(message);
                  }

                  sendJMSMessages(queues, session, messages);
              } catch (JMSException ex) {
                  log.error(ex);
                  throw new EJBException("error sending message", ex);
              } catch (NamingException e) {
                  log.error(e);
                  throw new EJBException("error sending message", e);
              } finally {
                  closeSession(session);
                  closeConnection(connection);
              }
          }
      
          private static void sendListObjectMessage(List objects, String[] queues) {
              Connection connection = null;
              Session session = null;
              try {
                  connection = getConnectionFactory().createConnection();
                  connection.start();

                  session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                  List<Message> messages = new ArrayList<Message>();

                  for (Object object : objects) {
                      ObjectMessage message = session.createObjectMessage();
                      message.setObject((Serializable) object);
                      messages.add(message);
                  }

                  sendJMSMessages(queues, session, messages);
              } catch (JMSException ex) {
                  log.error(ex);
                  throw new EJBException("error sending message", ex);
              } catch (NamingException e) {
                  log.error(e);
                  throw new EJBException("error sending message", e);
              } finally {
                  closeSession(session);
                  closeConnection(connection);
              }
          }
      
          private static void sendJMSMessages(String[] queues, Session session, List<Message> messages) throws JMSException, NamingException {
              MessageProducer producer = session.createProducer(null);
              try {
                  for (String q : queues) {
                      Destination dest = getQueue(q);
                      for (Message message : messages) {
                          producer.send(dest, message);
                      }
                  }
              } finally {
                  if (producer != null) {
                      producer.close();
                  }
              }
          }

          private static void sendObjectMessage(Serializable object, String[] queues) {
              ArrayList<Serializable> list = new ArrayList<Serializable>();
              list.add(object);
              sendListObjectMessage(list, queues);
          }

          public static void sendObjectMessage(Serializable object, String queue) {
              sendObjectMessage(object, new String[]{queue});
          }

          private static Queue getQueue(String name) throws NamingException {
              return (Queue) getContext().lookup(name);
          }
      
          private static Context _ctx;

          /**
           * http://wiki.jboss.org/wiki/Wiki.jsp?page=JBossHAJNDIUseCluster
           */
          private static Context getContext() throws NamingException {
              if (null == _ctx) {
                  /*
                  String partitionName = System.getProperty("jboss.partition.name");
                  if (null != partitionName && partitionName.length() > 0) {
                      Properties p = new Properties();
                      p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
                      p.put(Context.URL_PKG_PREFIXES, "jboss.naming:org.jnp.interfaces");
                      p.put("jnp.partitionName", partitionName);
                      _ctx = new InitialContext(p);
                  } else {
                      log.warn("can't find cluster partition name");
                      _ctx = new InitialContext();
                  }
                  */
                  _ctx = new InitialContext();
              }
              return _ctx;
          }
      
      
          /**
           * Closes the JMS connection.
           */
          private static void closeConnection(Connection connection) {
              try {
                  if (connection != null)
                      connection.close();
              } catch (JMSException e) {
                  log.warn("Could not close JMS connection", e);
              }
          }

          /**
           * Closes the JMS session.
           */
          private static void closeSession(Session session) {
              try {
                  if (session != null)
                      session.close();
              } catch (JMSException e) {
                  log.warn("Could not close JMS session", e);
              }
          }

          private static ConnectionFactory getConnectionFactory() throws NamingException {
              return (ConnectionFactory) getContext().lookup(Constants.JMS_CONNECTION_FACTORY_NAME);
          }
      
          public static void sendMessage(Long id, QueueConnectionFactory factory, String queue) {
              sendMessage(id, queue);
          }

          public static void sendMessages(List<Long> list, QueueConnectionFactory factory, String[] strings) {
              sendMessages(list, strings);
          }

          public static void sendObjectMessage(Serializable obj, QueueConnectionFactory factory, String queue) {
              sendObjectMessage(obj, queue);
          }
      }



      The debug log is very big, so I've put it here: http://tvpayrev.belhard.com/feeds/t.txt

      You can see that MDB1 finished (first line), and the last line shows that MDB2 can't load the entity.




        • 2. Re: XA transaction
          Tim Fox Master

          Yes, that would be expected behaviour.

          You have a transaction in which you are doing two things:

          1) persisting something to your database

          2) sending a jms message

          When the transaction commits, the two XAResources corresponding to the database and the jms system will first have prepare called on them by the transaction manager, then they will have commit called on them.

          The *order* in which each XAResource is called is up to the transaction manager and not specified in the JTA spec.

          So, if commit gets called on the JMS XAResource *before* commit is called on the database XAResource, there is a chance the message is sent and received before the entity is persisted in the database.
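
          To make the window concrete, here is a toy model in plain Java. It is not JTA or JBoss code: CommitOrderDemo, ToyResource and entityVisibleWhenMessageArrives are made-up names, and the "consumer" is simply a callback that runs at the instant the JMS resource commits. It only illustrates that when the JMS resource commits first, the message can become visible while the database row is still uncommitted.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitOrderDemo {

    /** Hypothetical stand-in for an XAResource: work becomes visible only on commit. */
    static class ToyResource {
        final String name;
        boolean prepared = false;
        boolean committed = false;
        Runnable onCommit = () -> {};   // hook run when the work becomes visible

        ToyResource(String name) {
            this.name = name;
        }

        boolean prepare() {
            prepared = true;
            return true;                // this toy resource always votes "yes"
        }

        void commit() {
            if (!prepared) {
                throw new IllegalStateException(name + " was not prepared");
            }
            committed = true;
            onCommit.run();
        }
    }

    /**
     * Runs one toy two-phase commit over a "database" and a "jms" resource and
     * reports whether the database commit had already happened at the moment
     * the message became visible to a consumer.
     */
    public static boolean entityVisibleWhenMessageArrives(boolean commitDatabaseFirst) {
        ToyResource database = new ToyResource("database");
        ToyResource jms = new ToyResource("jms");

        boolean[] seen = new boolean[1];
        // The consumer looks up the entity as soon as the message is delivered.
        jms.onCommit = () -> seen[0] = database.committed;

        List<ToyResource> order = new ArrayList<>();
        if (commitDatabaseFirst) {
            order.add(database);
            order.add(jms);
        } else {
            order.add(jms);
            order.add(database);
        }

        // Phase 1: every resource must vote yes.
        for (ToyResource resource : order) {
            if (!resource.prepare()) {
                throw new IllegalStateException(resource.name + " voted no");
            }
        }
        // Phase 2: commit in whatever order the coordinator happens to use.
        for (ToyResource resource : order) {
            resource.commit();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("database first: entity visible = " + entityVisibleWhenMessageArrives(true));
        System.out.println("jms first: entity visible = " + entityVisibleWhenMessageArrives(false));
    }
}
```

          Both runs end with both resources committed; the only difference is what the consumer observes in between, which is exactly the part the transaction manager does not promise.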

          You should design your system not to rely on implementation details of the transaction manager which may vary from transaction manager to transaction manager and from time to time.
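
          One possible way to follow that advice is for the receiving side to treat "entity not found" as a transient condition rather than an error. The sketch below is only an illustration in plain Java: RetryLookup and findWithRetry are hypothetical names, and the Supplier stands in for a call such as em.find(...). It assumes each lookup can see rows committed since the previous attempt. In a real MDB the final exception would typically be left to propagate so that the container rolls back and the message is redelivered, but that depends on your container and redelivery settings.

```java
import java.util.function.Supplier;

public class RetryLookup {

    /**
     * Calls the lookup up to maxAttempts times, sleeping delayMillis between
     * attempts, and returns the first non-null result. Throws an unchecked
     * exception if the entity never becomes visible.
     */
    public static <T> T findWithRetry(Supplier<T> lookup, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            T result = lookup.get();
            if (result != null) {
                return result;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up early.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for entity", e);
                }
            }
        }
        throw new IllegalStateException("entity not visible after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated lookup: the entity only becomes visible on the third call.
        String found = findWithRetry(() -> ++calls[0] < 3 ? null : "domain-42", 5, 10L);
        System.out.println(found + " after " + calls[0] + " lookups");
    }
}
```

          A bounded retry keeps a genuinely missing entity from blocking the queue forever; the attempt count and delay here are arbitrary and would need tuning for a real system.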