XA transaction
relgames Jul 6, 2007 1:28 PMHi!
We have confronted with strange issue.
We have :
1) MDB1 that send JMS messages
2) MDB2 that receives message
Scenario is:
1. MDB1 persists entity and sends JMS message with ID of entity persisted
2. MDB2 received message and calls EntityManager.find() with id received.
Some times step 2 fails: entity manager doesn't contains entity!
It seems that MDB receives message earlier than entity manager fully persists entity.
We use JmsXA as Connection Factory.
Here is the code:
// MDB 1
public void onMessage(Message message) {
try {
... some code...
Domain d = new Domain("some name");
manager.persist(d);
list.add(d);
.... some code
MessageHelper.sendMessages(list, new String[]{
Constants.JmsQueueNames.DOMAIN_OWNER_APPROVER,
Constants.JmsQueueNames.TRADEMARK_APPROVER,
Constants.JmsQueueNames.ADULT_APPROVER});
} catch (JMSException e) {
log.error(e);
throw new EJBException(e);
}
log.debug("finished");
}
/// MDB 2
public void onMessage(Message message) {
.. some code ...
Domain domain = MessageHelper.getObject(message, manager, Domain.class);
.. some code ...
}
public class MessageHelper {
private static Log log = LogFactory.getLog(MessageHelper.class);
private static final String ID_KEY = "ENTITY_ID";
public static <T> T getObject(Message message, EntityManager em, Class<T> objClass) throws JMSException {
Long pk = message.getLongProperty(ID_KEY);
if (pk==null) {
StringBuffer str = new StringBuffer();
str.append("PK is null!\n");
Enumeration props = message.getPropertyNames();
while (props.hasMoreElements()) {
String prop = (String)props.nextElement();
str.append(prop).append("=").append(message.getObjectProperty(prop).toString()).append('\n');
}
log.warn(str.toString());
}
T obj = em.find(objClass, pk);
if (obj==null) {
log.warn("Object is null!, pk = " + pk);
}
return obj;
}
public static void sendMessage(Long objectId, String queue) {
List<Long> list = new ArrayList<Long>();
list.add(objectId);
sendMessages(list, new String[]{queue});
}
public static void sendMessages(List<Long> objectIds, String[] queues) {
Connection connection = null;
Session session = null;
try {
connection = getConnectionFactory().createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
List<Message> messages = new ArrayList<Message>();
for (Long id : objectIds) {
Message message = session.createMessage();
message.setLongProperty(ID_KEY, id);
messages.add(message);
}
sendJMSMessages(queues, session, messages);
} catch (JMSException ex) {
log.error(ex);
throw new EJBException("error sending message", ex);
} catch (NamingException e) {
log.error(e);
throw new EJBException("error sending message", e);
} finally {
closeSession(session);
closeConnection(connection);
}
}
private static void sendListObjectMessage(List objects, String[] queues) {
Connection connection = null;
Session session = null;
try {
connection = getConnectionFactory().createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
List<Message> messages = new ArrayList<Message>();
for (Object object : objects) {
ObjectMessage message = session.createObjectMessage();
message.setObject((Serializable) object);
messages.add(message);
}
sendJMSMessages(queues, session, messages);
} catch (JMSException ex) {
log.error(ex);
throw new EJBException("error sending message", ex);
} catch (NamingException e) {
log.error(e);
throw new EJBException("error sending message", e);
} finally {
closeSession(session);
closeConnection(connection);
}
}
private static void sendJMSMessages(String[] queues, Session session, List<Message> messages) throws JMSException, NamingException {
MessageProducer producer = session.createProducer(null);
try {
for (String q : queues) {
Destination dest = getQueue(q);
for (Message message : messages) {
producer.send(dest, message);
}
}
} finally {
if (producer!=null) {
producer.close();
}
}
}
private static void sendObjectMessage(Serializable object, String[] queues) {
ArrayList<Serializable> list = new ArrayList<Serializable>();
list.add(object);
sendListObjectMessage(list, queues);
}
public static void sendObjectMessage(Serializable object, String queue) {
sendObjectMessage(object, new String[]{queue});
}
private static Queue getQueue(String name) throws NamingException {
return (Queue) getContext().lookup(name);
}
private static Context _ctx;
/**
* http://wiki.jboss.org/wiki/Wiki.jsp?page=JBossHAJNDIUseCluster
*/
private static Context getContext() throws NamingException {
if (null == _ctx) {
/*
String partitionName = System.getProperty("jboss.partition.name");
if (null != partitionName && partitionName.length() > 0) {
Properties p = new Properties();
p.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
p.put(Context.URL_PKG_PREFIXES, "jboss.naming:org.jnp.interfaces");
p.put("jnp.partitionName", partitionName);
_ctx = new InitialContext(p);
} else {
log.warn("can't find cluster partition name");
_ctx = new InitialContext();
}
*/
_ctx = new InitialContext();
}
return _ctx;
}
/**
* Closes the JMS connection.
*/
private static void closeConnection(Connection connection) {
try {
if (connection != null)
connection.close();
} catch (JMSException e) {
log.warn("Could not close JMS connection", e);
}
}
/**
* Closes the JMS session.
*/
private static void closeSession(Session session) {
try {
if (session != null)
session.close();
} catch (JMSException e) {
log.warn("Could not close JMS session", e);
}
}
private static ConnectionFactory getConnectionFactory() throws NamingException {
return (ConnectionFactory) getContext().lookup(Constants.JMS_CONNECTION_FACTORY_NAME);
}
public static void sendMessage(Long id, QueueConnectionFactory factory, String queue) {
sendMessage(id, queue);
}
public static void sendMessages(List<Long> list, QueueConnectionFactory factory, String[] strings) {
sendMessages(list, strings);
}
public static void sendObjectMessage(Serializable obj, QueueConnectionFactory factory, String queue) {
sendObjectMessage(obj, queue);
}
debug log is very big, but i've putted it here: http://tvpayrev.belhard.com/feeds/t.txt
you can see that MDB1 finished (first line), and last line informs us that MDB2 can't load entity.