12 Replies Latest reply on Mar 13, 2009 11:59 AM by clebert.suconic

    JOIN not working..

    clebert.suconic

      Unless I made a mistake in the following test case, there is a problem with JOIN (which I will be investigating).

      But could you guys please take a look at this test case, just to make sure I'm not missing anything obvious:


       public void testSimpleJoin() throws Exception
       {
       SimpleString ADDRESS1 = new SimpleString("Address-1");
       SimpleString ADDRESS2 = new SimpleString("Address-2");
      
       clientSession.createQueue(ADDRESS1, ADDRESS1, true);
       clientSession.createQueue(ADDRESS2, ADDRESS2, true);
      
       Xid xid = newXID();
       ClientSession sessionA = sessionFactory.createSession(true, false, false);
       sessionA.start(xid, XAResource.TMNOFLAGS);
      
       ClientSession sessionB = sessionFactory.createSession(true, false, false);
       sessionB.start(xid, XAResource.TMJOIN);
      
      
       ClientProducer prodA = sessionA.createProducer(ADDRESS1);
       ClientProducer prodB = sessionB.createProducer(ADDRESS2);
      
       prodA.send(createTextMessage(sessionA, "A"));
       prodB.send(createTextMessage(sessionB, "B"));
      
       sessionA.end(xid, XAResource.TMSUCCESS);
       sessionB.end(xid, XAResource.TMSUCCESS);
      
       // Since sessionA and B are from the same server, we could close B and commit the XID through sessionA, as B was JOINED
       sessionB.close();
      
       sessionA.commit(xid, true);
      
       sessionA.close();
      
      
       xid = newXID();
      
       clientSession.start(xid, XAResource.TMNOFLAGS);
      
       ClientConsumer cons1 = clientSession.createConsumer(ADDRESS1);
       ClientConsumer cons2 = clientSession.createConsumer(ADDRESS2);
      
       clientSession.start();
      
       ClientMessage msg = cons1.receive(1000);
       assertNotNull(msg);
       msg.acknowledge();
      
       assertNull(cons1.receiveImmediate());
      
      
       msg = cons2.receive(1000);
       assertNotNull(msg);
       msg.acknowledge();
      
      
       assertNull(cons2.receiveImmediate());
      
       clientSession.end(xid, XAResource.TMSUCCESS);
      
       clientSession.commit(xid, true);
      
       clientSession.close();
       }
      


      And the same test, but using a real TransactionManager instead:

       public void testSimpleJoinWithTM() throws Exception
       {
      
       SimpleString ADDRESS1 = new SimpleString("Address-1");
       SimpleString ADDRESS2 = new SimpleString("Address-2");
      
       clientSession.createQueue(ADDRESS1, ADDRESS1, true);
       clientSession.createQueue(ADDRESS2, ADDRESS2, true);
      
       TransactionManager tm = new TransactionManagerImple();
       tm.begin();
      
       Transaction tx = tm.getTransaction();
      
       ClientSession sessionA = sessionFactory.createSession(true, false, false);
      
       tx.enlistResource(sessionA);
      
       ClientSession sessionB = sessionFactory.createSession(true, false, false);
      
       tx.enlistResource(sessionB);
      
       ClientProducer prodA = sessionA.createProducer(ADDRESS1);
       ClientProducer prodB = sessionB.createProducer(ADDRESS2);
      
       prodA.send(createTextMessage(sessionA, "A"));
       prodB.send(createTextMessage(sessionB, "B"));
      
       tx.delistResource(sessionA, XAResource.TMSUCCESS);
       tx.delistResource(sessionB, XAResource.TMSUCCESS);
      
       tm.commit();
      
       sessionB.close();
       sessionA.close();
      
       tm.begin();
      
       tx = tm.getTransaction();
       tx.enlistResource(clientSession);
      
       ClientConsumer cons1 = clientSession.createConsumer(ADDRESS1);
       ClientConsumer cons2 = clientSession.createConsumer(ADDRESS2);
      
       clientSession.start();
      
       ClientMessage msg = cons1.receive(1000);
       assertNotNull(msg);
       msg.acknowledge();
      
       assertNull(cons1.receiveImmediate());
      
       msg = cons2.receive(1000);
       assertNotNull(msg);
       msg.acknowledge();
      
      
       assertNull(cons2.receiveImmediate());
      
       tx.delistResource(clientSession, XAResource.TMSUCCESS);
      
       tm.commit();
      
       clientSession.close();
       }
      
      


        • 1. Re: JOIN not working..
          clebert.suconic

          I forgot to say what the failure is.

          The messages are not being delivered.
          The following assertion will fail:

           ClientMessage msg = cons1.receive(1000);
           assertNotNull(msg);
           msg.acknowledge();
          



          Also...

          If both messages are sent using only sessionA or only sessionB, the test will pass.

          So, if I make this change, the test will pass:

           // Both prodA and prodB using sessionB
           ClientProducer prodA = sessionB.createProducer(ADDRESS1);
           ClientProducer prodB = sessionB.createProducer(ADDRESS2);
          



          • 2. Re: JOIN not working..
            clebert.suconic

            The problem I see is here:

             prodA.send(createTextMessage(sessionA, "A"));
             prodB.send(createTextMessage(sessionB, "B"));
            


            prodA and prodB will both send asynchronously, using two distinct ClientSessions and RemoteConnections.


            TransactionImpl does not synchronize some of its collections, because it was never really planned for concurrent use. Joining the way this test does is the exception: two sessions work on the same transaction at the same time, which creates race conditions.

            These tests will always fail the first time and pass the second time (say, if you run them in a loop), probably because class loading slows things down on the first run and exposes the race.
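One plausible interleaving behind this kind of failure can be replayed by hand. This is a sketch under assumptions: `FakeTx` and `RaceReplay` are hypothetical stand-ins and not the real TransactionImpl, and the steps run in a single thread purely to make the ordering visible. Two callers both read the transaction property before either writes it, so both create and register an operation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a transaction; NOT the real TransactionImpl.
final class FakeTx
{
   final Map<String, List<String>> properties = new HashMap<>();
   final List<List<String>> operations = new ArrayList<>();
}

final class RaceReplay
{
   static final String REFS = "REFS_OPERATION";

   // Replays, step by step in one thread, an interleaving that two
   // unsynchronized callers could produce on the same transaction.
   static FakeTx replay()
   {
      FakeTx tx = new FakeTx();

      // Steps 1-2: both callers read the property before either writes it.
      List<String> operA = tx.properties.get(REFS);
      List<String> operB = tx.properties.get(REFS);

      // Step 3: caller A saw null, so it creates and registers an operation.
      if (operA == null)
      {
         operA = new ArrayList<>();
         tx.properties.put(REFS, operA);
         tx.operations.add(operA);
      }
      operA.add("message A");

      // Step 4: caller B also saw null, so it registers a second operation
      // and overwrites the property that pointed at A's operation.
      if (operB == null)
      {
         operB = new ArrayList<>();
         tx.properties.put(REFS, operB);
         tx.operations.add(operB);
      }
      operB.add("message B");

      return tx;
   }

   public static void main(String[] args)
   {
      FakeTx tx = replay();
      System.out.println("operations registered: " + tx.operations.size());
      System.out.println("visible through property: " + tx.properties.get(REFS));
   }
}
```

In this replay the transaction ends up with two operations, and only the second one is reachable through the property. With real threads and unsynchronized collections the outcome could be worse, since the collections themselves may be corrupted.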




            • 3. Re: JOIN not working..
              clebert.suconic

              Synchronizing QueueImpl::getRefsOperation (per TX) would fix the test:


              final RefsOperation getRefsOperation(final Transaction tx)
               {
              - RefsOperation oper = (RefsOperation)tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
              -
              - if (oper == null)
              + synchronized (tx)
               {
              - oper = new RefsOperation();
              -
              - tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
              -
              - tx.addOperation(oper);
              + RefsOperation oper = (RefsOperation)tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
              +
              + if (oper == null)
              + {
              + oper = new RefsOperation();
              +
              + tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
              +
              + tx.addOperation(oper);
              + }
              +
              + return oper;
               }
              -
              - return oper;
               }
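The same locking pattern in a self-contained form, as a sketch only: `DemoTx` and `DemoOps` are hypothetical stand-ins for the transaction and for QueueImpl, and the property key is a plain string here. Locking on the transaction makes the lookup, creation and registration a single atomic step per transaction, so concurrent callers end up sharing one operation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a transaction; its collections are not thread-safe.
final class DemoTx
{
   private final Map<String, Object> properties = new HashMap<>();
   private final List<Object> operations = new ArrayList<>();

   Object getProperty(String key) { return properties.get(key); }
   void putProperty(String key, Object value) { properties.put(key, value); }
   void addOperation(Object oper) { operations.add(oper); }
   int operationCount() { return operations.size(); }
}

final class DemoOps
{
   static final String REFS_OPERATION = "REFS_OPERATION";

   // Check-then-create under the transaction's monitor.
   static Object getRefsOperation(DemoTx tx)
   {
      synchronized (tx)
      {
         Object oper = tx.getProperty(REFS_OPERATION);
         if (oper == null)
         {
            oper = new Object();
            tx.putProperty(REFS_OPERATION, oper);
            tx.addOperation(oper);
         }
         return oper;
      }
   }

   // Calls getRefsOperation from several threads at once and returns how
   // many operations ended up registered on the shared transaction.
   static int runConcurrently(int threads)
   {
      DemoTx tx = new DemoTx();
      CountDownLatch go = new CountDownLatch(1);
      List<Thread> workers = new ArrayList<>();
      for (int i = 0; i < threads; i++)
      {
         Thread worker = new Thread(() -> {
            try
            {
               go.await();
               getRefsOperation(tx);
            }
            catch (InterruptedException e)
            {
               Thread.currentThread().interrupt();
            }
         });
         workers.add(worker);
         worker.start();
      }
      go.countDown();
      try
      {
         for (Thread worker : workers)
         {
            worker.join();
         }
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
      }
      return tx.operationCount();
   }

   public static void main(String[] args)
   {
      System.out.println("operations registered: " + runConcurrently(8));
   }
}
```

Every access to the stand-in transaction goes through the same lock here. In a real code base, any other path that touches the same collections would need the same protection.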
              


              • 4. Re: JOIN not working..
                clebert.suconic

                getPageOperation would also be the same case.

                Tim: I will commit this in my morning (your afternoon), but only after talking to you, as you may have another suggestion.

                • 5. Re: JOIN not working..
                  timfox

                  Can you explain what the race condition is? Not so interested in the solution at this point.

                  • 6. Re: JOIN not working..
                    timfox

                    I spoke to Jonathan about this and it seems your use case may be legal although it is very unusual.

                    In most cases, the xid will end its association with one resource before joining on the next resource. If you change your test to do the "normal" thing, it passes.
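The difference between the two orderings can be made concrete with a small recorder. This is an illustration only: `AssociationLog` is a hypothetical helper, not a real XAResource, and it just counts how many sessions are associated with one transaction at the same moment. The flag constants come from `javax.transaction.xa.XAResource`.

```java
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAResource;

// Hypothetical recorder; it only tracks start/end calls for one transaction.
final class AssociationLog
{
   private final List<String> calls = new ArrayList<>();
   private int active;
   private int maxActive;

   void start(String session, int flags)
   {
      active++;
      maxActive = Math.max(maxActive, active);
      String flagName = (flags == XAResource.TMJOIN) ? "TMJOIN" : "TMNOFLAGS";
      calls.add(session + ".start(" + flagName + ")");
   }

   void end(String session)
   {
      active--;
      calls.add(session + ".end(TMSUCCESS)");
   }

   int maxActive() { return maxActive; }
   List<String> calls() { return calls; }

   // Order used by the test case in this thread: B joins while A is still associated.
   static AssociationLog overlapping()
   {
      AssociationLog log = new AssociationLog();
      log.start("sessionA", XAResource.TMNOFLAGS);
      log.start("sessionB", XAResource.TMJOIN);
      log.end("sessionA");
      log.end("sessionB");
      return log;
   }

   // The "normal" order: A ends its association before B joins.
   static AssociationLog sequential()
   {
      AssociationLog log = new AssociationLog();
      log.start("sessionA", XAResource.TMNOFLAGS);
      log.end("sessionA");
      log.start("sessionB", XAResource.TMJOIN);
      log.end("sessionB");
      return log;
   }

   public static void main(String[] args)
   {
      System.out.println(overlapping().calls() + " max active: " + overlapping().maxActive());
      System.out.println(sequential().calls() + " max active: " + sequential().maxActive());
   }
}
```

In the overlapping order two sessions are associated with the xid at once, while in the sequential order there is never more than one.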

                    Are you actually seeing this behaviour with a transaction manager?

                    The XA spec says it is permissible to block the second start call until the first has finished, so I suggest we do that. I don't want to go down the path of having to make transactions multi-threaded on the server.

                    • 7. Re: JOIN not working..
                      timfox

                      XA spec, section 3.3:


                      RMs in the DTP environment should anticipate that many threads will try to use
                      them concurrently. If multiple threads use an RM on behalf of the same XID, the
                      RM is free to serialise the threads’ work in any way it sees fit. For example, an RM
                      may block a second or subsequent thread while one is active.
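A minimal sketch of what blocking a second association could look like, under assumptions: `XidGate` is a hypothetical helper and not how any real resource manager is implemented, and the xid is reduced to a string key. Each xid gets a one-permit semaphore; a start takes the permit and the matching end gives it back.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical helper that serialises work per xid.
final class XidGate
{
   private final ConcurrentHashMap<String, Semaphore> gates = new ConcurrentHashMap<>();

   private Semaphore gateFor(String xid)
   {
      return gates.computeIfAbsent(xid, key -> new Semaphore(1, true));
   }

   // Waits up to timeoutMillis for any other association with this xid
   // to end. Returns false if the xid is still in use after the timeout.
   boolean start(String xid, long timeoutMillis)
   {
      try
      {
         return gateFor(xid).tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt();
         return false;
      }
   }

   // Assumes every end follows a successful start for the same xid.
   void end(String xid)
   {
      gateFor(xid).release();
   }

   public static void main(String[] args)
   {
      XidGate gate = new XidGate();
      System.out.println("first start: " + gate.start("xid-1", 50));
      System.out.println("second start while active: " + gate.start("xid-1", 50));
      gate.end("xid-1");
      System.out.println("start after end: " + gate.start("xid-1", 50));
   }
}
```

A semaphore is used rather than a reentrant lock because the end call may arrive on a different thread from the start call, and a lock must be released by the thread that acquired it.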



                      • 8. Re: JOIN not working..
                        clebert.suconic

                         

                        "timfox" wrote:
                        Can you explain what the race condition is? Not so interested in the solution at this point.



                        My post at Wed Mar 11, 2009 20:34 PM was about what the race is. I make two calls using two different sessions, and the same XID is used by both at the same time.



                        • 9. Re: JOIN not working..
                          clebert.suconic

                           

                          "timfox" wrote:


                          Are you actually seeing this behaviour with a transaction manager?



                          Yes, the same problem appears when using a transaction manager.

                          • 10. Re: JOIN not working..
                            timfox

                             

                            "clebert.suconic@jboss.com" wrote:
                            "timfox" wrote:
                            Can you explain what the race condition is? Not so interested in the solution at this point.



                            My post at Wed Mar 11, 2009 20:34 PM was about what the race is. I make two calls using two different sessions, and the same XID is used by both at the same time.



                            That's not an explanation of the race, that's an explanation of how to reproduce the race. It doesn't explain how the race occurs.

                            An explanation of the race would be something like:

                            Thread a attempts to get the transaction concurrently with thread b, the is-null check evaluates to false... etc. etc.

                            Anyhow, I can see why it occurs by looking at the code so no need to explain anything now.



                            • 11. Re: JOIN not working..
                              clebert.suconic

                               

                              "timfox" wrote:
                              XA spec, section 3.3:


                              RMs in the DTP environment should anticipate that many threads will try to use
                              them concurrently. If multiple threads use an RM on behalf of the same XID, the
                              RM is free to serialise the threads’ work in any way it sees fit. For example, an RM
                              may block a second or subsequent thread while one is active.



                              I read that as saying we should synchronize the calls, protecting against multi-threaded use of the same XID... correct?

                              • 12. Re: JOIN not working..
                                clebert.suconic

                                 

                                RMs in the DTP environment should anticipate that many threads will try to use
                                them concurrently.


                                I have added some synchronization based on this statement, and I have committed the test. We can change it if we decide to change the test to do the "normal" case and not support that scenario.