oracle blob bug exposed in org.jboss.mq.pm.jdbc3.Persistence
sysuser1 Jan 5, 2005 2:11 AMHey everyone,
For anyone who cares and is trying to use Oracle to persist your jms messages with JBossMQ in jboss-3.2.5 we found a couple of problems in org.jboss.mq.pm.jdbc3.PersistenceManager when using the oracle-jdbc3-service.xml:
These aren't really problems with the PersistenceManager as everything works just fine under mysql for as much testing as we had done, but our requirement is to use Oracle for this case, and it seems that Oracle's bug is that it forces you to do a "select for update" when you're trying to retrieve Blobs greater than size 4k using the thin driver. I'm not really sure why Oracle forces developers to write extra code in order to do this??? Suffice it to say... I'm calling it a bug! ;)
(1) We had problems storing messages of type ObjectMessage that were larger than 4k.
Granted you want to keep your ObjectMessage sizes short and concise, but we have some messages where the 4k limit just doesn't cut it for us.
(2) As far as I can tell the jms-1.1 spec allows null ObjectMessages (Section 3.12 Provider Implementations of JMS Message Interfaces - pg. 51). In this case if you try and store a null ObjectMessage on a queue, then the PersistenceManager.extractMessage(ResultSet, int) code throws a NullPointerException when the jms server starts up trying to peek ahead on the InputStream.
That said, we wrote some code extending PersistenceManager.java in order to get by these issues... we're still testing it, but if you're experiencing something similar the same might help you. So in the spirit of open-source...
/*
 * JBossMQ, the OpenSource JMS implementation
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.mq.pm.jdbc3;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import javax.jms.JMSException;
import org.jboss.mq.MessagePool;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.pm.Tx;
import org.jboss.mq.pm.jdbc3.PersistenceManager;
/**
 * This class manages all persistence related services for JDBC based
 * persistence fixing an issue in regards to the jboss PersistenceManager for
 * Oracle when inserting blobs larger than 4K. Also fixes the issue of being
 * able to store null ObjectMessages on the queue.
 *
 * @author tradebeam.com
 */
public class OracleBlobPersistenceManager extends PersistenceManager {
 String INSERT_EMPTY_MESSAGE = "INSERT INTO JMS_MESSAGE_LOG (MESSAGEID," +
 " DESTINATION, MESSAGEBLOB, TXID, TXOP, LATECLONE) VALUES" +
 " (?,?,EMPTY_BLOB(),?,?,?)";
 String SELECT_BLOB_FOR_UPDATE = "SELECT MESSAGEID, MESSAGEBLOB FROM" +
 " JMS_MESSAGE_LOG WHERE messageid = ? FOR UPDATE";
 String UPDATE_BLOB = "UPDATE JMS_MESSAGE_LOG SET MESSAGEBLOB = ? WHERE" +
 " MESSAGEID = ?";
 /**
 * Create a new Persistence Manager.
 *
 * @throws JMSException
 */
 public OracleBlobPersistenceManager () throws JMSException {
 super();
 }
 /**
 * Add a message.
 *
 * @param c The connection.
 * @param queue The queue name.
 * @param message the message.
 * @param txId The transaction id.
 * @param mark The mark to set for the message.
 * @param lateClone
 *
 * @throws SQLException For an error in the db.
 * @throws IOException For an error serializing the message.
 */
 protected void addMessage (Connection c, String queue, SpyMessage message,
 Tx txId, String mark, String lateClone)
 throws SQLException, IOException {
 PreparedStatement stmt = null;
 try {
 stmt = c.prepareStatement(INSERT_EMPTY_MESSAGE);
 stmt.setLong(1, message.header.messageId);
 String dest = "*";
 if (queue != null) {
 dest = queue;
 }
 stmt.setString(2, dest);
 // need to change the following for oracle in order to create the
 // blob initially as an EMPTY_BLOB()
 if (txId != null) {
 stmt.setLong(3, txId.longValue());
 } else {
 stmt.setNull(3, Types.BIGINT);
 }
 if (mark == null) {
 stmt.setNull(4, Types.VARCHAR);
 } else {
 stmt.setString(4, mark);
 }
 stmt.setString(5, lateClone);
 try {
 stmt.executeUpdate();
 // need to add the following for persisting in Oracle
 stmt.close();
 // the following is specifically required for blobs of size
 // greater than 4k
 stmt = c.prepareStatement(SELECT_BLOB_FOR_UPDATE);
 stmt.setLong(1, message.header.messageId);
 stmt.executeQuery();
 stmt.close();
 stmt = c.prepareStatement(UPDATE_BLOB);
 this.setBlob(stmt, 1, message);
 stmt.setLong(2, message.header.messageId);
 stmt.executeUpdate();
 } catch (SQLException sqle) {
 if (lateClone.equals("1")) {
 log.trace("Assumed already added to message log: " +
 message.header.messageId);
 } else {
 throw sqle;
 }
 }
 } finally {
 try {
 stmt.close();
 } catch (Throwable ignore) {
 log.warn("Problem while closing the PreparedStatement for" +
 " message: " + message.header.messageId);
 }
 }
 }
 /**
 * Extract a message from a result. If the message is null then return a
 * default SpyMessage from the MessagePool.
 *
 * @param rs the result set
 * @param column the column number
 * @return the message
 * @throws SQLException for an error accessing the db
 * @throws IOException for an error extracting the message
 * @throws NullPointerException in the case when we receive unexpected data
 */
 protected SpyMessage extractMessage (ResultSet rs, int column)
 throws SQLException, IOException {
 long messageid = 0;
 // this extra check should perserve the behavior of the super class so
 // that we ensure a NullPointerException is thrown here if we receive
 // bad data
 if (rs.getObject(1) != null) {
 messageid = rs.getLong(1);
 } else {
 log.error("ResultSet messageid column was null.");
 throw new NullPointerException("ResultSet messageid column was null.");
 }
 SpyMessage message = null;
 if (blobType == OBJECT_BLOB) {
 Object o = rs.getObject(column);
 if (o != null) {
 message = (SpyMessage) o;
 } else {
 // set the message to a default SpyMessage
 message = MessagePool.getMessage();
 }
 } else if (blobType == BYTES_BLOB) {
 byte[] st = rs.getBytes(column);
 ByteArrayInputStream baip = new ByteArrayInputStream(st);
 if (baip != null) {
 ObjectInputStream ois = new ObjectInputStream(baip);
 message = SpyMessage.readMessage(ois);
 } else {
 // set the message to a default SpyMessage
 message = MessagePool.getMessage();
 }
 } else if (blobType == BINARYSTREAM_BLOB) {
 InputStream in = rs.getBinaryStream(column);
 if (in != null) {
 ObjectInputStream ois = new ObjectInputStream(in);
 message = SpyMessage.readMessage(ois);
 } else {
 // set the message to a default SpyMessage
 message = MessagePool.getMessage();
 }
 } else if (blobType == BLOB_BLOB) {
 Blob b = rs.getBlob(column);
 if (b != null) {
 ObjectInputStream ois = new ObjectInputStream(b.getBinaryStream());
 message = SpyMessage.readMessage(ois);
 } else {
 // set the message to a default SpyMessage
 message = MessagePool.getMessage();
 }
 }
 message.header.messageId = messageid;
 return message;
 }
}
Cheers,
-Jason
 
     
     
    