3 Replies Latest reply on Jan 19, 2006 7:01 AM by Steffen Vinther

    JMS in transacted beans

    Steffen Vinther Newbie

      Hi

      I have an annoying problem with JMS and transactions. From a session bean with trans-attribute=Required I construct a message and send it to a Queue. I want the QueueSession to synchronize its commit with the session bean's commit, but the QueueSession is never committed.

      I create the QueueSession like this in the SessionBean's ejbCreate method:

       public void ejbCreate() throws CreateException {
       try {
       jndiContext = new InitialContext();
       QueueConnectionFactory queueConnectionFactory = (QueueConnectionFactory) jndiContext.lookup("XAConnectionFactory");
       log.debug("QCF: " + queueConnectionFactory);
       Queue queue = (Queue) jndiContext.lookup(ObserverProcessor.QUEUENAME);
       queueConnection = (QueueConnection) queueConnectionFactory.createQueueConnection();
       queueSession = (QueueSession) queueConnection.createQueueSession(true, 0);
      
       queueSender = queueSession.createSender(queue);
      ...
      


      I'm on JBoss 3.2.7 + PostgreSQL 8.1.1 and I have configured a working datasource like this:

      <datasources>
       <xa-datasource>
       <!--<jndi-name>PostgresXADS</jndi-name>-->
       <jndi-name>DefaultDS</jndi-name>
       <track-connection-by-tx/>
       <xa-datasource-class>org.postgresql.xa.PGXADataSource</xa-datasource-class>
       <xa-datasource-property name="ServerName">192.168.1.177</xa-datasource-property>
       <xa-datasource-property name="PortNumber">5432</xa-datasource-property>
       <xa-datasource-property name="DatabaseName">signserver-svs</xa-datasource-property>
       <xa-datasource-property name="User">postgres</xa-datasource-property>
       <xa-datasource-property name="Password"></xa-datasource-property>
       </xa-datasource>
      </datasources>
      


      In the jms subdirectory of the deploy directory I have this server element:
      <server>
      
       <!-- ==================================================================== -->
       <!-- Persistence and caching using Postgres -->
       <!-- IMPORTANT: Remove hsqldb-jdbc2-service.xml -->
       <!-- ==================================================================== -->
      
       <!--
       | The destination manager is the core service within JBossMQ
       -->
       <mbean code="org.jboss.mq.server.jmx.DestinationManager" name="jboss.mq:service=DestinationManager">
       <depends optional-attribute-name="MessageCache">jboss.mq:service=MessageCache</depends>
       <depends optional-attribute-name="PersistenceManager">jboss.mq:service=PersistenceManager</depends>
       <depends optional-attribute-name="StateManager">jboss.mq:service=StateManager</depends>
       </mbean>
      
       <!--
       | The MessageCache decides where to put JBossMQ messages that
       | are sitting around waiting to be consumed by a client.
       |
       | The memory marks are in Megabytes. Once the JVM memory usage hits
       | the high memory mark, the old messages in the cache will start getting
       | stored in the DataDirectory. As memory usage gets closer to the
       | Max memory mark, the number of messages kept in the memory cache approaches 0.
       -->
       <mbean code="org.jboss.mq.server.MessageCache"
       name="jboss.mq:service=MessageCache">
       <attribute name="HighMemoryMark">50</attribute>
       <attribute name="MaxMemoryMark">60</attribute>
       <attribute name="CacheStore">jboss.mq:service=PersistenceManager</attribute>
       </mbean>
      
       <!-- The PersistenceManager is used to store messages to disk. -->
       <!--
       | The jdbc2 PersistenceManager is the new improved JDBC implementation.
       | This implementation allows you to control how messages are stored in
       | the database.
       |
       | This jdbc2 PM configuration was supplied by Stephane Nicoll in the forums as an example for Postgres
       -->
       <mbean code="org.jboss.mq.pm.jdbc2.PersistenceManager"
       name="jboss.mq:service=PersistenceManager">
       <depends optional-attribute-name="ConnectionManager">jboss.jca:service=XATxCM,name=DefaultDS</depends>
       <attribute name="SqlProperties">
       BLOB_TYPE=BYTES_BLOB
       INSERT_TX = INSERT INTO JMS_TRANSACTIONS (TXID) values(?)
       INSERT_MESSAGE = INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)
       SELECT_ALL_UNCOMMITED_TXS = SELECT TXID FROM JMS_TRANSACTIONS
       SELECT_MAX_TX = SELECT MAX(TXID) FROM JMS_MESSAGES
       SELECT_MESSAGES_IN_DEST = SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=?
       SELECT_MESSAGE = SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?
       MARK_MESSAGE = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?
       UPDATE_MESSAGE = UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?
       UPDATE_MARKED_MESSAGES = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?
       UPDATE_MARKED_MESSAGES_WITH_TX = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?
       UPDATE_MESSAGE = UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?
       UPDATE_MARKED_MESSAGES = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?
       UPDATE_MARKED_MESSAGES_WITH_TX = UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?
       DELETE_MARKED_MESSAGES_WITH_TX = DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=?
       DELETE_TX = DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?
       DELETE_MARKED_MESSAGES = DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?
       DELETE_TEMPORARY_MESSAGES = DELETE FROM JMS_MESSAGES WHERE TXOP='T'
       DELETE_MESSAGE = DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?
       CREATE_MESSAGE_TABLE = CREATE TABLE JMS_MESSAGES (MESSAGEID INTEGER NOT NULL, DESTINATION VARCHAR(150) NOT NULL, TXID INTEGER, TXOP CHAR(1), MESSAGEBLOB BYTEA, PRIMARY KEY (MESSAGEID, DESTINATION))
       CREATE_IDX_MESSAGE_TXOP_TXID = CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID)
       CREATE_IDX_MESSAGE_DESTINATION = CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION)
       CREATE_TX_TABLE = CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) )
       CREATE_TABLES_ON_STARTUP = TRUE
       </attribute>
       </mbean>
      
       <mbean code="org.jboss.mq.sm.jdbc.JDBCStateManager"
       name="jboss.mq:service=StateManager">
       <depends optional-attribute-name="ConnectionManager">jboss.jca:service=XATxCM,name=DefaultDS</depends>
       <attribute name="SqlProperties">
       CREATE_TABLES_ON_STARTUP = TRUE
       CREATE_USER_TABLE = CREATE TABLE JMS_USERS (USERID VARCHAR(32) NOT NULL, PASSWD VARCHAR(32) NOT NULL, \
       CLIENTID VARCHAR(128), PRIMARY KEY(USERID))
       CREATE_ROLE_TABLE = CREATE TABLE JMS_ROLES (ROLEID VARCHAR(32) NOT NULL, USERID VARCHAR(32) NOT NULL, \
       PRIMARY KEY(USERID, ROLEID))
       CREATE_SUBSCRIPTION_TABLE = CREATE TABLE JMS_SUBSCRIPTIONS (CLIENTID VARCHAR(128) NOT NULL, \
       SUBNAME VARCHAR(128) NOT NULL, TOPIC VARCHAR(255) NOT NULL, \
       SELECTOR VARCHAR(255), PRIMARY KEY(CLIENTID, SUBNAME))
       GET_SUBSCRIPTION = SELECT TOPIC, SELECTOR FROM JMS_SUBSCRIPTIONS WHERE CLIENTID=? AND SUBNAME=?
       LOCK_SUBSCRIPTION = SELECT TOPIC, SELECTOR FROM JMS_SUBSCRIPTIONS WHERE CLIENTID=? AND SUBNAME=?
       GET_SUBSCRIPTIONS_FOR_TOPIC = SELECT CLIENTID, SUBNAME, SELECTOR FROM JMS_SUBSCRIPTIONS WHERE TOPIC=?
       INSERT_SUBSCRIPTION = INSERT INTO JMS_SUBSCRIPTIONS (CLIENTID, SUBNAME, TOPIC, SELECTOR) VALUES(?,?,?,?)
       UPDATE_SUBSCRIPTION = UPDATE JMS_SUBSCRIPTIONS SET TOPIC=?, SELECTOR=? WHERE CLIENTID=? AND SUBNAME=?
       REMOVE_SUBSCRIPTION = DELETE FROM JMS_SUBSCRIPTIONS WHERE CLIENTID=? AND SUBNAME=?
       GET_USER_BY_CLIENTID = SELECT USERID, PASSWD, CLIENTID FROM JMS_USERS WHERE CLIENTID=?
       GET_USER = SELECT PASSWD, CLIENTID FROM JMS_USERS WHERE USERID=?
       POPULATE.TABLES.01 = INSERT INTO JMS_USERS (USERID, PASSWD) VALUES ('guest', 'guest')
       POPULATE.TABLES.02 = INSERT INTO JMS_USERS (USERID, PASSWD) VALUES ('j2ee', 'j2ee')
       POPULATE.TABLES.03 = INSERT INTO JMS_USERS (USERID, PASSWD, CLIENTID) VALUES ('john', 'needle', 'DurableSubscriberExample')
       POPULATE.TABLES.04 = INSERT INTO JMS_USERS (USERID, PASSWD) VALUES ('nobody', 'nobody')
       POPULATE.TABLES.05 = INSERT INTO JMS_USERS (USERID, PASSWD) VALUES ('dynsub', 'dynsub')
       POPULATE.TABLES.06 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('guest','guest')
       POPULATE.TABLES.07 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('j2ee','guest')
       POPULATE.TABLES.08 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('john','guest')
       POPULATE.TABLES.09 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('subscriber','john')
       POPULATE.TABLES.10 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('publisher','john')
       POPULATE.TABLES.11 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('publisher','dynsub')
       POPULATE.TABLES.12 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('durpublisher','john')
       POPULATE.TABLES.13 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('durpublisher','dynsub')
       POPULATE.TABLES.14 = INSERT INTO JMS_ROLES (ROLEID, USERID) VALUES ('noacc','nobody')
       </attribute>
       </mbean>
      </server>
      

      Note that I changed jboss.jca:service=TxLocalCM,name=DefaultDS to jboss.jca:service=XATxCM,name=DefaultDS

      When the server starts I get this in the log
      2006-01-18 11:59:22,802 (RARMetaData)[] INFO Loading JBoss Resource Adapter for JDBC 2 XA drivers
      2006-01-18 11:59:22,803 (RARMetaData)[] INFO Required license terms present. See deployment descriptor.
      2006-01-18 11:59:24,028 (JmsXA)[jboss.jca:service=TxCM,name=JmsXA TxConnectionManager] INFO Bound connection factory for resource adapter for ConnectionManager 'jboss.jca:service=TxCM,name=JmsXA to JNDI name 'java:/JmsXA'
      2006-01-18 11:59:24,034 (DefaultDS)[jboss.jca:service=XATxCM,name=DefaultDS TxConnectionManager] INFO Bound connection factory for resource adapter for ConnectionManager 'jboss.jca:service=XATxCM,name=DefaultDS to JNDI name 'java:/DefaultDS'
      



      Something that I don't understand is the jms-ds.xml file from the default application in the JBoss 3.2.7 distribution. It contains an element like this:
       <!-- JMS XA Resource adapter, use this to get transacted JMS in beans -->
       <tx-connection-factory>
       <jndi-name>JmsXA</jndi-name>
       <xa-transaction/>
       <adapter-display-name>JMS Adapter</adapter-display-name>
       <config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
       <config-property name="JmsProviderAdapterJNDI" type="java.lang.String">java:/DefaultJMSProvider</config-property>
       <max-pool-size>20</max-pool-size>
       <security-domain-and-application>JmsXARealm</security-domain-and-application>
       </tx-connection-factory>
      


      As seen in the log, JBoss seems to deploy this somehow. Do I need to enable this XA resource adapter in some way, or does someone have other clues or hints for me?



        • 2. Re: JMS in transacted beans
          Steffen Vinther Newbie

          That seemed to solve my problem about committing automatically.

          But I still have a problem with the scenario. In an MDB's onMessage method I change/create some entity beans and send a message. Both changes are committed successfully on exit of the onMessage method. The message then reaches another MDB, but there the entity bean changes made in the first MDB cannot be seen.

          It actually works once, on the first run, so I think it might be a problem with setting up the QueueSession in the ejbCreate method. If so, how or where should the QueueSession be set up? If I set it up in the onMessage method, I don't see how the session would ever be closed again.

          • 3. Re: JMS in transacted beans
            Steffen Vinther Newbie

            I have now read this wiki entry http://wiki.jboss.org/wiki/Wiki.jsp?page=MyJMSReceiverCannotSeeADBUpdateMadeInTheSameTransactionAsASend

            and understand that two-phase commit does not guarantee that all datasources have completed their commit operations by the time the transaction is done. The wiki entry states that setting the isolation level of the datasource to TRANSACTION_SERIALIZABLE should prevent the problem. So I tried that, but with no luck. Here is my datasource configuration:

            <datasources>
             <xa-datasource>
             <!--<jndi-name>PostgresXADS</jndi-name>-->
             <jndi-name>DefaultDS</jndi-name>
             <track-connection-by-tx/>
             <xa-datasource-class>org.postgresql.xa.PGXADataSource</xa-datasource-class>
             <xa-datasource-property name="ServerName">192.168.1.177</xa-datasource-property>
             <xa-datasource-property name="PortNumber">5432</xa-datasource-property>
             <xa-datasource-property name="DatabaseName">signserver-svs</xa-datasource-property>
             <xa-datasource-property name="User">postgres</xa-datasource-property>
             <xa-datasource-property name="Password"></xa-datasource-property>
             <transaction-isolation>TRANSACTION_SERIALIZABLE</transaction-isolation>
             </xa-datasource>
            
            </datasources>
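
To make the ordering issue from the wiki entry concrete, here is a minimal sketch in plain Java. It is a toy model and not JBoss, JBossMQ, or PostgreSQL code: the class and method names (CommitOrderSketch, Database, MessageQueue, consumerSees) are made up for illustration, the prepare phase is left out, and isolation levels are not modelled at all. It only assumes that the transaction manager commits the enlisted resources one after the other, so a consumer can pick up the message after the queue has committed but before the database has.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy model of two resources enlisted in one transaction (hypothetical, not JBoss code). */
class CommitOrderSketch {

    /** A resource that buffers work until commit() makes it visible to others. */
    interface Resource {
        void commit();
    }

    /** Stand-in for the database: writes stay pending until commit. */
    static class Database implements Resource {
        final Map<String, String> visible = new HashMap<String, String>();
        final Map<String, String> pending = new HashMap<String, String>();

        void write(String key, String value) {
            pending.put(key, value);
        }

        public void commit() {
            visible.putAll(pending);
            pending.clear();
        }
    }

    /** Stand-in for the JMS queue: sent messages stay pending until commit. */
    static class MessageQueue implements Resource {
        final Queue<String> visible = new ArrayDeque<String>();
        final Queue<String> pending = new ArrayDeque<String>();

        void send(String message) {
            pending.add(message);
        }

        public void commit() {
            visible.addAll(pending);
            pending.clear();
        }
    }

    /**
     * Runs one "transaction" that updates the database and sends a message,
     * then commits the two resources in the given order. A consumer polls the
     * queue after each individual commit and, once it gets the message, reads
     * the database. Returns what the consumer read (null means a stale read).
     */
    static String consumerSees(boolean queueCommitsFirst) {
        Database db = new Database();
        MessageQueue queue = new MessageQueue();

        db.write("entity-1", "updated");
        queue.send("entity-1");

        Resource[] commitOrder = queueCommitsFirst
                ? new Resource[] {queue, db}
                : new Resource[] {db, queue};

        for (Resource resource : commitOrder) {
            resource.commit();
            String key = queue.visible.poll();
            if (key != null) {
                // The consumer only sees what the database has committed so far.
                return db.visible.get(key);
            }
        }
        throw new IllegalStateException("message never became visible");
    }

    public static void main(String[] args) {
        System.out.println("queue commits first -> consumer sees: " + consumerSees(true));
        System.out.println("db commits first    -> consumer sees: " + consumerSees(false));
    }
}
```

In this model the consumer gets a stale read only when the queue commits before the database, which matches the symptom of the second MDB not seeing the entity bean changes. Whether and how the real commit order can be influenced is not something the sketch answers.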
            


            Any additional hints would be highly appreciated!