6 Replies Latest reply on May 30, 2013 5:09 PM by seacuke23

    Transactions for Infinispan as 2LC for Hibernate

    seacuke23

      I've been trying to use Infinispan as my second level cache provder for Hibernate.  I'm using JBoss 7.1.3.final.  My application uses JMS to send messages internally.  I modify an entity and send a message on one of the queues in a single transaction using  the JmsXA connection pool.  When the MDB on the other side of the queue receives the message I need to ensure that the transaction is complete and the cache is accurate.  I have been trying a number of different sites I've found to make Infinispan participate in the current transaction, but have not been able to do this successfully.

       

      Most recently I've been trying this:

       

      https://docs.jboss.org/author/display/AS71/JPA+Reference+Guide

       

      but have also tried this:

       

      https://docs.jboss.org/author/display/ISPN/Using+Infinispan+as+JPA-Hibernate+Second+Level+Cache+Provider

       

      ...among others.

       

      I've written a simple app to try to recreate what I'm doing and it seems to encounter the same problems.  If I can get this app working I suspect my application problems will be resolved in the same way.  I have a startup singleton that schedules a number of asynchronous calls to persist an entity, put its ID on a queue. Amdb receives the message, looks up the entity, modifies it and puts its ID on another queue.  Bmdb receives the message, looks up the entity and confirms that the value is as Amdb should have set it.  I put sleeps before the queue writes to confirm that the messages aren't being put on the queue before the transaction is complete.

       

      package com.testpackage;
      
      import javax.annotation.PostConstruct;
      import javax.annotation.Resource;
      import javax.ejb.EJB;
      import javax.ejb.Singleton;
      import javax.ejb.Startup;
      import javax.ejb.Timeout;
      import javax.ejb.Timer;
      import javax.ejb.TimerConfig;
      import javax.ejb.TimerService;
      import org.apache.log4j.Logger;
      
      @Singleton
      @Startup
      public class StartupBean {
                @Resource
                TimerService timer;
                Logger l = Logger.getLogger(StartupBean.class);
                @EJB
                Launcher launcher;
                private long totalCalls = 0;
                private long failures = 0;
      
                @PostConstruct
                public void setupTimer() {
                          timer.createIntervalTimer(10000, 10000, new TimerConfig(null, false));
                }
      
                public void reportCompletion(boolean success) {
                          if (!success) {
                                    failures++;
                          }
                          totalCalls++;
                }
      
                @Timeout
                public void timeout(Timer t) {
                          l.debug("--------------------- Tallies - " + totalCalls + " calls, " + failures + " failures");
                          for (int i = 0; i < 10; i++) {
                                    launcher.doIt();
                          }
                }
      }
      

       

       

       

      package com.testpackage;
      
      import javax.annotation.Resource;
      import javax.ejb.Asynchronous;
      import javax.ejb.Stateless;
      import javax.jms.ConnectionFactory;
      import javax.jms.Queue;
      import javax.persistence.EntityManager;
      import javax.persistence.PersistenceContext;
      import org.apache.log4j.Logger;
      import com.testpackage.entity.MyEntity;
      
      @Stateless
      public class Launcher {
                @Resource(lookup = "java:/JmsXA")
                private ConnectionFactory cf;
                @Resource(lookup = "java:/queue/testa")
                private Queue queue;
                @PersistenceContext(name = "TestPersistenceUnit")
                EntityManager em;
                private static final Logger l = Logger.getLogger(Launcher.class);
      
                @Asynchronous
                public void doIt() {
                          MyEntity m = new MyEntity();
                          em.persist(m);
                          l.debug("Persisted " + m + " calling Launcher.");
                          try {
                                    MessageSender.sendMessage(cf, queue, String.valueOf(m.getMyId()));
                                    Thread.sleep(2000);
                          }
                          catch (Throwable t) {
                                    l.error("boom", t);
                          }
                }
      }
      

       

      package com.testpackage;
      
      import javax.annotation.Resource;
      import javax.ejb.ActivationConfigProperty;
      import javax.ejb.MessageDriven;
      import javax.jms.ConnectionFactory;
      import javax.jms.Message;
      import javax.jms.MessageListener;
      import javax.jms.Queue;
      import javax.jms.TextMessage;
      import javax.persistence.EntityManager;
      import javax.persistence.PersistenceContext;
      import org.apache.log4j.Logger;
      import com.testpackage.entity.MyEntity;
      
      @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                          @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testa") })
      public class Amdb implements MessageListener {
      
                private static final Logger l = Logger.getLogger(Amdb.class);
                @PersistenceContext(name = "TestPersistenceUnit")
                EntityManager em;
                @Resource(lookup = "java:/JmsXA")
                private ConnectionFactory cf;
                @Resource(lookup = "java:/queue/testb")
                private Queue queue;
      
                public void onMessage(Message message) {
                          try {
                                    long id = Long.parseLong(((TextMessage) message).getText());
                                    MyEntity m = em.find(MyEntity.class, id);
                                    l.debug("Looking for " + id + " found entity " + m);
                                    m.setVal(100);
                                    MessageSender.sendMessage(cf, queue, String.valueOf(id));
                                    Thread.sleep(2000);
                          }
                          catch (Throwable t) {
                                    l.error("boom", t);
                          }
                }
      }
      

       

       

      package com.testpackage;
      
      import javax.ejb.ActivationConfigProperty;
      import javax.ejb.EJB;
      import javax.ejb.MessageDriven;
      import javax.jms.Message;
      import javax.jms.MessageListener;
      import javax.jms.TextMessage;
      import javax.persistence.EntityManager;
      import javax.persistence.PersistenceContext;
      import org.apache.log4j.Logger;
      import com.testpackage.entity.MyEntity;
      
      @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
                          @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testb") })
      public class Bmdb implements MessageListener {
                private static final Logger l = Logger.getLogger(Bmdb.class);
                @PersistenceContext(name = "TestPersistenceUnit")
                EntityManager em;
                @EJB
                StartupBean sb;
      
                public void onMessage(Message message) {
                          try {
                                    long id = Long.parseLong(((TextMessage) message).getText());
                                    MyEntity m = em.find(MyEntity.class, id);
                                    l.debug("Looking for " + id + " found entity " + m);
                                    if (m.getVal() != 100) {
                                              l.error("**************BAD VALUE************** Found val of " + m.getVal() + " rather than 100.");
                                              sb.reportCompletion(false);
                                    }
                                    else {
                                              sb.reportCompletion(true);
                                    }
                          }
                          catch (Throwable t) {
                                    l.error("boom", t);
                          }
                }
      }
      

       

       

      package com.testpackage;
      
      import javax.jms.Connection;
      import javax.jms.ConnectionFactory;
      import javax.jms.JMSException;
      import javax.jms.Message;
      import javax.jms.MessageProducer;
      import javax.jms.Queue;
      import javax.jms.Session;
      import org.apache.log4j.Logger;
      
      public class MessageSender {
                private static Logger l = Logger.getLogger(MessageSender.class);
      
      
                public static void sendMessage(ConnectionFactory cf, Queue queue, String message) throws JMSException {
                          Message m = null;
                          Session s = null;
                          MessageProducer mp = null;
                          Connection conn = null;
                          conn = cf.createConnection();
                          try {
                                    s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                                    mp = s.createProducer(queue);
                                              m = s.createTextMessage(message);
                                    mp.send(m);
                          }
                          catch(JMSException e){
                                    throw e;
                          }
                          finally {
                                    if (mp != null) {
                                              try {
                                                        mp.close();
                                              }
                                              catch (JMSException e) {
                                                        l.error("Failed to close message producer", e);
                                              }
                                    }
                                    if (s != null) {
                                              try {
                                                        s.close();
                                              }
                                              catch (JMSException e) {
                                                        l.error("Failed to close session", e);
                                              }
                                    }
                                    try {
                                              conn.close();
                                    }
                                    catch (JMSException e) {
                                              l.error("Failed to close connection", e);
                                    }
                          }
                }
      }
      
      

       

      package com.testpackage.entity;
      
      import javax.persistence.Entity;
      import javax.persistence.GeneratedValue;
      import javax.persistence.Id;
      
      @Entity
      public class MyEntity {
                @Id
                @GeneratedValue
                private long myId;
                private long val=0;
      
                public long getMyId() {
                          return myId;
                }
                public void setMyId(long myId) {
                          this.myId = myId;
                }
                public long getVal() {
                          return val;
                }
                public void setVal(long val) {
                          this.val = val;
                }
                @Override
                public String toString() {
                          StringBuilder builder = new StringBuilder();
                          builder.append("MyEntity [myId=");
                          builder.append(myId);
                          builder.append(", val=");
                          builder.append(val);
                          builder.append("]");
                          return builder.toString();
                }
      }
      

       

       

      <?xml version="1.0" encoding="UTF-8"?>
      <persistence xmlns="http://java.sun.com/xml/ns/persistence"
                version="2.0">
                <persistence-unit name="TestPersistenceUnit">
                          <provider>org.hibernate.ejb.HibernatePersistence</provider>
                          <jta-data-source>java:jboss/datasources/MYDS</jta-data-source>
                          <class>com.testpackage.entity.MyEntity</class>
                          <exclude-unlisted-classes>true</exclude-unlisted-classes>
                          <shared-cache-mode>ENABLE_SELECTIVE</shared-cache-mode>
                          <properties>
                                    <property name="hibernate.ejb.cfgfile" value="META-INF/hibernate.cfg.xml" />
                          </properties>
                </persistence-unit>
      </persistence>
      

       

       

      <?xml version="1.0" encoding="UTF-8"?>
      <!DOCTYPE hibernate-configuration PUBLIC "-//Hibernate/Hibernate Configuration DTD 3.0//EN"
                                               "http://www.hibernate.org/dtd/hibernate-configuration-3.0.dtd">
      <hibernate-configuration>
                <session-factory name="TestPersistenceUnit">
                          <property name="hibernate.dialect">org.hibernate.dialect.Oracle10gDialect</property>
                          <property name="hibernate.generate_statistics">false</property>
                          <property name="hibernate.cache.use_second_level_cache">true</property>
                          <property name="hibernate.hbm2ddl.auto">create</property>
                          <property name="hibernate.show_sql">false</property>
                          <property name="hibernate.cache.region.factory_class">org.jboss.as.jpa.hibernate4.infinispan.InfinispanRegionFactory</property>
                          <property name="hibernate.cache.infinispan.cachemanager">java:jboss/infinispan/container/hibernate</property>
                          <property name="hibernate.transaction.manager_lookup_class">org.hibernate.transaction.JBossTransactionManagerLookup</property>
                          <property name="hibernate.transaction.factory_class">org.hibernate.transaction.CMTTransactionFactory</property>
                          <class-cache class="com.testpackage.entity.MyEntity"
                                    include="all" usage="transactional" />
                </session-factory>
      </hibernate-configuration>
      

       

      I'm finding that about 25% of the time, the em.find in Bmdb retrieves a value that does not contain the correct value:

       

      14:35:17,664 DEBUG [com.testpackage.StartupBean] (EJB default - 85) --------------------- Tallies - 630 calls, 150 failures
      14:35:17,667 DEBUG [com.testpackage.Launcher] (EJB AsyncPool - 446) Persisted MyEntity [myId=635, val=0] calling Launcher.
      ...
      
      14:35:19,755 DEBUG [com.testpackage.Amdb] (Thread-139 (HornetQ-client-global-threads-432052606)) Looking for 640 found entity MyEntity [myId=640, val=0]
      ...
      14:35:21,773 DEBUG [com.testpackage.Bmdb] (Thread-34 (HornetQ-client-global-threads-432052606)) Looking for 638 found entity MyEntity [myId=638, val=100]
      14:35:21,773 DEBUG [com.testpackage.Bmdb] (Thread-115 (HornetQ-client-global-threads-432052606)) Looking for 639 found entity MyEntity [myId=639, val=100]
      14:35:21,774 DEBUG [com.testpackage.Bmdb] (Thread-134 (HornetQ-client-global-threads-432052606)) Looking for 633 found entity MyEntity [myId=633, val=0]
      14:35:21,774 ERROR [com.testpackage.Bmdb] (Thread-134 (HornetQ-client-global-threads-432052606)) **************BAD VALUE************** Found val of 0 rather than 100.
      14:35:21,774 DEBUG [com.testpackage.Bmdb] (Thread-80 (HornetQ-client-global-threads-432052606)) Looking for 634 found entity MyEntity [myId=634, val=100]
      14:35:21,774 DEBUG [com.testpackage.Bmdb] (Thread-104 (HornetQ-client-global-threads-432052606)) Looking for 636 found entity MyEntity [myId=636, val=0]
      14:35:21,775 ERROR [com.testpackage.Bmdb] (Thread-104 (HornetQ-client-global-threads-432052606)) **************BAD VALUE************** Found val of 0 rather than 100.
      14:35:21,775 DEBUG [com.testpackage.Bmdb] (Thread-138 (HornetQ-client-global-threads-432052606)) Looking for 637 found entity MyEntity [myId=637, val=100]
      14:35:21,775 DEBUG [com.testpackage.Bmdb] (Thread-140 (HornetQ-client-global-threads-432052606)) Looking for 632 found entity MyEntity [myId=632, val=100]
      14:35:21,776 DEBUG [com.testpackage.Bmdb] (Thread-139 (HornetQ-client-global-threads-432052606)) Looking for 635 found entity MyEntity [myId=635, val=100]
      14:35:21,776 DEBUG [com.testpackage.Bmdb] (Thread-115 (HornetQ-client-global-threads-432052606)) Looking for 631 found entity MyEntity [myId=631, val=100]
      
      

       

      Another unusual thing I've noticed is that sometimes it seems to be working correctly...that is, I redeploy the app and it may not see the problem.  If it doesn't have the problem...it has the correct value EVERY single time.

       

      14:23:35,085 DEBUG [com.testpackage.StartupBean] (EJB default - 16) --------------------- Tallies - 4830 calls, 0 failures
      

       

      But if I redeploy the same app it won't work the next time.  I've tried numerous different configurations for hibernate properties that I've found in my quest....but they either exhibit the same behavior or just don't work at all.  I may have missed something in the documentation that I've been going through, but everything I've tried doesn't work consistently.  Perhaps I've made some assumptions that are inaccurate.

       

      I know I've provided a lot of detail here...just wanted to provide everything you might need.  Please have a look at this and let me know if you can see what's causing my problem...I've been smashing my head on the keyboard for some time.

       

      Thanks.

       

      Message was edited by: seacuke23. Added MessageSender.

        • 1. Re: Transactions for Infinispan as 2LC for Hibernate
          seacuke23

          TMI?

           

          Additional details:

           

          My JBoss install seems to be running "infinispan 5.1.7.Final" and is running as a single node with standalone-full.xml.

          • 2. Re: Transactions for Infinispan as 2LC for Hibernate
            shadowcreeper

            You may need to run with standalone-full-ha.xml to get the invalidator to work.

             

            And you might need to mark your entity as @javax.persistence.Cacheable and any @NamedQuery you have on the entity might need the @QueryHint(name="org.hibernate.cacheable",value="true") if you run them often and want to cache them.

            • 3. Re: Transactions for Infinispan as 2LC for Hibernate
              seacuke23

              Thanks for your response Ralph.

               

              I have tried using standalond-full-ha and it produced the same results. I have also applied the Cacheable annotation to the entity and it didn't change the behavior.  I don't use any NamedQueries so that is not a factor. 

               

              One thing I will add is that if I set hibernate.cache.use_second_level_cache the problem doesn't occur...but then of course it's not using the cache.

               

              I have also tried not including hibernate.cache.infinispan.cachemanager and hibernate.cache.region.factory_class as specified here:

               

              https://docs.jboss.org/author/display/AS71/JPA+Reference+Guide#JPAReferenceGuide-UsingtheInfinispansecondlevelcache

               

              and I encounter the same problem.

               

              One thing I have noticed is that some pages say to specify:

               

              <property name="hibernate.transaction.manager_lookup_class">org.hibernate.transaction.JBossTransactionManagerLookup</property>

              <property name="hibernate.transaction.factory_class">org.hibernate.transaction.CMTTransactionFactory</property>

               

              Which I have done and it didn't seem to make a difference...but it also doesn't seemt to mind if I include nonsense values for them...so obviously it's not using them at all.

               

              <property name="hibernate.transaction.manager_lookup_class">org.hibernate.transaction.JBossTransactionManagerLookupblah</property>

              <property name="hibernate.transaction.factory_class">org.hibernate.transaction.CMTTransactionFactoryblah</property>

               

              Any other thoughts?

              • 4. Re: Transactions for Infinispan as 2LC for Hibernate
                shadowcreeper

                I have tested this to ensure that clustered servers cache objects and invalidate them correctly, only hitting the database when necessary (or when a pessimistic invalidation made it necessary).

                 

                Note: My test also had a RESTful resource and some stuff that yours may not (hence the apparently unneeded dependencies in the deployment file).

                 

                Here is my entity class:

                package com.test.entity;
                
                import org.jetbrains.annotations.NotNull;
                
                import javax.persistence.*;
                import java.io.Serializable;
                
                @Entity
                @Table(name = "test")
                @NamedQueries({
                   @NamedQuery(
                      name = "TestEntity.allEntries",
                      query = "SELECT t FROM TestEntity t ORDER BY t.m_id ASC",
                      hints = { @QueryHint(name = "org.hibernate.cacheable", value = "true") } )
                })
                @Cacheable
                public class TestEntity
                   implements Serializable
                {
                    private final static long serialVersionUID = 1;
                
                   @Id
                   @Column(name = "test_id")
                   private Long m_id;
                
                   @Basic
                   @NotNull
                   @Column(name = "test_text", nullable = false)
                   private String m_text;
                
                   public Long getId ()
                   {
                      return m_id;
                   }
                
                   public void setId ( Long id )
                   {
                      m_id = id;
                   }
                
                   @NotNull
                   public String getText ()
                   {
                      return m_text;
                   }
                
                   public void setText ( @NotNull String text )
                   {
                      m_text = text;
                   }
                
                   @Override
                   public String toString ()
                   {
                      return "[" + m_id + ":" + m_text + "]";
                   }
                }
                

                 

                Here is my entity manager class:

                 

                package com.test.ejb;
                
                import com.test.Test;
                import com.test.entity.TestEntity;
                import org.jboss.ejb3.annotation.Clustered;
                
                import javax.ejb.Local;
                import javax.ejb.Stateless;
                import javax.persistence.EntityManager;
                import javax.persistence.PersistenceContext;
                import java.util.Collection;
                import java.util.List;
                
                @Stateless
                @Local(Test.class)
                @Clustered // this allows the EJB to load balance and failover nicely
                public class TestEJB
                   implements Test
                {
                
                   @PersistenceContext(unitName = "ClusterTestPU")
                   private EntityManager m_entityManager;
                
                   @Override
                   public Collection<TestEntity> getTests ()
                   {
                      //noinspection unchecked
                      return (List<TestEntity>)m_entityManager
                         .createNamedQuery( "TestEntity.allEntries" )
                         .getResultList();
                   }
                
                   @Override
                   public TestEntity getTest ( long id )
                   {
                      return m_entityManager.find( TestEntity.class, id );
                   }
                
                   @Override
                   public TestEntity createTest ( long id, String text )
                   {
                      final TestEntity testEntity = new TestEntity();
                      testEntity.setId( id );
                      testEntity.setText( text );
                      m_entityManager.persist( testEntity );
                      return testEntity;
                   }
                
                   @Override
                   public TestEntity updateTest ( long id, String text )
                   {
                      final TestEntity testEntity = m_entityManager.find( TestEntity.class, id );
                      testEntity.setText( text );
                      m_entityManager.merge( testEntity );
                      return testEntity;
                   }
                
                   @Override
                   public void removeTest ( long id )
                   {
                      final TestEntity testEntity = m_entityManager.find( TestEntity.class, id );
                      m_entityManager.remove( testEntity );
                   }
                }
                

                 

                Here is my persistence.xml file:

                <?xml version="1.0" encoding="UTF-8"?>

                <persistence xmlns="http://java.sun.com/xml/ns/persistence"

                             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

                             xsi:schemaLocation="http://java.sun.com/xml/ns/persistence

                                                 http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"

                             version="2.0">

                   <persistence-unit name="ClusterTestPU" transaction-type="JTA">

                      <jta-data-source>java:/jboss/datasources/TestEntityDS</jta-data-source>

                      <class>com.test.entity.TestEntity</class>

                 

                      <shared-cache-mode>ENABLE_SELECTIVE</shared-cache-mode>

                      <properties>

                         <!-- NOTE: For query caching to work, all machines in the cluster must have system clocks in sync -->

                         <property name="hibernate.cache.use_query_cache" value="true"/>

                         <property name="hibernate.cache.use_second_level_cache" value="true"/>

                         <property name="hibernate.cache.infinispan.statistics" value="true"/>

                         <property name="hibernate.cache.infinispan.cachemanager" value="java:jboss/infinispan/container/hibernate"/>

                         <property name="hibernate.cache.region.factory_class" value="org.hibernate.cache.infinispan.JndiInfinispanRegionFactory"/>

                         <property name="hibernate.transaction.manager_lookup_class" value="org.hibernate.transaction.JBossTransactionManagerLookup"/>

                 

                         <!-- These lines can be used for debugging -->

                         <property name="hibernate.show_sql" value="true"/>

                         <property name="hibernate.format_sql" value="true"/>

                         <property name="hibernate.use_sql_comments" value="true"/>

                         <property name="hibernate.generate_statistics" value="true"/>

                      </properties>

                   </persistence-unit>

                </persistence>

                 

                Here is my jboss-deployment-structure.xml file:

                <jboss-deployment-structure>
                   <sub-deployment name="clusterable-test-war.war">
                      <dependencies>
                         <module name="org.hibernate"/>
                         <module name="org.infinispan"/>
                         <module name="org.jboss.netty"/>
                         <module name="org.jboss.resteasy.resteasy-jackson-provider"/>
                      </dependencies>
                   </sub-deployment>
                </jboss-deployment-structure>
                
                • 5. Re: Transactions for Infinispan as 2LC for Hibernate
                  shadowcreeper

                  [...]

                  One thing I will add is that if I set hibernate.cache.use_second_level_cache the problem doesn't occur...but then of course it's not using the cache.

                  [...]

                  https://docs.jboss.org/author/display/AS71/JPA+Reference+Guide#JPAReferenceGuide-UsingtheInfinispansecondlevelcache

                  [...]

                  I assume you meant when you set "use_second_level_cache" to false? (as it must be set to true for this to work)

                  In order to use infinispan as hibernate second level cache, you need to be using a second level cache (as is mentioned at the top of the link you posted)...

                   

                  Also, for the invalidation to work correctly I have noticed that the server clocks must be in sync (being a minute off will break everything, possibly even just a few seconds off).

                  • 6. Re: Transactions for Infinispan as 2LC for Hibernate
                    seacuke23

                    Thanks again for your time Ralph.

                     

                    Ralph Jennings wrote:


                    I assume you meant when you set "use_second_level_cache" to false?

                     

                    That is what I meant...just forgot to complete the thought.  And my point with saying that is that it's not a logic problem with the app but it's actually being introduced by using caching.

                     

                    Also, I'm not working in a clustered environment right now...just trying to get this to work on a single-node standalone. 

                     

                    My problem is not that it isn't caching.  The problem is rather that if Infinispan is taking part in a transaction, it's not the same transaction as the entity modification and JMS write in Amdb.  I need to ensure that when Bmdb receives a message, that the result of the transaction that put the message on the queue is reflected in the cache.  So the problem is a timing problem.  When Amdb modifies the val to 100 and writes the ID to the queue transactionally, the entity read by Bmdb MUST have a val of 100 which occasionally is not the case when I'm using Infinispan caching.

                     

                    When I put a 100ms thread sleep before the em.find in Bmdb I do get an entity from the cache with the val of 100 every time.