/* * JBoss, Home of Professional Open Source * Copyright 2005, JBoss Inc., and individual contributors as indicated * by the @authors tag. See the copyright.txt in the distribution for a * full listing of individual contributors. * * This is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation; either version 2.1 of * the License, or (at your option) any later version. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public * License along with this software; if not, write to the Free * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA * 02110-1301 USA, or see the FSF site: http://www.fsf.org. */ package org.jboss.mq.pm.jdbc3; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; import java.util.Properties; import javax.jms.JMSException; import javax.management.ObjectName; import javax.naming.InitialContext; import javax.sql.DataSource; import javax.transaction.Status; import javax.transaction.Transaction; import javax.transaction.TransactionManager; import org.jboss.mq.DurableSubscriptionID; import org.jboss.mq.SpyDestination; import org.jboss.mq.SpyJMSException; import org.jboss.mq.SpyMessage; import org.jboss.mq.SpyTopic; import org.jboss.mq.pm.CacheStore; import org.jboss.mq.pm.NewPersistenceManager; import org.jboss.mq.pm.Tx; import org.jboss.mq.pm.TxManager; import org.jboss.mq.server.JMSDestination; import org.jboss.mq.server.JMSTopic; import org.jboss.mq.server.MessageCache; import org.jboss.mq.server.MessageReference; import org.jboss.system.ServiceMBeanSupport; import org.jboss.tm.TransactionManagerService; import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong; /** * This class manages all persistence related services for JDBC based * persistence. * * @jmx:mbean extends="org.jboss.system.ServiceMBean, org.jboss.mq.pm.PersistenceManagerMBean, org.jboss.mq.pm.CacheStoreMBean" * * @author Jayesh Parayali (jayeshpk1@yahoo.com) * @author Hiram Chirino (cojonudo14@hotmail.com) * @author Adrian Brock (adrian@jboss.org) * * @version $Revision: 1.6.4.4 $ */ public class PersistenceManager extends ServiceMBeanSupport implements PersistenceManagerMBean, NewPersistenceManager, CacheStore, Runnable { // Constants -------------------------------------------------------------------- /** Message is an object */ static final int OBJECT_BLOB = 0; /** Message is a byte array */ static final int BYTES_BLOB = 1; /** Message is a binary stream */ static final int BINARYSTREAM_BLOB = 2; /** Message is a blob */ static final int BLOB_BLOB = 3; // Attributes ------------------------------------------------------------------- /** The next transaction id */ private SynchronizedLong nextTransactionId = new SynchronizedLong(0l); /** The jbossmq transaction manager */ private TxManager txManager; /** The data source */ private DataSource datasource; /** The jta transaction manager */ private TransactionManager tm; /** The object name of the connection manager */ private ObjectName connectionManagerName; /** The sql properties */ private Properties sqlProperties = new Properties(); String UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=?"; String UPDATE_MARKED_REFERENCES = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=?"; String UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?"; String UPDATE_MARKED_REFERENCES_WITH_TX = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?"; String DELETE_MARKED_MESSAGES_WITH_TX = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?"; String DELETE_MARKED_REFERENCES_WITH_TX = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?"; String DELETE_TX = "DELETE FROM JMS_TRANSACTION_LOG WHERE TXID = ?"; String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID=? AND TXOP=?"; String DELETE_MARKED_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID=? AND TXOP=?"; String DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXOP='T'"; String DELETE_TEMPORARY_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXOP='T'"; String INSERT_TX = "INSERT INTO JMS_TRANSACTION_LOG (TXID) values(?)"; String SELECT_MAX_TX = "SELECT MAX(TXID) FROM JMS_TRANSACTION_LOG"; String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE DESTINATION=?"; String SELECT_REFERENCES_IN_DEST = "SELECT R.MESSAGEID, M.MESSAGEBLOB, R.REDELIVERED, R.REDELIVERS FROM JMS_REFERENCE_LOG AS R, JMS_MESSAGE_LOG AS M" + " WHERE R.MESSAGEID = M.MESSAGEID AND R.DESTINATION=?"; String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?"; String INSERT_MESSAGE = "INSERT INTO JMS_MESSAGE_LOG (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP, LATECLONE) VALUES(?,?,?,?,?,?)"; String INSERT_REFERENCE = "INSERT INTO JMS_REFERENCE_LOG (MESSAGEID, DESTINATION, TXID, TXOP, REDELIVERED, REDELIVERS) VALUES(?,?,?,?,?,?)"; String MARK_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?"; String MARK_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?"; String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?"; String DELETE_REFERENCE = "DELETE FROM JMS_REFERENCE_LOG WHERE MESSAGEID=? AND DESTINATION=?"; String UPDATE_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?"; String UPDATE_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET REDELIVERED=?, REDELIVERS=? WHERE MESSAGEID=? AND DESTINATION=?"; String DELETE_ORPHANED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE LATECLONE = '1' AND MESSAGEID NOT IN (SELECT MESSAGEID FROM JMS_REFERENCE_LOG)"; String DELETE_ALL_TXS = "DELETE FROM JMS_TRANSACTION_LOG"; String CREATE_REFERENCE_TABLE = "CREATE TABLE JMS_REFERENCE_LOG ( MESSAGEID INTEGER NOT NULL, " + "DESTINATION VARCHAR(256) NOT NULL, TXID INTEGER, TXOP CHAR(1), " + "REDELIVERED CHAR(1), REDELIVERS INTEGER, " + "PRIMARY KEY (MESSAGEID, DESTINATION) )"; String CREATE_MESSAGE_TABLE = "CREATE TABLE JMS_MESSAGE_LOG ( MESSAGEID INTEGER NOT NULL, " + "DESTINATION VARCHAR(256), TXID INTEGER, TXOP CHAR(1), LATECLONE CHAR(1), " + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )"; String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTION_LOG ( TXID INTEGER )"; /** The blob type */ int blobType = OBJECT_BLOB; /** Whether to create tables */ boolean createTables = true; /** Number of retry attempts to connect to the db */ private int connectionRetryAttempts = 5; /** The garbage collection period in millis */ private long gcPeriod = 60000; /** The background gc thread */ private Thread gcThread; // Constructors ----------------------------------------------------------------- /** * Create a new persistence manager * * @throws JMSException for any error */ public PersistenceManager() throws javax.jms.JMSException { txManager = new TxManager(this); } // Public ----------------------------------------------------------------------- /** * Retrieve the connection manager object name * * @jmx:managed-attribute */ public ObjectName getConnectionManager() { return connectionManagerName; } /** * Set the connection manager object name * * @jmx:managed-attribute */ public void setConnectionManager(ObjectName connectionManagerName) { this.connectionManagerName = connectionManagerName; } /** * Set the garbage collection period * * @jmx:managed-attribute */ public int getGCPeriodSecs() { return (int) gcPeriod / 1000; } /** * Set the garbage collection period in seconds * * @jmx:managed-attribute */ public void setGCPeriodSecs(int gcPeriodSecs) { this.gcPeriod = gcPeriodSecs * 1000; } /** * Gets the ConnectionRetryAttempts. * * @jmx:managed-attribute * @return the number of retry events */ public int getConnectionRetryAttempts() { return this.connectionRetryAttempts; } /** * Sets the ConnectionRetryAttempts. * * @jmx:managed-attribute * @param value the number of retry attempts */ public void setConnectionRetryAttempts(int value) { this.connectionRetryAttempts = value; } /** * Gets the sqlProperties. * * @jmx:managed-attribute * @return Returns the Properties */ public String getSqlProperties() { try { ByteArrayOutputStream boa = new ByteArrayOutputStream(); sqlProperties.store(boa, ""); return new String(boa.toByteArray()); } catch (IOException shouldnothappen) { return ""; } } /** * Sets the sqlProperties. * * @jmx:managed-attribute * @param sqlProperties The sqlProperties to set */ public void setSqlProperties(String value) { try { ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes()); sqlProperties = new Properties(); sqlProperties.load(is); } catch (IOException shouldnothappen) { } } // PersistenceManager implementation -------------------------------------------- public Tx createPersistentTx() throws JMSException { Tx id = new Tx(nextTransactionId.increment()); TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; boolean threadWasInterrupted = Thread.interrupted(); try { c = this.getConnection(); stmt = c.prepareStatement(INSERT_TX); stmt.setLong(1, id.longValue()); stmt.executeUpdate(); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not crate tx: " + id, e); } finally { try { stmt.close(); } catch (Throwable ignore) { } try { c.close(); } catch (Throwable ignore) { } tms.endTX(); // Restore the interrupted state of the thread if( threadWasInterrupted ) Thread.currentThread().interrupt(); } return id; } public void commitPersistentTx(Tx txId) throws JMSException { TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; boolean threadWasInterrupted = Thread.interrupted(); try { c = this.getConnection(); removeMarkedMessages(c, txId, "D"); removeMarkedReferences(c, txId, "D"); removeTXRecord(c, txId.longValue()); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not commit tx: " + txId, e); } finally { try { c.close(); } catch (Throwable ignore) { } tms.endTX(); // Restore the interrupted state of the thread if( threadWasInterrupted ) Thread.currentThread().interrupt(); } } public void rollbackPersistentTx(Tx txId) throws JMSException { TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; boolean threadWasInterrupted = Thread.interrupted(); try { c = this.getConnection(); removeMarkedMessages(c, txId, "A"); removeMarkedReferences(c, txId, "A"); removeTXRecord(c, txId.longValue()); // Restore all the messages that were logically removed. stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES_WITH_TX); stmt.setNull(1, Types.BIGINT); stmt.setString(2, "A"); stmt.setString(3, "D"); stmt.setLong(4, txId.longValue()); stmt.executeUpdate(); stmt.close(); // Restore all the references that were logically removed. stmt = c.prepareStatement(UPDATE_MARKED_REFERENCES_WITH_TX); stmt.setNull(1, Types.BIGINT); stmt.setString(2, "A"); stmt.setString(3, "D"); stmt.setLong(4, txId.longValue()); stmt.executeUpdate(); stmt.close(); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not rollback tx: " + txId, e); } finally { try { if (stmt != null) stmt.close(); if (c != null) c.close(); } catch (Throwable ignore) { } tms.endTX(); // Restore the interrupted state of the thread if( threadWasInterrupted ) Thread.currentThread().interrupt(); } } public void add(MessageReference messageRef, Tx txId) throws JMSException { boolean trace = log.isTraceEnabled(); if (trace) log.trace("About to add message " + messageRef + " transaction=" + txId); TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; boolean threadWasInterrupted = Thread.interrupted(); try { c = this.getConnection(); // Synchronize on the message to avoid a race with the softener synchronized(messageRef) { if (trace) log.trace("Inserting message " + messageRef + " transaction=" + txId); if (messageRef.isLateClone()) { addReference(c, messageRef.getPersistentKey(), messageRef, txId, "A"); } else { SpyMessage message = messageRef.getMessage(); addMessage(c, messageRef.getPersistentKey(), message, txId, "A", "0"); } messageRef.setStored(MessageReference.STORED); if (trace) log.trace("Added message " + messageRef + " transaction=" + txId); } } catch (IOException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not store message: " + messageRef, e); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not store message: " + messageRef, e); } finally { try { c.close(); } catch (Throwable ignore) { } tms.endTX(); // Restore the interrupted state of the thread if( threadWasInterrupted ) Thread.currentThread().interrupt(); } } public void update(MessageReference messageRef, Tx txId) throws JMSException { boolean trace = log.isTraceEnabled(); if (trace) log.trace("Updating message " + messageRef + " transaction=" + txId); TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; boolean threadWasInterrupted = Thread.interrupted(); try { c = this.getConnection(); if (txId == null) { if (messageRef.isLateClone()) { stmt = c.prepareStatement(UPDATE_REFERENCE); if (messageRef.redelivered) stmt.setString(1, "1"); else stmt.setString(1, "0"); stmt.setLong(2, messageRef.redeliveryCount); stmt.setLong(3, messageRef.messageId); stmt.setString(4, messageRef.getPersistentKey()); } else { stmt = c.prepareStatement(UPDATE_MESSAGE); setBlob(stmt, 1, messageRef.getMessage()); stmt.setLong(2, messageRef.messageId); stmt.setString(3, messageRef.getPersistentKey()); } int rc = stmt.executeUpdate(); if( rc != 1 ) throw new SpyJMSException("Could not update the message in the database: update affected "+rc+" rows"); } else { throw new SpyJMSException("NYI: Updating a message in a transaction is not currently used"); } if (trace) log.trace("Updated message " + messageRef + " transaction=" + txId); } catch (IOException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not update message: " + messageRef, e); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not update message: " + messageRef, e); } finally { try { stmt.close(); } catch (Throwable ignore) { } try { c.close(); } catch (Throwable ignore) { } tms.endTX(); // Restore the interrupted state of the thread if( threadWasInterrupted ) Thread.currentThread().interrupt(); } } public void remove(MessageReference messageRef, Tx txId) throws JMSException { boolean trace = log.isTraceEnabled(); if (trace) log.trace("Removing message " + messageRef + " transaction=" + txId); TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; boolean threadWasInterrupted = Thread.interrupted(); try { c = this.getConnection(); // Synchronize on the message to avoid a race with the softener synchronized(messageRef) { if (txId == null) { if (messageRef.isLateClone()) stmt = c.prepareStatement(DELETE_REFERENCE); else stmt = c.prepareStatement(DELETE_MESSAGE); stmt.setLong(1, messageRef.messageId); stmt.setString(2, messageRef.getPersistentKey()); int rc = stmt.executeUpdate(); if( rc != 1 ) throw new SpyJMSException("Could not delete the message from the database: delete affected "+rc+" rows"); // Adrian Brock: // Remove the message from the cache, but don't // return it to the pool just yet. The queue still holds // a reference to the message and will return it // to the pool once it gets enough time slice. // The alternative is to remove the validation // for double removal from the cache, // which I don't want to do because it is useful // for spotting errors messageRef.setStored(MessageReference.NOT_STORED); messageRef.removeDelayed(); } else { if (messageRef.isLateClone()) { stmt = c.prepareStatement(MARK_REFERENCE); stmt.setLong(1, txId.longValue()); stmt.setString(2, "D"); stmt.setLong(3, messageRef.messageId); stmt.setString(4, messageRef.getPersistentKey()); } else { stmt = c.prepareStatement(MARK_MESSAGE); stmt.setLong(1, txId.longValue()); stmt.setString(2, "D"); stmt.setLong(3, messageRef.messageId); stmt.setString(4, messageRef.getPersistentKey()); } int rc = stmt.executeUpdate(); if( rc != 1 ) throw new SpyJMSException("Could not mark the message as deleted in the database: update affected "+rc+" rows"); } if (trace) log.trace("Removed message " + messageRef + " transaction=" + txId); } } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not remove message: " + messageRef, e); } finally { try { stmt.close(); } catch (Throwable ignore) { } try { c.close(); } catch (Throwable ignore) { } tms.endTX(); // Restore the interrupted state of the thread if( threadWasInterrupted ) Thread.currentThread().interrupt(); } } public synchronized void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws javax.jms.JMSException { if (jmsDest == null) throw new IllegalArgumentException("Must supply non null JMSDestination to restoreQueue"); if (dest == null) throw new IllegalArgumentException("Must supply non null SpyDestination to restoreQueue"); TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; ResultSet rs = null; boolean threadWasInterrupted = Thread.interrupted(); try { c = this.getConnection(); int counter=0; if (jmsDest.parameters.lateClone) { JMSTopic topic = (JMSTopic) jmsDest; // The durable subscription is not serialized DurableSubscriptionID id = ((SpyTopic) dest).getDurableSubscriptionID(); stmt = c.prepareStatement(SELECT_REFERENCES_IN_DEST); stmt.setString(1, dest.toString()); rs = stmt.executeQuery(); while (rs.next()) { SpyMessage message = extractMessage(rs, 2); boolean redelivered = false; if (rs.getString(3).equals("1")) redelivered = true; message.header.jmsRedelivered = redelivered; message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer(rs.getInt(4))); topic.restoreMessage(message, id); counter++; } } else { stmt = c.prepareStatement(SELECT_MESSAGES_IN_DEST); stmt.setString(1, dest.toString()); rs = stmt.executeQuery(); while (rs.next()) { SpyMessage message = extractMessage(rs, 2); // The durable subscription is not serialized if (dest instanceof SpyTopic) message.header.durableSubscriberID = ((SpyTopic)dest).getDurableSubscriptionID(); jmsDest.restoreMessage(message); counter++; } } log.debug("Restored "+counter+" message(s) to: "+dest); } catch (IOException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e); } finally { try { rs.close(); } catch (Throwable ignore) { } try { stmt.close(); } catch (Throwable ignore) { } try { c.close(); } catch (Throwable ignore) { } tms.endTX(); // Restore the interrupted state of the thread if( threadWasInterrupted ) Thread.currentThread().interrupt(); } } public TxManager getTxManager() { return txManager; } public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException { // Nothing to clean up, all the state is in the db. } /** * Unsupported operation */ public MessageCache getMessageCacheInstance() { throw new UnsupportedOperationException("This is now set on the destination manager"); } // NewPersistenceManager implementation ----------------------------------------- public void addMessage(SpyMessage message) throws JMSException { TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; boolean threadWasInterrupted = Thread.interrupted(); try { c = datasource.getConnection(); addMessage(c, "*", message, null, null, "1"); } catch (IOException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not add message:", e); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not add message:", e); } finally { try { if (c != null) c.close(); } catch (Throwable ignore) { } tms.endTX(); // Restore the interrupted state of the thread if( threadWasInterrupted ) Thread.currentThread().interrupt(); } } // PersistenceManagerMBean implementation --------------------------------------- public Object getInstance() { return this; } /** * Unsupported operation */ public ObjectName getMessageCache() { throw new UnsupportedOperationException("This is now set on the destination manager"); } /** * Unsupported operation */ public void setMessageCache(ObjectName messageCache) { throw new UnsupportedOperationException("This is now set on the destination manager"); } // CacheStore implementation ---------------------------------------------------- public SpyMessage loadFromStorage(MessageReference messageRef) throws JMSException { if (log.isTraceEnabled()) log.trace("Loading message from storage " + messageRef); TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; ResultSet rs = null; boolean threadWasInterrupted = Thread.interrupted(); try { c = this.getConnection(); stmt = c.prepareStatement(SELECT_MESSAGE); stmt.setLong(1, messageRef.messageId); if (messageRef.isLateClone()) stmt.setString(2, "*"); else stmt.setString(2, messageRef.getPersistentKey()); rs = stmt.executeQuery(); if (rs.next()) return extractMessage(rs, 2); return null; } catch (IOException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not load message : " + messageRef, e); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not load message : " + messageRef, e); } finally { try { rs.close(); } catch (Throwable ignore) { } try { stmt.close(); } catch (Throwable ignore) { } try { c.close(); } catch (Throwable ignore) { } tms.endTX(); // Restore the interrupted state of the thread if( threadWasInterrupted ) Thread.currentThread().interrupt(); } } public void removeFromStorage(MessageReference messageRef) throws JMSException { // We don't remove persistent messages sent to persistent queues if (messageRef.isPersistent()) return; boolean trace = log.isTraceEnabled(); if (trace) log.trace("Removing message from storage " + messageRef); TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; boolean threadWasInterrupted = Thread.interrupted(); try { c = this.getConnection(); if (messageRef.isLateClone()) { stmt = c.prepareStatement(DELETE_REFERENCE); stmt.setLong(1, messageRef.messageId); stmt.setString(2, messageRef.getPersistentKey()); stmt.executeUpdate(); messageRef.setStored(MessageReference.NOT_STORED); } else { stmt = c.prepareStatement(DELETE_MESSAGE); stmt.setLong(1, messageRef.messageId); stmt.setString(2, messageRef.getPersistentKey()); stmt.executeUpdate(); messageRef.setStored(MessageReference.NOT_STORED); } if (trace) log.trace("Removed message from storage " + messageRef); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not remove message: " + messageRef, e); } finally { try { stmt.close(); } catch (Throwable ignore) { } try { c.close(); } catch (Throwable ignore) { } tms.endTX(); // Restore the interrupted state of the thread if( threadWasInterrupted ) Thread.currentThread().interrupt(); } } public void saveToStorage(MessageReference messageRef, SpyMessage message) throws JMSException { // Ignore save operations for persistent messages sent to persistent queues // The queues handle the persistence if (messageRef.isPersistent()) return; boolean trace = log.isTraceEnabled(); if (trace) log.trace("Saving message to storage " + messageRef); TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; boolean threadWasInterrupted = Thread.interrupted(); try { c = this.getConnection(); if (messageRef.isLateClone()) { addReference(c, messageRef.getPersistentKey(), messageRef, null, "T"); try { addMessage(c, "*", message, null, "T", "1"); } catch (SQLException e) { log.trace("TODO: Check this is really a duplicate", e); } } else { addMessage(c, messageRef.getPersistentKey(), message, null, "T", "0"); } messageRef.setStored(MessageReference.STORED); if (trace) log.trace("Saved message to storage " + messageRef); } catch (IOException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not store message: " + messageRef, e); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not store message: " + messageRef, e); } finally { try { c.close(); } catch (Throwable ignore) { } tms.endTX(); // Restore the interrupted state of the thread if( threadWasInterrupted ) Thread.currentThread().interrupt(); } } // Runnable implementation ------------------------------------------------------ public void run() { Thread current = Thread.currentThread(); while (gcThread == current) { try { Thread.sleep(gcPeriod); if (gcThread != current) return; Connection connection = datasource.getConnection(); try { PreparedStatement stmt = connection.prepareStatement(DELETE_ORPHANED_MESSAGES); try { stmt.executeUpdate(); } finally { try { stmt.close(); } catch (SQLException ignored) { log.trace("Error closing statement", ignored); } } } finally { try { connection.close(); } catch (SQLException ignored) { log.trace("Error closing connection", ignored); } } } catch (InterruptedException ignored) { } catch (Throwable t) { log.warn("Unhandled throwable in gc thread:", t); } } } // ServerMBeanSupport overrides ------------------------------------------------- protected void startService() throws Exception { UPDATE_MARKED_MESSAGES = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES", UPDATE_MARKED_MESSAGES); UPDATE_MARKED_REFERENCES = sqlProperties.getProperty("UPDATE_MARKED_REFERENCES", UPDATE_MARKED_REFERENCES); UPDATE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_WITH_TX", UPDATE_MARKED_MESSAGES_WITH_TX); UPDATE_MARKED_REFERENCES_WITH_TX = sqlProperties.getProperty("UPDATE_MARKED_REFERENCES_WITH_TX", UPDATE_MARKED_REFERENCES_WITH_TX); DELETE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX", DELETE_MARKED_MESSAGES_WITH_TX); DELETE_MARKED_REFERENCES_WITH_TX = sqlProperties.getProperty("DELETE_MARKED_REFERENCES_WITH_TX", DELETE_MARKED_REFERENCES_WITH_TX); DELETE_TX = sqlProperties.getProperty("DELETE_TX", DELETE_TX); DELETE_MARKED_MESSAGES = sqlProperties.getProperty("DELETE_MARKED_MESSAGES", DELETE_MARKED_MESSAGES); DELETE_MARKED_REFERENCES = sqlProperties.getProperty("DELETE_MARKED_REFERENCES", DELETE_MARKED_REFERENCES); DELETE_TEMPORARY_MESSAGES = sqlProperties.getProperty("DELETE_TEMPORARY_MESSAGES", DELETE_TEMPORARY_MESSAGES); DELETE_TEMPORARY_REFERENCES = sqlProperties.getProperty("DELETE_TEMPORARY_REFERENCES", DELETE_TEMPORARY_REFERENCES); INSERT_TX = sqlProperties.getProperty("INSERT_TX", INSERT_TX); SELECT_MAX_TX = sqlProperties.getProperty("SELECT_MAX_TX", SELECT_MAX_TX); SELECT_MESSAGES_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST", SELECT_MESSAGES_IN_DEST); SELECT_REFERENCES_IN_DEST = sqlProperties.getProperty("SELECT_REFERENCES_IN_DEST", SELECT_REFERENCES_IN_DEST); SELECT_MESSAGE = sqlProperties.getProperty("SELECT_MESSAGE", SELECT_MESSAGE); INSERT_MESSAGE = sqlProperties.getProperty("INSERT_MESSAGE", INSERT_MESSAGE); INSERT_REFERENCE = sqlProperties.getProperty("INSERT_REFERENCE", INSERT_REFERENCE); MARK_MESSAGE = sqlProperties.getProperty("MARK_MESSAGE", MARK_MESSAGE); MARK_REFERENCE = sqlProperties.getProperty("MARK_REFERENCE", MARK_REFERENCE); DELETE_MESSAGE = sqlProperties.getProperty("DELETE_MESSAGE", DELETE_MESSAGE); DELETE_REFERENCE = sqlProperties.getProperty("DELETE_REFERENCE", DELETE_REFERENCE); UPDATE_MESSAGE = sqlProperties.getProperty("UPDATE_MESSAGE", UPDATE_MESSAGE); UPDATE_REFERENCE = sqlProperties.getProperty("UPDATE_REFERENCE", UPDATE_REFERENCE); DELETE_ORPHANED_MESSAGES = sqlProperties.getProperty("DELETE_ORPHANED_MESSAGES", DELETE_ORPHANED_MESSAGES); DELETE_ALL_TXS = sqlProperties.getProperty("DELETE_ALL_TXS", DELETE_ALL_TXS); CREATE_REFERENCE_TABLE = sqlProperties.getProperty("CREATE_REFERENCE_TABLE", CREATE_REFERENCE_TABLE); CREATE_MESSAGE_TABLE = sqlProperties.getProperty("CREATE_MESSAGE_TABLE", CREATE_MESSAGE_TABLE); CREATE_TX_TABLE = sqlProperties.getProperty("CREATE_TX_TABLE", CREATE_TX_TABLE); createTables = sqlProperties.getProperty("CREATE_TABLES_ON_STARTUP", "true").equalsIgnoreCase("true"); String s = sqlProperties.getProperty("BLOB_TYPE", "OBJECT_BLOB"); if (s.equals("OBJECT_BLOB")) blobType = OBJECT_BLOB; else if (s.equals("BYTES_BLOB")) blobType = BYTES_BLOB; else if (s.equals("BINARYSTREAM_BLOB")) blobType = BINARYSTREAM_BLOB; else if (s.equals("BLOB_BLOB")) blobType = BLOB_BLOB; //Find the ConnectionFactoryLoader MBean so we can find the datasource String dsName = (String) getServer().getAttribute(connectionManagerName, "BindName"); //Get an InitialContext InitialContext ctx = new InitialContext(); datasource = (DataSource) ctx.lookup(dsName); //Get the Transaction Manager so we can control the jdbc tx tm = (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME); log.debug("Resolving uncommited TXS"); resolveAllUncommitedTXs(); gcThread = new Thread(this, "JBossMQ persistent message garbage collection"); gcThread.setDaemon(true); gcThread.start(); } protected void stopService() throws Exception { if (gcThread != null) gcThread.interrupt(); gcThread = null; } // Protected -------------------------------------------------------------------- /** * Resolve uncommitted transactions * * @throws JMSException for any error */ protected synchronized void resolveAllUncommitedTXs() throws JMSException { TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; ResultSet rs = null; boolean threadWasInterrupted = Thread.interrupted(); try { if (createTables) { c = this.getConnection(); try { stmt = c.prepareStatement(CREATE_REFERENCE_TABLE); stmt.executeUpdate(); } catch (SQLException e) { log.debug("Could not create table with SQL: " + CREATE_REFERENCE_TABLE + ", got : " + e); } finally { try { if (stmt != null) stmt.close(); } catch (Throwable ignored) { log.trace("Ignored: " + ignored); } stmt = null; } try { stmt = c.prepareStatement(CREATE_MESSAGE_TABLE); stmt.executeUpdate(); } catch (SQLException e) { log.debug("Could not create table with SQL: " + CREATE_MESSAGE_TABLE + ", got : " + e); } finally { try { if (stmt != null) stmt.close(); } catch (Throwable ignored) { log.trace("Ignored: " + ignored); } stmt = null; } try { stmt = c.prepareStatement(CREATE_TX_TABLE); stmt.executeUpdate(); } catch (SQLException e) { log.debug("Could not create table with SQL: " + CREATE_TX_TABLE + ", got : " + e); } finally { try { if (stmt != null) stmt.close(); } catch (Throwable ignored) { log.trace("Ignored: " + ignored); } stmt = null; } } } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not get a connection for jdbc2 table construction ", e); } finally { try { if (stmt != null) stmt.close(); } catch (Throwable ignore) { } stmt = null; try { c.close(); } catch (Throwable ignore) { } c = null; tms.endTX(); // Restore the interrupted state of the thread if( threadWasInterrupted ) Thread.currentThread().interrupt(); } // We perform recovery in a different thread to the table creation // Postgres doesn't like create table failing in the same transaction // as other operations tms = new TransactionManagerStrategy(); tms.startTX(); threadWasInterrupted = Thread.interrupted(); try { c = this.getConnection(); // Delete the temporary messages. stmt = c.prepareStatement(DELETE_TEMPORARY_MESSAGES); stmt.executeUpdate(); stmt.close(); // Delete all the messages that were added but their tx's were not commited. stmt = c.prepareStatement(DELETE_MARKED_MESSAGES_WITH_TX); stmt.setString(1, "A"); stmt.executeUpdate(); stmt.close(); // Restore all the messages that were removed but their tx's were not commited. stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES); stmt.setNull(1, Types.BIGINT); stmt.setString(2, "A"); stmt.setString(3, "D"); stmt.executeUpdate(); stmt.close(); // Delete the temporary references. stmt = c.prepareStatement(DELETE_TEMPORARY_REFERENCES); stmt.executeUpdate(); stmt.close(); // Delete all the references that were added but their tx's were not commited. stmt = c.prepareStatement(DELETE_MARKED_REFERENCES_WITH_TX); stmt.setString(1, "A"); stmt.executeUpdate(); stmt.close(); // Restore all the references that were removed but their tx's were not commited. stmt = c.prepareStatement(UPDATE_MARKED_REFERENCES); stmt.setNull(1, Types.BIGINT); stmt.setString(2, "A"); stmt.setString(3, "D"); stmt.executeUpdate(); stmt.close(); // Remove orphaned messages stmt = c.prepareStatement(DELETE_ORPHANED_MESSAGES); stmt.executeUpdate(); stmt.close(); // Find out what the next TXID should be stmt = c.prepareStatement(SELECT_MAX_TX); rs = stmt.executeQuery(); if (rs.next()) nextTransactionId.set(rs.getLong(1) + 1); rs.close(); stmt.close(); // Delete all transactions. stmt = c.prepareStatement(DELETE_ALL_TXS); stmt.executeUpdate(); stmt.close(); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not resolve uncommited transactions. Message recovery may not be accurate", e); } finally { try { rs.close(); } catch (Throwable ignore) { } try { stmt.close(); } catch (Throwable ignore) { } try { c.close(); } catch (Throwable ignore) { } tms.endTX(); // Restore the interrupted state of the thread if( threadWasInterrupted ) Thread.currentThread().interrupt(); } } /** * Remove a transaction record * * @param c the connection * @param txid the transaction * @throws SQLException for any error */ protected void removeTXRecord(Connection c, long txid) throws SQLException { PreparedStatement stmt = null; try { stmt = c.prepareStatement(DELETE_TX); stmt.setLong(1, txid); stmt.executeUpdate(); } finally { try { stmt.close(); } catch (Throwable e) { } } } /** * Add a message * * @param c the connection * @param queue the queue name * @param message the message * @param txid the transaction id * @param mark the mark to set for the message * @throws SQLException for an error in the db * @throws IOException for an error serializing the message */ protected void addMessage(Connection c, String queue, SpyMessage message, Tx txId, String mark, String lateClone) throws SQLException, IOException { PreparedStatement stmt = null; try { stmt = c.prepareStatement(INSERT_MESSAGE); stmt.setLong(1, message.header.messageId); String dest = "*"; if (queue != null) dest = queue; stmt.setString(2, dest); setBlob(stmt, 3, message); if (txId != null) stmt.setLong(4, txId.longValue()); else stmt.setNull(4, Types.BIGINT); if (mark == null) stmt.setNull(5, Types.VARCHAR); else stmt.setString(5, mark); stmt.setString(6, lateClone); try { stmt.executeUpdate(); } catch (SQLException e) { if (lateClone.equals("1")) log.trace("Assumed already added to message log: " + message.header.messageId); else throw e; } } finally { try { stmt.close(); } catch (Throwable ignore) { } } } /** * Add a reference * * @param c the connection * @param queue the queue name * @param message the reference * @param txid the transaction id * @param mark the mark to set for the message * @throws SQLException for an error in the db * @throws IOException for an error serializing the message */ protected void addReference(Connection c, String queue, MessageReference message, Tx txId, String mark) throws SQLException, IOException { PreparedStatement stmt = null; try { stmt = c.prepareStatement(INSERT_REFERENCE); stmt.setLong(1, message.messageId); stmt.setString(2, queue); if (txId != null) stmt.setLong(3, txId.longValue()); else stmt.setNull(3, Types.BIGINT); stmt.setString(4, mark); if (message.redelivered) stmt.setString(5, "1"); else stmt.setString(5, "0"); stmt.setLong(6, message.redeliveryCount); stmt.executeUpdate(); } finally { try { stmt.close(); } catch (Throwable ignore) { } } } /** * Remove messages for a given transaction and mark * * @param c the connection * @param txid the transaction id * @param mark the mark * @throws SQLException for any error */ protected void removeMarkedMessages(Connection c, Tx txid, String mark) throws SQLException { PreparedStatement stmt = null; try { stmt = c.prepareStatement(DELETE_MARKED_MESSAGES); stmt.setLong(1, txid.longValue()); stmt.setString(2, mark); stmt.executeUpdate(); } finally { try { stmt.close(); } catch (Throwable e) { } } } /** * Remove references for a given transaction and mark * * @param c the connection * @param txid the transaction id * @param mark the mark * @throws SQLException for any error */ protected void removeMarkedReferences(Connection c, Tx txid, String mark) throws SQLException { PreparedStatement stmt = null; try { stmt = c.prepareStatement(DELETE_MARKED_REFERENCES); if (txid != null) stmt.setLong(1, txid.longValue()); else stmt.setNull(1, Types.BIGINT); stmt.setString(2, mark); stmt.executeUpdate(); } finally { try { stmt.close(); } catch (Throwable e) { } } } /** * Store the message in a blob * * @param stmt the prepared statement * @param column the column in the prepared statement * @param message the message * @param IOException for an error serializing the message * @param SQLException for an error accessing the db */ protected void setBlob(PreparedStatement stmt, int column, SpyMessage message) throws IOException, SQLException { if (blobType == OBJECT_BLOB) stmt.setObject(column, message); else if (blobType == BYTES_BLOB) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); SpyMessage.writeMessage(message,oos); oos.flush(); byte[] messageAsBytes = baos.toByteArray(); stmt.setBytes(column, messageAsBytes); } else if (blobType == BINARYSTREAM_BLOB) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); SpyMessage.writeMessage(message,oos); oos.flush(); byte[] messageAsBytes = baos.toByteArray(); ByteArrayInputStream bais = new ByteArrayInputStream(messageAsBytes); stmt.setBinaryStream(column, bais, messageAsBytes.length); } else if (blobType == BLOB_BLOB) { throw new RuntimeException("BLOB_TYPE: BLOB_BLOB is not yet implemented."); /** TODO: ByteArrayOutputStream baos= new ByteArrayOutputStream(); ObjectOutputStream oos= new ObjectOutputStream(baos); oos.writeObject(message); byte[] messageAsBytes= baos.toByteArray(); ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes); stmt.setBsetBinaryStream(column, bais, messageAsBytes.length); */ } } /** * Extract a message from a result * * @param rs the result set * @param column the column number * @return the message * @throws SQLException for an error accessing the db * @throws IOException for an error extracting the message */ protected SpyMessage extractMessage(ResultSet rs, int column) throws SQLException, IOException { long messageid = rs.getLong(1); SpyMessage message = null; if (blobType == OBJECT_BLOB) { message = (SpyMessage) rs.getObject(column); } else if (blobType == BYTES_BLOB) { byte[] st = rs.getBytes(column); ByteArrayInputStream baip = new ByteArrayInputStream(st); ObjectInputStream ois = new ObjectInputStream(baip); message = SpyMessage.readMessage(ois); } else if (blobType == BINARYSTREAM_BLOB) { ObjectInputStream ois = new ObjectInputStream(rs.getBinaryStream(column)); message = SpyMessage.readMessage(ois); } else if (blobType == BLOB_BLOB) { ObjectInputStream ois = new ObjectInputStream(rs.getBlob(column).getBinaryStream()); message = SpyMessage.readMessage(ois); } else throw new IllegalStateException(); message.header.messageId = messageid; return message; } /** * Gets a connection from the datasource, retrying as needed. This was * implemented because in some minimal configurations (i.e. little logging * and few services) the database wasn't ready when we tried to get a * connection. We, therefore, implement a retry loop wich is controled * by the ConnectionRetryAttempts attribute. Submitted by terry@amicas.com * * @throws SQLException if an error occurs. */ protected Connection getConnection() throws SQLException { int attempts = this.connectionRetryAttempts; int attemptCount = 0; SQLException sqlException = null; while (attempts-- > 0) { if (++attemptCount > 1) log.debug("Retrying connection: attempt # " + attemptCount); try { sqlException = null; return datasource.getConnection(); } catch (SQLException exception) { log.debug("Connection attempt # " + attemptCount + " failed with SQLException", exception); sqlException = exception; } finally { if (sqlException == null && attemptCount > 1) log.debug("Connection succeeded on attempt # " + attemptCount); } if (attempts > 0) { try { Thread.sleep(1500); } catch(InterruptedException interruptedException) { break; } } } if (sqlException != null) throw sqlException; throw new SQLException("connection attempt interrupted"); } // Inner Classes ---------------------------------------------------------------- /** * This inner class helps handle the tx management of the jdbc connections. */ class TransactionManagerStrategy { Transaction threadTx; void startTX() throws JMSException { try { // Thread arriving must be clean (jboss doesn't set the thread // previously). However optimized calls come with associated // thread for example. We suspend the thread association here, and // resume in the finally block of the following try. threadTx = tm.suspend(); tm.begin(); } catch (Exception e) { try { if (threadTx != null) tm.resume(threadTx); } catch (Exception ignore) { } throw new SpyJMSException("Could not start a transaction with the transaction manager.", e); } } void setRollbackOnly() throws JMSException { try { tm.setRollbackOnly(); } catch (Exception e) { throw new SpyJMSException("Could not start a mark the transaction for rollback .", e); } } void endTX() throws JMSException { try { if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK) { tm.rollback(); } else { tm.commit(); } } catch (Exception e) { throw new SpyJMSException("Could not start a transaction with the transaction manager.", e); } finally { try { if (threadTx != null) tm.resume(threadTx); } catch (Exception ignore) { } } } } }