4 Replies Latest reply on Sep 24, 2012 11:42 AM by Mircea Markus

    Post-transaction event listeners

    Luca Zenti Newbie

      Hi,

      I'm using Infinispan as a replicated cache (replicated/synchronous mode).

      I need the cluster nodes to be informed when something changes in the cache and I used cache.addListener().

      My problem is that my listeners need to be notified only once the cache is already in its new state. I pass the created/changed/removed object to them, but in many cases the notification merely triggers a "refresh" that re-reads the cache using get operations.

      From the FAQ section of the documentation I understand this is not possible (https://docs.jboss.org/author/pages/viewpage.action?pageId=5832860), and I assume it is related to the transactional behaviour of Infinispan (which I'm not using, mostly for performance reasons).

      To better contextualize my question, I'd like the following test case to pass:

       

      public class SampleTest {

         

          private Cache<String, String> cache;

         

          @Listener

          public class MyListener {

              @CacheEntryModified

              public void entryUpdated(CacheEntryModifiedEvent<String, String> event) {

                  if(!event.isPre()) {

                      assertSame(event.getValue(), cache.get(event.getKey()));

                  }

              }

          }

         

          @Before

          public void setup() throws Exception {

              DefaultCacheManager cacheManager = new DefaultCacheManager(GlobalConfigurationBuilder.defaultClusteredBuilder().transport()

                      .build(), new ConfigurationBuilder().clustering().cacheMode(CacheMode.REPL_SYNC).sync().build());

       

              cache = cacheManager.getCache();

          }

       

       

          @Test

          public void must_notify_listeners_after_change_has_been_applied() {

              cache.addListener(new MyListener());

              cache.put("a", "a");

          }

      }

       

      Is there a way to achieve this?

      I already tried an async listener, but the behaviour is quite random: depending (I guess) on thread interleaving, it may or may not work.

      I also tried using transactions. My intention was to build a queue of notifications and fire them once I get the transactionComplete event, but this is quite complicated and error-prone, and I verified that it is not a viable solution given my performance requirements.
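For readers unfamiliar with the queue-and-flush idea described above, here is a minimal sketch in plain Java. `DeferredNotifier` and its methods are hypothetical names, not part of Infinispan; the sketch assumes something else calls `record` for each change and `transactionCompleted` when the transaction ends. It uses a single queue, so a real implementation would also have to keep events of concurrent transactions apart, which is part of what makes this approach complicated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers change events and delivers them only after a successful commit. */
class DeferredNotifier<E> {
    private final List<E> pending = new ArrayList<>();
    private final Consumer<E> sink;

    DeferredNotifier(Consumer<E> sink) {
        this.sink = sink;
    }

    /** Called for every change observed while the transaction is still open. */
    synchronized void record(E event) {
        pending.add(event);
    }

    /** Called when the transaction ends: delivers buffered events on success, drops them on rollback. */
    void transactionCompleted(boolean successful) {
        List<E> toDeliver;
        synchronized (this) {
            toDeliver = new ArrayList<>(pending);
            pending.clear();
        }
        if (successful) {
            // Deliver outside the lock so a slow listener does not block record().
            toDeliver.forEach(sink);
        }
    }
}
```

Nothing reaches the sink until `transactionCompleted(true)` is called, so a listener that re-reads the cache at that point would see the committed state.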

       

       

       

      Thank you very much.

        • 1. Re: Post-transaction event listeners
          Luca Zenti Newbie

          I clicked "marked as assume answered" by mistake...

          Sorry

          • 2. Re: Post-transaction event listeners
            Mircea Markus Master

            You can use a custom interceptor[1] to get the same behavior.

            You'll need:

            - to add it as the 2nd interceptor in the chain

            - to override the methods corresponding to the events you're interested in, e.g.

             

               @Override

               public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {

                  Object result = invokeNextInterceptor(ctx, command);

                  yourCustomCodeHere();

                  return result;

               }

             

             

               private void yourCustomCodeHere() {

                  assert cache.get("key").equals("value");

               }

             

             

            [1] https://docs.jboss.org/author/display/ISPN/Infinispan+Custom+Interceptors
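To illustrate why code placed after `invokeNextInterceptor` runs once the write has been applied, here is a toy model of an interceptor chain in plain Java. It is only a sketch: `ToyInterceptor`, `ToyChain` and `AfterWriteInterceptor` are made-up names and deliberately do not use the Infinispan API. The assumption is simply that each interceptor delegates to the next and that the last step performs the actual write.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for an interceptor; NOT the Infinispan API. */
interface ToyInterceptor {
    Object visitPut(ToyChain chain, int position, String key, String value);
}

/** Toy chain: each interceptor delegates to the next one, and the final step writes to the map. */
class ToyChain {
    final Map<String, String> store = new HashMap<>();
    final List<ToyInterceptor> interceptors = new ArrayList<>();

    Object put(String key, String value) {
        return invokeNext(-1, key, value);
    }

    /** Invokes the interceptor after the given position, or performs the write at the end of the chain. */
    Object invokeNext(int position, String key, String value) {
        int next = position + 1;
        if (next < interceptors.size()) {
            return interceptors.get(next).visitPut(this, next, key, value);
        }
        return store.put(key, value); // returns the previous value, like Map.put
    }
}

/** Runs its own logic only after the rest of the chain, including the write, has returned. */
class AfterWriteInterceptor implements ToyInterceptor {
    final List<String> observed = new ArrayList<>();

    @Override
    public Object visitPut(ToyChain chain, int position, String key, String value) {
        Object result = chain.invokeNext(position, key, value);
        observed.add(chain.store.get(key)); // the new value is already visible here
        return result;
    }
}
```

The closer such an interceptor sits to the start of the chain, the more of the remaining work has finished by the time its post-processing code runs.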

            • 3. Re: Post-transaction event listeners
              Luca Zenti Newbie

              Hi,

              I found a workaround for this: I used a custom interceptor added to the cache (not to the configuration) as the first in the chain.

              Visiting the relevant commands, this enables me to notify custom listeners that I maintain in a list.

              This is the skeleton of the interceptor:

               

              class PostTransactionEventsNotifier extends CommandInterceptor {

               

                      @SuppressWarnings("unchecked")

                      @Override

                      public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {

                          V oldValue = cache.get(command.getKey());

                          Object result = super.visitPutKeyValueCommand(ctx, command);

                          for (CacheListener<K, V> listener : listeners) {

                              if (oldValue == null)

                                  listener.onObjectAdded((K) command.getKey(), (V) command.getValue());

                              else

                                  listener.onObjectUpdated((K) command.getKey(), (V) command.getValue());

                          }

                          return result;

                      }

               

                      @Override

                      public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable {

                          Object result = super.visitClearCommand(ctx, command);

                          for (CacheListener<K, V> listener : listeners) {

                              listener.onCacheCleared();

                          }

                          return result;

                      }

               

                      @SuppressWarnings("unchecked")

                      @Override

                      public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {

                          V removed = cache.get(command.getKey());

                          Object result = super.visitRemoveCommand(ctx, command);

                          for (CacheListener<K, V> listener : listeners) {

                              listener.onObjectRemoved((K) command.getKey(), removed);

                          }

                          return result;

                      }

               

                      @SuppressWarnings("unchecked")

                      @Override

                      public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {

                          Object result = super.visitReplaceCommand(ctx, command);

                          if (command.getOldValue() != null) {

                              for (CacheListener<K, V> listener : listeners) {

                                  // replace() overwrites an existing value, so report an update

                                  listener.onObjectUpdated((K) command.getKey(), (V) command.getNewValue());

                              }

                          }

                          return result;

                      }

               

                      @SuppressWarnings("unchecked")

                      @Override

                      public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {

                          Map<K, V> addedElements = new HashMap<K, V>();

                          Map<K, V> updatedElements = new HashMap<K, V>();

                          for (Map.Entry<Object, Object> entry : command.getMap().entrySet()) {

                              if (cache.containsKey(entry.getKey()))

                                  updatedElements.put((K) entry.getKey(), (V) entry.getValue());

                              else

                                  addedElements.put((K) entry.getKey(), (V) entry.getValue());

                          }

               

                          Object result = super.visitPutMapCommand(ctx, command);

                         

                          for (Map.Entry<K, V> added : addedElements.entrySet()) {

                              for (CacheListener<K, V> listener : listeners) {

                                  listener.onObjectAdded(added.getKey(), added.getValue());

                              }

                          }

                         

                          for (Map.Entry<K, V> updated : updatedElements.entrySet()) {

                              for (CacheListener<K, V> listener : listeners) {

                                  listener.onObjectUpdated(updated.getKey(), updated.getValue());

                              }

                          }

                          return result;

                      }

                  }
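The skeleton refers to a `CacheListener<K, V>` type and a `listeners` collection without declaring them. A possible shape, inferred only from the method names used above, is sketched below. `ListenerRegistry` and its `fire*` methods are hypothetical, and using `CopyOnWriteArrayList` is an assumption made so that listeners can be registered while a notification loop is running.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Callback interface inferred from the method names used in the interceptor skeleton. */
interface CacheListener<K, V> {
    void onObjectAdded(K key, V value);
    void onObjectUpdated(K key, V value);
    void onObjectRemoved(K key, V value);
    void onCacheCleared();
}

/** Hypothetical holder for the listener list that the interceptor iterates over. */
class ListenerRegistry<K, V> {
    // Iteration works on a snapshot, so add/remove during a notification is safe.
    private final List<CacheListener<K, V>> listeners = new CopyOnWriteArrayList<>();

    void add(CacheListener<K, V> listener) {
        listeners.add(listener);
    }

    void remove(CacheListener<K, V> listener) {
        listeners.remove(listener);
    }

    void fireAdded(K key, V value) {
        for (CacheListener<K, V> listener : listeners) {
            listener.onObjectAdded(key, value);
        }
    }

    void fireCleared() {
        for (CacheListener<K, V> listener : listeners) {
            listener.onCacheCleared();
        }
    }
}
```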

               

              To add the interceptor:

               

              cache.getAdvancedCache().addInterceptor(new PostTransactionEventsNotifier(), 0);

               

              Depending on your configuration, you may need to add the interceptor at a different position, but it must be as close as possible to the beginning of the chain, so that the cache is fully updated and committed by the time execution returns from the rest of the chain.

               

              Note: the implementation of the visitPutMapCommand method is a little bit complicated because I need to distinguish between "objectAdded" and "objectUpdated" events in all cases.
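The added/updated distinction for a bulk put can be shown in isolation with plain maps. This is only a sketch: `PutAllClassifier` is a hypothetical helper, and a `java.util.Map` stands in for the cache. The point is that the classification has to happen before the write, while the store still holds the old state. The check and the write are not atomic, so a concurrent writer could still make the classification stale.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that splits a batch into entries adding new keys and entries updating existing ones. */
class PutAllClassifier<K, V> {
    final Map<K, V> added = new HashMap<>();
    final Map<K, V> updated = new HashMap<>();

    /** Must be called BEFORE the batch is written, while the store still reflects the old state. */
    static <K, V> PutAllClassifier<K, V> classify(Map<K, V> store, Map<? extends K, ? extends V> batch) {
        PutAllClassifier<K, V> result = new PutAllClassifier<>();
        for (Map.Entry<? extends K, ? extends V> entry : batch.entrySet()) {
            if (store.containsKey(entry.getKey())) {
                result.updated.put(entry.getKey(), entry.getValue());
            } else {
                result.added.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```

After the write has completed, the two maps can be walked to fire the "added" and "updated" notifications separately, as the skeleton does.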

               

              Cheers.