JMS in transacted beans
svinther Jan 18, 2006 7:25 AMHi
I have a annoying problem with JMS and transactions. From a Session bean with trans-attribute=Required I construct a message and send it to a Queue. I want the QueueSession to synchronize its commit with the SessionBean's commit, but the QueueSession is never being committed.
I Create the QueueSession like this in the SessionBean's ejbCreate method
public void ejbCreate() throws CreateException { try { jndiContext = new InitialContext(); QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) jndiContext.lookup("XAConnectionFactory"); log.debug("QCF: " + queueConnectionFactory); Queue queue = (Queue) jndiContext.lookup(ObserverProcessor.QUEUENAME); queueConnection = (QueueConnection) queueConnectionFactory.createQueueConnection(); queueSession = (QueueSession) queueConnection.createQueueSession(true, 0); queueSender = queueSession.createSender(queue); ...
Im on JBoss 3.2.7 + PostgreSQL 8.1.1 and I have configured a working datasource like this
<datasources> <xa-datasource> <!--<jndi-name>PostgresXADS</jndi-name>--> <jndi-name>DefaultDS</jndi-name> <track-connection-by-tx/> <xa-datasource-class>org.postgresql.xa.PGXADataSource</xa-datasource-class> <xa-datasource-property name="ServerName">192.168.1.177</xa-datasource-property> <xa-datasource-property name="PortNumber">5432</xa-datasource-property> <xa-datasource-property name="DatabaseName">signserver-svs</xa-datasource-property> <xa-datasource-property name="User">postgres</xa-datasource-property> <xa-datasource-property name="Password"></xa-datasource-property> </xa-datasource> </datasources>
In the jms subdir of the deploy directory I have this server element
<server> <!-- ==================================================================== --> <!-- Persistence and caching using Postgres --> <!-- IMPORTANT: Remove hsqldb-jdbc2-service.xml --> <!-- ==================================================================== --> <!-- | The destination manager is the core service within JBossMQ --> <mbean code="org.jboss.mq.server.jmx.DestinationManager" name="jboss.mq:service=DestinationManager"> <depends optional-attribute-name="MessageCache">jboss.mq:service=MessageCache</depends> <depends optional-attribute-name="PersistenceManager">jboss.mq:service=PersistenceManager</depends> <depends optional-attribute-name="StateManager">jboss.mq:service=StateManager</depends> </mbean> <!-- | The MessageCache decides where to put JBossMQ message that | are sitting around waiting to be consumed by a client. | | The memory marks are in Megabytes. Once the JVM memory usage hits | the high memory mark, the old messages in the cache will start getting | stored in the DataDirectory. As memory usage gets closer to the | Max memory mark, the amount of message kept in the memory cache aproaches 0. --> <mbean code="org.jboss.mq.server.MessageCache" name="jboss.mq:service=MessageCache"> <attribute name="HighMemoryMark">50</attribute> <attribute name="MaxMemoryMark">60</attribute> <attribute name="CacheStore">jboss.mq:service=PersistenceManager</attribute> </mbean> <!-- The PersistenceManager is used to store messages to disk. --> <!-- | The jdbc2 PersistenceManager is the new improved JDBC implementation. | This implementation allows you to control how messages are stored in | the database. | | This jdbc2 PM configuration has was supplied by Stephane Nicoll in the forums as an example for Postgres --> <mbean code="org.jboss.mq.pm.jdbc2.PersistenceManager" name="jboss.mq:service=PersistenceManager"> <depends optional-attribute-name="ConnectionManager">jboss.jca:service=XATxCM,name=DefaultDS</depends> <attribute name="SqlProperties"> BLOB_TYPE=BYTES_BLOB INSERT_TX = INSERT INTO JMS_TRANSACTIONS (TXID) values(?) INSERT_MESSAGE = INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?) SELECT_ALL_UNCOMMITED_TXS = SELECT TXID FROM JMS_TRANSACTIONS SELECT_MAX_TX = SELECT MAX(TXID) FROM JMS_MESSAGES SELECT_MESSAGES_IN_DEST = SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=? SELECT_MESSAGE = SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=? MARK_MESSAGE = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=? UPDATE_MESSAGE = UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=? UPDATE_MARKED_MESSAGES = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? UPDATE_MARKED_MESSAGES_WITH_TX = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=? UPDATE_MESSAGE = UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=? UPDATE_MARKED_MESSAGES = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? UPDATE_MARKED_MESSAGES_WITH_TX = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=? DELETE_MARKED_MESSAGES_WITH_TX = DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=? DELETE_TX = DELETE FROM JMS_TRANSACTIONS WHERE TXID = ? DELETE_MARKED_MESSAGES = DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=? DELETE_TEMPORARY_MESSAGES = DELETE FROM JMS_MESSAGES WHERE TXOP='T' DELETE_MESSAGE = DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=? CREATE_MESSAGE_TABLE = CREATE TABLE JMS_MESSAGES (MESSAGEID INTEGER NOT NULL, DESTINATION VARCHAR(150) NOT NULL, TXID INTEGER, TXOP CHAR(1), MESSAGEBLOB BYTEA, PRIMARY KEY (MESSAGEID, DESTINATION)) CREATE_IDX_MESSAGE_TXOP_TXID = CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID) CREATE_IDX_MESSAGE_DESTINATION = CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION) CREATE_TX_TABLE = CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) ) CREATE_TABLES_ON_STARTUP = TRUE </attribute> </mbean> <mbean code="org.jboss.mq.sm.jdbc.JDBCStateManager" name="jboss.mq:service=StateManager"> <depends optional-attribute-name="ConnectionManager">jboss.jca:service=XATxCM,name=DefaultDS</depends> <attribute name="SqlProperties"> CREATE_TABLES_ON_STARTUP = TRUE CREATE_USER_TABLE = CREATE TABLE JMS_USERS (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, \ CLIENTID VARCHAR(128), PRIMARY KEY(USERID)) CREATE_ROLE_TABLE = CREATE TABLE JMS_ROLES (ROLEID VARCHAR(32) NOT NULL, USERID VARCHAR(32) NOT NULL, \ PRIMARY KEY(USERID, ROLEID)) CREATE_SUBSCRIPTION_TABLE = CREATE TABLE JMS_SUBSCRIPTIONS (CLIENTID VARCHAR(128) NOT NULL, \ SUBNAME VARCHAR(128) NOT NULL, TOPIC VARCHAR(255) NOT NULL, \ SELECTOR VARCHAR(255), PRIMARY KEY(CLIENTID, SUBNAME)) GET_SUBSCRIPTION = SELECT TOPIC, SELECTOR FROM JMS_SUBSCRIPTIONS WHERE CLIENTID=? AND SUBNAME=? LOCK_SUBSCRIPTION = SELECT TOPIC, SELECTOR FROM JMS_SUBSCRIPTIONS WHERE CLIENTID=? AND SUBNAME=? GET_SUBSCRIPTIONS_FOR_TOPIC = SELECT CLIENTID, SUBNAME, SELECTOR FROM JMS_SUBSCRIPTIONS WHERE TOPIC=? INSERT_SUBSCRIPTION = INSERT INTO JMS_SUBSCRIPTIONS (CLIENTID, SUBNAME, TOPIC, SELECTOR) VALUES(?,?,?,?) UPDATE_SUBSCRIPTION = UPDATE JMS_SUBSCRIPTIONS SET TOPIC=?, SELECTOR=? WHERE CLIENTID=? AND SUBNAME=? REMOVE_SUBSCRIPTION = DELETE FROM JMS_SUBSCRIPTIONS WHERE CLIENTID=? AND SUBNAME=? GET_USER_BY_CLIENTID = SELECT USERID, PASSWD, CLIENTID FROM JMS_USERS WHERE CLIENTID=? GET_USER = SELECT PASSWD, CLIENTID FROM JMS_USERS WHERE USERID=? POPULATE.TABLES.01 = INSERT INTO JMS_USERS (USERID, PASSWD) VALUES ('guest', 'guest') POPULATE.TABLES.02 = INSERT INTO JMS_USERS (USERID, PASSWD) VALUES ('j2ee', 'j2ee') POPULATE.TABLES.03 = INSERT INTO JMS_USERS (USERID, PASSWD, CLIENTID) VALUES ('john', 'needle', 'DurableSubscriberExample') POPULATE.TABLES.04 = INSERT INTO JMS_USERS (USERID, PASSWD) VALUES ('nobody', 'nobody') POPULATE.TABLES.05 = INSERT INTO JMS_USERS (USERID, PASSWD) VALUES ('dynsub', 'dynsub') POPULATE.TABLES.06 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('guest','guest') POPULATE.TABLES.07 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('j2ee','guest') POPULATE.TABLES.08 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('john','guest') POPULATE.TABLES.09 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('subscriber','john') POPULATE.TABLES.10 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('publisher','john') POPULATE.TABLES.11 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('publisher','dynsub') POPULATE.TABLES.12 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('durpublisher','john') POPULATE.TABLES.13 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('durpublisher','dynsub') POPULATE.TABLES.14 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('noacc','nobody') </attribute> </mbean> </server>
Note that I changed jboss.jca:service=TxLocalCM,name=DefaultDS to jboss.jca:service=XATxCM,name=DefaultDS
When the server starts I get this in the log
2006-01-18 11:59:22,802 (RARMetaData)[] INFO Loading JBoss Resource Adapter for JDBC 2 XA drivers 2006-01-18 11:59:22,803 (RARMetaData)[] INFO Required license terms present. See deployment descriptor. 2006-01-18 11:59:24,028 (JmsXA)[jboss.jca:service=TxCM,name=JmsXA TxConnectionManager] INFO Bound connection factory for resource adapter for ConnectionManager 'jboss.jca:service=TxCM,name=J msXA to JNDI name 'java:/JmsXA' 2006-01-18 11:59:24,034 (DefaultDS)[jboss.jca:service=XATxCM,name=DefaultDS TxConnectionManager] INFO Bound connection factory for resource adapter for ConnectionManager 'jboss.jca:service=X ATxCM,name=DefaultDS to JNDI name 'java:/DefaultDS'
Something that I don't understand is the file in jms-ds.xml from the default application in the jboss3.2.7 distribution. It contains an element like this
<!-- JMS XA Resource adapter, use this to get transacted JMS in beans --> <tx-connection-factory> <jndi-name>JmsXA</jndi-name> <xa-transaction/> <adapter-display-name>JMS Adapter</adapter-display-name> <config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property> <config-property name="JmsProviderAdapterJNDI" type="java.lang.String">java:/DefaultJMSProvider</config-property> <max-pool-size>20</max-pool-size> <security-domain-and-application>JmsXARealm</security-domain-and-application> </tx-connection-factory>
And as seen in the log, jboss seems to somehow deploy this. Do I need to enable this XA Resource adaptor somehow, or does someone have some other clues or hints for me.