8 Replies Latest reply on Feb 8, 2013 6:24 AM by paulpa63

    Infinispan Indexing Problem - Transaction Manager

    paulpa63 Newbie

      Hi,

      We are running Infinispan 5.1.5.FINAL and the following problem has been observed on RHEL5 and Windows 7.

      In our test scenario we have a cluster of two nodes. The nodes are configured to use RAM indexing (with indexLocalOnly=false). We start both nodes, confirm that the cluster forms, and then load data at one node. JMX shows that the caches on both nodes have been properly loaded. However, if a transaction manager is specified in the default settings (either dummy or standalone), only the node at which the data load was performed gets any index entries. The problem goes away if the transaction manager specification is removed. We believe it will occur for all types of index: RAM, filesystem and/or cluster. The fact that it also occurs with the dummy transaction manager suggests that it is not due to misconfiguration of the transaction manager.

      (Note that the internal transaction handling within our server will be performed using a Spring aspect according to the pattern described in "http://stackoverflow.com/questions/10624975", but the problem outlined above is observed when the aspect is inactive.)

      I notice that there was a question in 2010 about the inclusion of index operations in transactions.

      (It is a pity that the Infinispan JMX monitoring does not extend to index monitoring).

      Paul

       

      Also see unresolved discussion issue https://community.jboss.org/message/736543#736543#736543

        • 1. Re: Infinispan Indexing Problem - Transaction Manager
          paulpa63 Newbie

          I have rolled-up my sleeves, stoked-up the Eclipse debugger and spent an educational day stepping through Infinispan and Hibernate Search in order to more fully diagnose this problem.  As noted in the referenced discussion issue, the problem is due to the non-inclusion of index updates when operating within a transactional environment.  More specifically it concerns the QueryInterceptor of Hibernate Search which does not include overrides for the visitPrepareCommand and visitCommitCommand, the visitor callbacks which are invoked when running with indexing enabled under a transactional environment.  (This interceptor does of course include the simple overrides for visitPutKeyValueCommand, etc which are invoked when running outside a transactional environment, and which illustrate the necessary index management operations.)  I have created a simple patch for QueryInterceptor in which visitPrepareCommand and visitCommitCommand have been defined and together coordinate the equivalent actions from the visitPutKeyValueCommand, etc., depending on the specific cache command wrapped within the prepare command.  This patch appears to solve our problem and we have included it in our developed system (which will actually utilise file indexes for deployment).
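The dispatch gap described above can be illustrated with a rough, self-contained sketch. This is only a toy model: `Command`, `Visitor`, `Put`, `Prepare` and `IndexingVisitor` are hypothetical stand-ins, not Infinispan classes. It shows just one thing: a visitor that reacts to put commands alone never sees writes that arrive wrapped inside a prepare command. It does not model when the index work is applied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: these types are hypothetical stand-ins, not Infinispan classes.
class PrepareDispatchSketch {

    interface Command {
        void accept(Visitor visitor);
    }

    interface Visitor {
        void visitPut(Put put);
        void visitPrepare(Prepare prepare);
    }

    static final class Put implements Command {
        final String key;
        Put(String key) { this.key = key; }
        public void accept(Visitor visitor) { visitor.visitPut(this); }
    }

    // Under a transaction, the writes reach the visitor wrapped in one prepare command.
    static final class Prepare implements Command {
        final List<Put> modifications;
        Prepare(Put... modifications) { this.modifications = Arrays.asList(modifications); }
        public void accept(Visitor visitor) { visitor.visitPrepare(this); }
    }

    // Records keys on visitPut; unwraps prepare commands only when asked to.
    static final class IndexingVisitor implements Visitor {
        final List<String> indexedKeys = new ArrayList<>();
        private final boolean unwrapPrepare;

        IndexingVisitor(boolean unwrapPrepare) { this.unwrapPrepare = unwrapPrepare; }

        public void visitPut(Put put) { indexedKeys.add(put.key); }

        public void visitPrepare(Prepare prepare) {
            if (unwrapPrepare) {
                for (Put put : prepare.modifications) {
                    visitPut(put);
                }
            }
            // Otherwise the wrapped writes never reach the indexing logic.
        }
    }

    static List<String> indexAfterPrepare(boolean unwrapPrepare) {
        IndexingVisitor visitor = new IndexingVisitor(unwrapPrepare);
        new Prepare(new Put("k1"), new Put("k2")).accept(visitor);
        return visitor.indexedKeys;
    }

    public static void main(String[] args) {
        System.out.println("without prepare handling: " + indexAfterPrepare(false));
        System.out.println("with prepare handling:    " + indexAfterPrepare(true));
    }
}
```

With `unwrapPrepare` set to false the recorded key list stays empty, which mirrors the missing index entries on the node that only receives the prepare command.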

           

          (Note if a user is running Infinispan with indexing and transactions enabled then I think this problem may be avoided if the index is a shared file index [single point of failure] or a clustered index [slightly complicated configuration].)

           

          Apologies for the loss of formatting; please copy this code into Eclipse and hit ctrl-shift-f.  Also, note that the import list has been omitted for compactness.

           

          Paul

           

          ------------------------------------------------------------------------

           

          public class QueryInterceptor extends CommandInterceptor {

          private final SearchFactoryIntegrator searchFactory;

          private final ConcurrentMap<Class<?>, Class<?>> knownClasses = ConcurrentMapFactory.makeConcurrentMap();

          private final Lock mutating = new ReentrantLock();

          private final KeyTransformationHandler keyTransformationHandler = new KeyTransformationHandler();

          protected TransactionManager transactionManager;

          protected TransactionSynchronizationRegistry transactionSynchronizationRegistry;

          protected ExecutorService asyncExecutor;

          private static final Log log = LogFactory.getLog(QueryInterceptor.class, Log.class);

          @Override

          protected Log getLog() {

          return log;

          }

          public QueryInterceptor(SearchFactoryIntegrator searchFactory) {

          this.searchFactory = searchFactory;

          }

          @Inject

          public void injectDependencies(@ComponentName(KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR) ExecutorService e) {

          this.asyncExecutor = e;

          }

          protected boolean shouldModifyIndexes(InvocationContext ctx) {

          return !ctx.hasFlag(Flag.SKIP_INDEXING);

          }

          /**

          * Use this executor for Async operations

          *

          * @return

          */

          public ExecutorService getAsyncExecutor() {

          return asyncExecutor;

          }

          @Override

          public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {

          // This method will get the put() calls on the cache and then send them

          // into Lucene once it's successful.

          // do the actual put first.

          Object toReturn = invokeNextInterceptor(ctx, command);

          if (shouldModifyIndexes(ctx)) {

          processPutKeyValueCommand(command);

          }

          return toReturn;

          }

          private void processPutKeyValueCommand(PutKeyValueCommand command) {

          // First making a check to see if the key is already in the cache or

          // not. If it isn't we can add the key no problem,

          // otherwise we need to be updating the indexes as opposed to simply

          // adding to the indexes.

          Object key = command.getKey();

          Object value = extractValue(command.getValue());

          updateKnownTypesIfNeeded(value);

          // This means that the entry is just modified so we need to update

          // the indexes and not add to them.

          getLog().debug("Entry is changed");

          updateIndexes(value, extractValue(key));

          }

          @Override

          public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {

          // remove the object out of the cache first.

          Object valueRemoved = invokeNextInterceptor(ctx, command);

          if (shouldModifyIndexes(ctx)) {

          processRemoveCommand(command, valueRemoved);

          }

          return valueRemoved;

          }

          private void processRemoveCommand(RemoveCommand command, Object valueRemoved) {

          if (command.isSuccessful() && !command.isNonExistent()) {

          Object value = extractValue(valueRemoved);

          updateKnownTypesIfNeeded(value);

          removeFromIndexes(value, extractValue(command.getKey()));

          }

          }

          @Override

          public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {

          Object valueReplaced = invokeNextInterceptor(ctx, command);

          if (shouldModifyIndexes(ctx)) {

          processReplaceCommand(command, valueReplaced);

          }

          return valueReplaced;

          }

          private void processReplaceCommand(ReplaceCommand command, Object valueReplaced) {

          if (valueReplaced != null) {

          Object[] parameters = command.getParameters();

          Object p1 = extractValue(parameters[1]);

          Object p2 = extractValue(parameters[2]);

          updateKnownTypesIfNeeded(p1);

          updateKnownTypesIfNeeded(p2);

          Object key = extractValue(command.getKey());

          if (p1 != null) {

          removeFromIndexes(p1, key);

          }

          updateIndexes(p2, key);

          }

          }

          @Override

          public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {

          Object mapPut = invokeNextInterceptor(ctx, command);

          if (shouldModifyIndexes(ctx)) {

          processPutMapCommand(command);

          }

          return mapPut;

          }

          private void processPutMapCommand(PutMapCommand command) {

          Map<Object, Object> dataMap = command.getMap();

          // Loop through all the keys and put those key, value pairings into

          // lucene.

          for (Map.Entry<Object, Object> entry : dataMap.entrySet()) {

          Object value = extractValue(entry.getValue());

          updateKnownTypesIfNeeded(value);

          updateIndexes(value, extractValue(entry.getKey()));

          }

          }

          @Override

          public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {

          // This method is called when somebody calls a cache.clear() and we will

          // need to wipe everything in the indexes.

          Object returnValue = invokeNextInterceptor(ctx, command);

          if (shouldModifyIndexes(ctx)) {

          processClearCommand();

          }

          return returnValue;

          }

          private void processClearCommand() {

          if (getLog().isTraceEnabled())

          getLog().trace("shouldModifyIndexes() is true and we can clear the indexes");

          for (Class c : this.knownClasses.keySet()) {

          EntityIndexBinder binder = this.searchFactory.getIndexBindingForEntity(c);

          if (binder != null) { // check as not all known classes are

          // indexed

          searchFactory.getWorker().performWork(new Work<Object>(c, (Serializable) null, WorkType.PURGE_ALL),

          new TransactionalEventTransactionContext(transactionManager, transactionSynchronizationRegistry));

          }

          }

          }

          // Method that will be called when data needs to be removed from Lucene.

          protected void removeFromIndexes(Object value, Object key) {

          // The key here is the String representation of the key that is stored

          // in the cache.

          // The key is going to be the documentID for Lucene.

          // The object parameter is the actual value that needs to be removed

          // from lucene.

          if (value == null)

          throw new NullPointerException("Cannot handle a null value!");

          TransactionContext transactionContext = new TransactionalEventTransactionContext(transactionManager,

          transactionSynchronizationRegistry);

          searchFactory.getWorker().performWork(new Work<Object>(value, keyToString(key), WorkType.DELETE), transactionContext);

          }

          protected void updateIndexes(Object value, Object key) {

          // The key here is the String representation of the key that is stored

          // in the cache.

          // The key is going to be the documentID for Lucene.

          // The object parameter is the actual value that needs to be added to

          // or updated in lucene.

          if (value == null)

          throw new NullPointerException("Cannot handle a null value!");

          TransactionContext transactionContext = new TransactionalEventTransactionContext(transactionManager,

          transactionSynchronizationRegistry);

          searchFactory.getWorker().performWork(new Work<Object>(value, keyToString(key), WorkType.UPDATE), transactionContext);

          }

          private Object extractValue(Object wrappedValue) {

          if (wrappedValue instanceof MarshalledValue)

          return ((MarshalledValue) wrappedValue).get();

          else

          return wrappedValue;

          }

          public void enableClasses(Class<?>[] classes) {

          if (classes == null || classes.length == 0) {

          return;

          }

          enableClassesIncrementally(classes, false);

          }

          private void enableClassesIncrementally(Class<?>[] classes, boolean locked) {

          ArrayList<Class<?>> toAdd = null;

          for (Class<?> type : classes) {

          if (!knownClasses.containsValue(type)) {

          if (toAdd == null)

          toAdd = new ArrayList<Class<?>>(classes.length);

          toAdd.add(type);

          }

          }

          if (toAdd == null) {

          return;

          }

          if (locked) {

          Set<Class<?>> existingClasses = knownClasses.keySet();

          int index = existingClasses.size();

          Class<?>[] all = existingClasses.toArray(new Class[existingClasses.size() + toAdd.size()]);

          for (Class<?> toAddClass : toAdd) {

          all[index++] = toAddClass;

          }

          searchFactory.addClasses(all);

          for (Class<?> type : toAdd) {

          knownClasses.put(type, type);

          }

          } else {

          mutating.lock();

          try {

          enableClassesIncrementally(classes, true);

          } finally {

          mutating.unlock();

          }

          }

          }

          private void updateKnownTypesIfNeeded(Object value) {

          if (value != null) {

          Class<?> potentialNewType = value.getClass();

          if (!this.knownClasses.containsValue(potentialNewType)) {

          mutating.lock();

          try {

          enableClassesIncrementally(new Class[] { potentialNewType }, true);

          } finally {

          mutating.unlock();

          }

          }

          }

          }

          public void registerKeyTransformer(Class<?> keyClass, Class<? extends Transformer> transformerClass) {

          keyTransformationHandler.registerTransformer(keyClass, transformerClass);

          }

          private String keyToString(Object key) {

          return keyTransformationHandler.keyToString(key);

          }

          public KeyTransformationHandler getKeyTransformationHandler() {

          return keyTransformationHandler;

          }

          /*

          * This map is used to store write commands containing data that needs to be applied to the index during commit.

          * TODO maybe this should be a weak/soft hash map?

          */

          private Map<Long, WriteCommand[]> preparedWriteCommands = Collections.synchronizedMap(new HashMap<Long, WriteCommand[]>());

          @Override

          public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {

          final Object toReturn = super.visitPrepareCommand(ctx, command);

          preparedWriteCommands.put(ctx.getGlobalTransaction().getId(), command.getModifications());

          return toReturn;

          }

          @Override

          public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {

          final Object toReturn = invokeNextInterceptor(ctx, command);

          if (shouldModifyIndexes(ctx)) {

          final WriteCommand[] writeCommands = preparedWriteCommands.get(ctx.getGlobalTransaction().getId());

          if (writeCommands != null) {

          try {

          for (WriteCommand writeCommand : writeCommands) {

          if (writeCommand instanceof ClearCommand) {

          processClearCommand();

          } else if (writeCommand instanceof PutKeyValueCommand) {

          processPutKeyValueCommand((PutKeyValueCommand) writeCommand);

          } else if (writeCommand instanceof PutMapCommand) {

          processPutMapCommand((PutMapCommand) writeCommand);

          } else if (writeCommand instanceof RemoveCommand) {

          processRemoveCommand((RemoveCommand) writeCommand, toReturn);

          } else if (writeCommand instanceof ReplaceCommand) {

          processReplaceCommand((ReplaceCommand) writeCommand, toReturn);

          }

          }

          } finally {

          preparedWriteCommands.remove(ctx.getGlobalTransaction().getId());

          }

          }

          }

          return toReturn;

          }

          }

          • 2. Re: Infinispan Indexing Problem - Transaction Manager
            paulpa63 Newbie

            Some further information related to this patch.  I have detected a problem when attempting to commit a transaction involving cache deletion/removal whereby an NPE is generated by the existing QueryInterceptor.removeFromIndexes because it is passed a RemoveCommand with a null value.  (A similar problem will also affect cache replacements.)  The patch has been modified to handle these cases by saving the return (object) values within visitRemoveCommand and visitReplaceCommand in a map against the command, so that the (object) value can be retrieved when processing the same remove/replace command during the visitCommitCommand method.

             

            Note: The patch has been further modified to include a check on shouldModifyIndexes within visitPrepareCommand, and by adding another override for visitRollbackCommand which removes the previously prepared command (cf. finally block in visitCommitCommand).
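The bookkeeping described in this reply might look roughly like the following. It is a toy, single-threaded sketch with hypothetical names (`DeferredRemoveSketch`, `onRemove`, `onPrepare`, `onCommit`, `onRollback`), not the modified patch itself: the value returned by each remove is remembered against the command, applied to a stand-in index at commit, and discarded on rollback.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical names, not the actual patch or the Infinispan API.
class DeferredRemoveSketch {

    static final class Remove {
        final String key;
        Remove(String key) { this.key = key; }
    }

    // Stand-in for the search index: key -> indexed value.
    final Map<String, String> index = new HashMap<>();
    // Commands captured at prepare time, keyed by transaction id.
    private final Map<Long, List<Remove>> prepared = new HashMap<>();
    // Value returned by each remove, kept until commit needs it.
    private final Map<Remove, String> removedValues = new IdentityHashMap<>();

    void onRemove(Remove command, String valueRemoved) {
        if (valueRemoved != null) {
            removedValues.put(command, valueRemoved);
        }
    }

    void onPrepare(long txId, List<Remove> commands) {
        prepared.put(txId, commands);
    }

    void onCommit(long txId) {
        List<Remove> commands = prepared.remove(txId);
        if (commands == null) {
            return;
        }
        for (Remove command : commands) {
            String value = removedValues.remove(command);
            if (value != null) {               // skip removes that removed nothing
                index.remove(command.key, value);
            }
        }
    }

    void onRollback(long txId) {
        List<Remove> commands = prepared.remove(txId);
        if (commands == null) {
            return;
        }
        for (Remove command : commands) {
            removedValues.remove(command);     // drop state kept for the aborted transaction
        }
    }

    int pendingTransactions() {
        return prepared.size();
    }

    public static void main(String[] args) {
        DeferredRemoveSketch sketch = new DeferredRemoveSketch();
        sketch.index.put("k1", "v1");
        Remove remove = new Remove("k1");
        sketch.onRemove(remove, "v1");
        sketch.onPrepare(1L, Collections.singletonList(remove));
        sketch.onCommit(1L);
        System.out.println("index after commit: " + sketch.index);
    }
}
```

An identity-keyed map is used here on the assumption that the same command instance is seen again at commit; a real interceptor would also need to be thread-safe.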

             

            Paul

            • 3. Re: Infinispan Indexing Problem - Transaction Manager
              Sanne Grinovero Master

              Hi Paul,

              thank you so much for looking into this, and sorry for the delay. I'm guilty of having ignored this forum for a while; my fault! Still, please make sure to report issues on JIRA: that's where we track patches to apply.

               

              Related issues:

              https://issues.jboss.org/browse/ISPN-2351

              https://issues.jboss.org/browse/ISPN-1884

              https://issues.jboss.org/browse/ISPN-2467

              • 4. Re: Infinispan Indexing Problem - Transaction Manager
                Sanne Grinovero Master

                Hi Paul, I had a look at your patch but made many changes. For example, the return value of the prepare() operation is not the returned objects we need in order to clean up previous index operations.

                Also, with a proper TransactionManager it's illegal to add a TX Synch during the commit phase: I moved all work preparation into the prepare phase and let it register as a synch, so the concurrent map isn't actually needed.
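The idea of registering the work as a synchronization can be sketched as below. This is a toy model only: `ToyTransaction` and `ToySynchronization` are hypothetical and merely imitate the general shape of a transaction with completion callbacks. They are not the JTA interfaces or the code in the pull request. The work is queued while the transaction is still active and runs on completion, so no separate map of prepared commands is kept, and a registration attempted once commit has started is rejected.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: ToyTransaction and ToySynchronization are hypothetical types.
class SynchronizationSketch {

    interface ToySynchronization {
        void afterCompletion(boolean committed);
    }

    static final class ToyTransaction {
        private final List<ToySynchronization> synchronizations = new ArrayList<>();
        private boolean completing;

        void registerSynchronization(ToySynchronization sync) {
            if (completing) {
                throw new IllegalStateException("transaction is already completing");
            }
            synchronizations.add(sync);
        }

        void commit() {
            completing = true;
            for (ToySynchronization sync : synchronizations) {
                sync.afterCompletion(true);
            }
        }
    }

    // Queue the index work while the transaction is active; it runs on completion.
    static List<String> indexViaSynchronization(List<String> keys) {
        final List<String> index = new ArrayList<>();
        final List<String> work = new ArrayList<>(keys);
        ToyTransaction tx = new ToyTransaction();
        tx.registerSynchronization(committed -> {
            if (committed) {
                index.addAll(work);
            }
        });
        tx.commit();
        return index;
    }

    // Registering once commit has started is refused by the toy transaction.
    static boolean lateRegistrationRejected() {
        ToyTransaction tx = new ToyTransaction();
        tx.commit();
        try {
            tx.registerSynchronization(committed -> { });
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        keys.add("k1");
        System.out.println("indexed on completion: " + indexViaSynchronization(keys));
        System.out.println("late registration rejected: " + lateRegistrationRejected());
    }
}
```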

                 

                I didn't fully understand your other comments about the NPE; could you please check this out and see how it works for you?

                 

                https://github.com/infinispan/infinispan/pull/1452

                 

                For anything else you might need, please open JIRAs. If you can make them, unit tests help a lot! And fixes like the above would be easier to apply as patch files (or just using GitHub).

                • 5. Re: Infinispan Indexing Problem - Transaction Manager
                  paulpa63 Newbie

                  Hi Sanne,

                   

                  Thanks for taking a look at my patch. I am not surprised that it required rework, as transaction programming is not my field of experience/knowledge.  I will download your version and give it a test in the next day or so.  My comments about the NPE may not be significant for your version - I will see if it handles cache remove/delete operations.

                   

                  Paul

                   

                  And JUnit is a wonderful thing!

                  • 6. Re: Infinispan Indexing Problem - Transaction Manager
                    Sanne Grinovero Master

                    Hi Paul,

                    my patch was merged in master so you could test master easily rather than applying my patch. Thanks!

                     

                    Sanne

                    • 7. Re: Infinispan Indexing Problem - Transaction Manager
                      paulpa63 Newbie

                      Hi Sanne,

                      Just a quick update.  I tried applying your two patch files QueryInterceptor and LocalQueryInterceptor within our current Infinispan 5.1.5 development environment but encountered an exception on server startup.  I suspect that, as you suggest, I need to be working with the latest Infinispan master from GitHub.  This is going to take more time for us to organise (e.g. we do not have the luxury of seamless access to the public internet from our development environment).  It is possible that we may delay re-testing until this patch is present in the next Infinispan release.  I know this is not ideal as I would also like to test the patch as soon as possible - it is an important function for our system and you have obviously put in a lot of work on the new patch.

                      Thanks,

                      Paul

                      • 8. Re: Infinispan Indexing Problem - Transaction Manager
                        paulpa63 Newbie

                        Hi Sanne,

                         

                        I have recently re-tested the problem scenario described in my original post within this discussion using Infinispan 5.2.0.CR1 and can confirm that your patches have cured the problem.

                         

                        Thanks again for your assistance,

                         

                        Paul

                         

                        PS For those who may be interested... the patch obviously affects the writing of indexes in relation to transactions in a very general way.  I found that the patch broke a specific part of our application where we were trying to read the index expecting to "see" entries for objects created immediately before - within the same transaction - but the objects were not seen because the index is now only ever updated on completion of a transaction.  This required a small amount of re-coding within our application.
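The visibility effect mentioned in the PS can be reproduced with a very small toy model. `CommitTimeIndexSketch` is hypothetical and has nothing to do with the real index implementation. Writes are buffered until commit, so a query issued before commit does not see them.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: a hypothetical index whose writes are buffered until commit.
class CommitTimeIndexSketch {
    private final Set<String> committed = new HashSet<>();
    private final List<String> pending = new ArrayList<>();

    // Write made inside the transaction: buffered, not yet searchable.
    void put(String key) {
        pending.add(key);
    }

    // Queries only see entries from completed transactions.
    boolean query(String key) {
        return committed.contains(key);
    }

    // Completion of the transaction makes the buffered entries searchable.
    void commit() {
        committed.addAll(pending);
        pending.clear();
    }

    public static void main(String[] args) {
        CommitTimeIndexSketch index = new CommitTimeIndexSketch();
        index.put("order-1");
        System.out.println("visible inside tx:    " + index.query("order-1"));
        index.commit();
        System.out.println("visible after commit: " + index.query("order-1"));
    }
}
```

Code that needs to find an object it has just created in the same transaction cannot rely on the index in this model and has to get at the object some other way.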