4 Replies Latest reply on Sep 21, 2011 12:15 PM by garytully

    persistent message storage using Oracle

    hlobilova

      Using the 5.5 broker, I am trying to use Oracle JDBC for message persistence. The tables were created fine; however, when a message is stored, an exception is thrown:

      JDBC Failure: Missing IN or OUT parameter at index:: 7 org.apache.activemq.store.jdbc.JDBCPersistenceAdapter

      java.sql.SQLException: Missing IN or OUT parameter at index:: 7
          at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:112)
          at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:146)
          at oracle.jdbc.driver.OraclePreparedStatement.processCompletedBindRow(OraclePreparedStatement.java:1681)
          at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3280)
          at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3368)
          at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:102)
          at org.apache.commons.dbcp.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:102)
          at org.apache.activemq.store.jdbc.adapter.BlobJDBCAdapter.doAddMessage(BlobJDBCAdapter.java:71)
          at org.apache.activemq.store.jdbc.JDBCMessageStore.addMessage(JDBCMessageStore.java:104)
          at org.apache.activemq.store.memory.MemoryTransactionStore.addMessage(MemoryTransactionStore.java:285)
          at org.apache.activemq.store.memory.MemoryTransactionStore$1.asyncAddQueueMessage(MemoryTransactionStore.java:139)
          at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:679)
          at org.apache.activemq.broker.region.Queue.send(Queue.java:652)

       

      getAddMessageStatement in Statements.java creates a statement with 7 parameters, but doAddMessage in BlobJDBCAdapter only populates 6.

      How is this supposed to work?
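The mismatch described above can be checked mechanically by counting the `?` bind markers in the SQL and comparing that with the number of parameters the adapter sets. Below is a minimal sketch, not ActiveMQ code: the class and method names are hypothetical, the table name ACTIVEMQ_MSGS is an assumption, and the seven-marker SQL is reconstructed from the column list quoted later in this thread rather than copied from Statements.java.

```java
/**
 * Counts JDBC bind markers ('?') in a SQL string, skipping any that sit
 * inside single-quoted literals. Illustrative helper only.
 */
final class PlaceholderCheck {

    static int countPlaceholders(String sql) {
        int count = 0;
        boolean inLiteral = false;
        for (int i = 0; i < sql.length(); i++) {
            char ch = sql.charAt(i);
            if (ch == '\'') {
                // Toggle on every quote; an escaped quote ('') toggles twice.
                inLiteral = !inLiteral;
            } else if (ch == '?' && !inLiteral) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Assumed shape of the 5.5 insert: seven markers, the last one for MSG.
        String addMessageSql = "INSERT INTO ACTIVEMQ_MSGS"
            + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG)"
            + " VALUES (?, ?, ?, ?, ?, ?, ?)";
        int bound = 6; // doAddMessage binds indexes 1..6 only, per the report above

        int expected = countPlaceholders(addMessageSql);
        if (expected != bound) {
            System.out.println("Statement expects " + expected
                + " parameters but only " + bound + " are bound");
        }
    }
}
```

Any marker left unbound when executeUpdate() runs is what the Oracle driver reports as "Missing IN or OUT parameter".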

        • 1. Re: persistent message storage using Oracle
          garytully

          There is a problem with the BlobAdapter in 5.5, but the default JDBC adapter works fine with the latest Oracle drivers (ojdbc6.jar), as they handle blobs under the hood once a string exceeds a defined length.

           

          To enable the default JDBC adapter, use the following configuration:

          <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#oracle-ds">
              <adapter>
                <defaultJDBCAdapter></defaultJDBCAdapter>
              </adapter>
            </jdbcPersistenceAdapter>
          </persistenceAdapter>

           

          • 2. Re: persistent message storage using Oracle
            hlobilova

            Thanks, that worked!

            • 3. Re: persistent message storage using Oracle
              wmcdonald

              This does not work.

               

              My Oracle database version is: Oracle Database 10g Enterprise Edition Release 10.2.0.4.0 - 64bi

               

              When I use the defaultJDBCAdapter, it won't create the tables because it uses the BIGINT type, which doesn't exist in Oracle. If I use the OracleJDBCAdapter, it creates the tables, but then BlobJDBCAdapter fails because the 7th parameter is missing.

              None of the obvious fixes worked. Adding s.setString(7, " "); doesn't fix it: Oracle complains that the type is not a blob (bad hex value). Binding an empty blob object doesn't work, and neither does a null blob. Changing the parameter to empty_blob() in the Statements class still doesn't fill the blob. setBlob(7, InputStream) doesn't work either, since the InputStream methods are not implemented in the underlying class. I tried combinations of ojdbc14.jar and ojdbc6.jar with the above, without success. When trying to set the blob contents, I got errors stating that the row wasn't locked.

               

              What finally worked was the following.

              I added this to OracleJDBCAdapter.setStatements(...):

              ...
                      String addMessageStatement = "INSERT INTO "
                          + statements.getFullMessageTableName()
                          + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG) VALUES (?, ?, ?, ?, ?, ?, empty_blob())";
                      statements.setAddMessageStatement(addMessageStatement);

                      String findMessageByIdStatement = "SELECT MSG FROM "
                          + statements.getFullMessageTableName() + " WHERE ID=? FOR UPDATE";
                      statements.setFindMessageByIdStatement(findMessageByIdStatement);

               

               

               

              Then I modified BlobJDBCAdapter.doAddMessage(...) as follows:

                      ....
                      s.setLong(6, priority);

                      // The MSG column is filled with empty_blob() by the INSERT itself.
                      if (s.executeUpdate() != 1) {
                          throw new IOException("Failed to add broker message: " + messageID + " in container.");
                      }
                      s.close();

                      // Select the blob record so that we can update it.
                      s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement(),
                              ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
                      s.setLong(1, sequence);
                      rs = s.executeQuery();
                      if (!rs.next()) {
                          throw new IOException("Failed to select blob for message: " + messageID + " in container.");
                      }

                      // Update the blob
                      Blob blob = rs.getBlob(1);
                      blob.truncate(0);
                      blob.setBytes(1, data);
                      rs.updateBlob(1, blob);
                      rs.updateRow();

                      // Commented out: update the row with the updated blob
                      // s = c.getConnection().prepareStatement(statements.getUpdateMessageStatement());
                      // s.setBlob(1, blob);
                      // s.setLong(2, sequence);

               

               

              This worked for me.  Perhaps you can confirm this.
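For readers who want to try the same two-step pattern outside the broker, the fragments above can be put together roughly as follows. This is a sketch under stated assumptions, not the committed patch: the class, method, and parameter names are hypothetical, the parameter types for columns 2 to 5 are guesses, and the caller supplies the table name. The "row wasn't locked" errors mentioned earlier are consistent with writing to the LOB after the row lock has been released, so the sketch disables auto-commit for the duration; that is an assumption about the cause and is not confirmed in this thread. Inside ActiveMQ the broker manages the transaction itself.

```java
import java.io.IOException;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Illustrative two-step blob insert; all names here are hypothetical. */
final class TwoStepBlobInsert {

    /** Step 1 SQL: six bind markers, MSG initialised with empty_blob(). */
    static String insertSql(String table) {
        return "INSERT INTO " + table
            + "(ID, MSGID_PROD, MSGID_SEQ, CONTAINER, EXPIRATION, PRIORITY, MSG)"
            + " VALUES (?, ?, ?, ?, ?, ?, empty_blob())";
    }

    /** Step 2 SQL: re-select the row and lock it so the blob can be written. */
    static String selectForUpdateSql(String table) {
        return "SELECT MSG FROM " + table + " WHERE ID=? FOR UPDATE";
    }

    static void addMessage(Connection c, String table, long id, String producer,
                           long producerSeq, String container, long expiration,
                           long priority, byte[] data)
            throws SQLException, IOException {
        boolean oldAutoCommit = c.getAutoCommit();
        c.setAutoCommit(false); // assumption: keep the row lock until the blob is written
        try {
            try (PreparedStatement insert = c.prepareStatement(insertSql(table))) {
                insert.setLong(1, id);
                insert.setString(2, producer);
                insert.setLong(3, producerSeq);
                insert.setString(4, container);
                insert.setLong(5, expiration);
                insert.setLong(6, priority);
                if (insert.executeUpdate() != 1) {
                    throw new IOException("Failed to add message " + id);
                }
            }
            try (PreparedStatement select = c.prepareStatement(selectForUpdateSql(table),
                    ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE)) {
                select.setLong(1, id);
                try (ResultSet rs = select.executeQuery()) {
                    if (!rs.next()) {
                        throw new IOException("Failed to select blob for message " + id);
                    }
                    // Same sequence of calls as in the fragment above.
                    Blob blob = rs.getBlob(1);
                    blob.truncate(0);
                    blob.setBytes(1, data);
                    rs.updateBlob(1, blob);
                    rs.updateRow();
                }
            }
            c.commit();
        } catch (SQLException | IOException e) {
            c.rollback();
            throw e;
        } finally {
            c.setAutoCommit(oldAutoCommit);
        }
    }

    public static void main(String[] args) {
        // No database needed: just print the two statements for a sample table name.
        System.out.println(insertSql("ACTIVEMQ_MSGS"));
        System.out.println(selectForUpdateSql("ACTIVEMQ_MSGS"));
    }
}
```

The extra round trip per message is the price of this approach; whether it is acceptable depends on the message rate the store has to sustain.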

               

              I'm still trying to figure out why KahaDB restores the queues when the broker restarts, but the Oracle store doesn't unless you manually recreate the queue. But that's a different thread...

              • 4. Re: persistent message storage using Oracle
                garytully

                William, your patch looks good. I have committed your changes, see: https://issues.apache.org/jira/browse/AMQ-3289

                It would be great if you or others could validate the changes against your Oracle versions.