0 Replies Latest reply on Jul 28, 2009 9:05 AM by adinn

    Case study : using helper built-ins to synchronize on thread

    adinn

      I have been trying to develop a test where I can check that all threads created by a listener service do actually exit at shutdown. One of the things which has made it difficult to get this right is the behaviour of my listener thread, which accepts connections on a socket. At shutdown the listener knows that any threads it has created are in the process of exiting, but it does not (need to) join them before returning from the shutdown. My test wants to be sure they really have exited before counting how many threads are live. So I developed a Byteman helper to track the spawned threads and used it to make the listener join them before returning from the shutdown call.

      The listener creates a server socket and spawns a connection handler thread each time accept returns a connection socket. It then loops back into its accept call, waiting for another connection request. The loop test checks a shutdown flag, but this is only tested at the top of the loop. The listener mostly just sits in an i/o wait under the call to accept until an external client tries to connect.

       public void run()
       {
           while ( !stopRequested() ) {
               try {
                   final Socket conn = _listener_socket.accept();
                   // n.b. add may not occur because a shutdown was requested
                   if (addConnection(conn)) {
                       // ok the connection is in the list -- ensure it clears itself out
                       Connection.Callback callback = new Connection.Callback() {
                           private Socket _conn = conn;
                           public void run() {
                               removeConnection(_conn);
                           }
                       };

                       Connection new_conn = new Connection( conn, _listener_service, callback );

                       new_conn.start();
                   }
               } catch ( final InterruptedIOException ex ) {
                   // timeout on the listener socket expired.
               } catch (final SocketException ex) {
                   // we get this if the socket is closed under a call to shutdown
               } catch (final Exception ex) {
                   // oops error
               }
           }
       }
      


      A call to stopListener from another thread sets the stopRequested flag and closes the accept socket, waking the listener thread with a socket exception. This avoids the need to interrupt the listener.

      The stopListener calling thread joins the listener thread after setting the stopRequested flag. This is done to ensure that the listener has completed. For this to guarantee a full shutdown the caller also needs to make sure that all active connections are closed and all connection handler threads have exited.

      So, to enable this the listener adds connection sockets to a list as they are created. Sockets are removed from the list by a callback called by the owning connection handler thread as it exits. This means that before the call to stopListener connection threads will purge the list as the clients at the other end close down their connections. The advantage of this is that there is no need to wake the accept thread (this would require an interrupt) in order for it to join the exiting thread.

      After the caller of stopListener wakes the listener thread it goes on to close each socket in the active list and then waits for the list to empty.

       public synchronized void stopListener()
       {
           _stop_listener = true;

           try {
               _listener_socket.close(); // in case we're still in accept
           } catch (final Exception ex) {
           }
           while(connections.size() > 0) {
               Socket conn = connections.get(0);
               try {
                   conn.close();
               } catch (Exception e) {
               }
               try {
                   wait();
               } catch (InterruptedException e) {
               }
           }
           // make sure this listener thread has exited before we return
           try {
               this.join();
           } catch (InterruptedException ie) {
           }
       }
      


      This has the same effect as if all the clients at the other end had closed their connections. Each connection handler detects the close and removes its socket from the list.
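
      The post does not show addConnection and removeConnection, so here is a minimal sketch of how that bookkeeping might look. Everything in it is an assumption for illustration: the class name ConnectionRegistry, the requestStop and awaitEmpty methods, and the type parameter C standing in for Socket so the sketch runs without a network. The one point it tries to capture is that removeConnection has to notify the monitor that stopListener is waiting on.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the listener's connection list; not the Arjuna code.
class ConnectionRegistry<C> {
    private final List<C> connections = new ArrayList<>();
    private boolean stopRequested = false;

    // Called by the accept loop; refuses new entries once shutdown has begun.
    synchronized boolean addConnection(C conn) {
        if (stopRequested) {
            return false;
        }
        connections.add(conn);
        return true;
    }

    // Called from the connection handler's exit callback.
    synchronized void removeConnection(C conn) {
        connections.remove(conn);
        notifyAll(); // wake a thread waiting for the list to drain
    }

    synchronized void requestStop() {
        stopRequested = true;
    }

    synchronized int size() {
        return connections.size();
    }

    // Blocks until the list is empty; returns false if interrupted first.
    synchronized boolean awaitEmpty() {
        try {
            while (!connections.isEmpty()) {
                wait();
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

      Note that an empty list only tells the waiter that each handler has run its callback. The handler threads may still be on their way out, which is the gap discussed next.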

      When the stopListener calling thread finds that the list is empty it can be sure that no connections are active and that all connection threads are about to exit. However, it cannot be sure that they have actually exited.

      This is a problem for my test because I want to check the thread pool after calling stopListener and be sure that the only thread still running is my main test thread. On a heavily loaded system the connection threads may still be running after stopListener returns. It is even possible that a thread which removed its socket before stopListener was called could still be running. Note that this is not a problem as far as the application is concerned. Once stopListener has completed the threads are guaranteed to exit without doing any more work so effectively they have stopped.
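
      For reference, this is roughly the kind of check the test makes. It is only a sketch: the class name LiveThreads is made up, and enumerating the current thread's ThreadGroup is my assumption about how such a count could be taken, not necessarily how the real test does it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: lists the names of live threads in the caller's thread group.
class LiveThreads {
    static List<String> names() {
        ThreadGroup group = Thread.currentThread().getThreadGroup();
        // activeCount() is only an estimate, so leave some slack in the array
        Thread[] threads = new Thread[group.activeCount() * 2 + 8];
        int count = group.enumerate(threads);
        List<String> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(threads[i].getName());
        }
        return result;
    }
}
```

      A thread that has removed its socket but has not yet returned from run would still show up in this list, which is why the count can be wrong on a loaded machine.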

      This issue could be resolved by making the listener join every connection thread after it has closed. For example, each time a connection handler exits it could move its socket to a closed list and interrupt the listener. However, this would incur unnecessary work on the part of the application just to satisfy the needs of the test.

      So, I found a solution by introducing new Byteman built-ins which insert the join behaviour where it is needed by the test. I added four new primitives:

      boolean createJoin(Object key, int expected)
      boolean isJoin(Object key, int expected)
      boolean joinEnlist(Object key)
      boolean joinWait(Object key, int expected)
      


      createJoin sets up a Joiner object identified by the supplied key which can be used to join a specific number of threads. It returns false if a Joiner with the given key already exists; otherwise it creates the Joiner and returns true.

      isJoin tests whether a Joiner identified by key with the given expected thread count has been created and is still active.

      joinEnlist is called from any thread which needs to be joined. If all is well it adds the current thread to a list associated with the Joiner identified by key and returns true, allowing the thread, presumably, to continue towards its exit. It will fail to add the thread to the list and return false if i) there is no Joiner identified by the key, ii) the thread is already in the Joiner's list or iii) the list already contains the expected number of threads.

      joinWait is called from a thread which wants to join the threads enlisted with a Joiner. If key does not identify a Joiner or the expected count for the Joiner differs it returns false. If another thread has called joinWait with the same key and is still waiting it also returns false. Otherwise the Joiner is tagged as belonging to the calling thread and, if necessary, the thread waits until the enlisted count reaches the expected count. At this point the Joiner is dissociated from the key and the calling thread joins each thread in the list. The call returns true once all the join operations have completed.
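
      To make the semantics above concrete, here is a minimal sketch of the four primitives written as static methods. It is not the JoinHelper/Joiner code from the repository. The class name JoinSketch, the single map used as a lock, and the choice to return false when the waiting thread is interrupted are all my assumptions for the purpose of illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the four primitives; NOT the actual helper implementation.
class JoinSketch {
    private static final class Joiner {
        final int expected;                             // number of threads to collect
        final List<Thread> threads = new ArrayList<>(); // threads enlisted so far
        Thread waiter;                                  // thread that claimed joinWait, if any
        Joiner(int expected) { this.expected = expected; }
    }

    // All Joiner state is guarded by the monitor of this map.
    private static final Map<Object, Joiner> joiners = new HashMap<>();

    static boolean createJoin(Object key, int expected) {
        synchronized (joiners) {
            if (joiners.containsKey(key)) return false;
            joiners.put(key, new Joiner(expected));
            return true;
        }
    }

    static boolean isJoin(Object key, int expected) {
        synchronized (joiners) {
            Joiner joiner = joiners.get(key);
            return joiner != null && joiner.expected == expected;
        }
    }

    static boolean joinEnlist(Object key) {
        synchronized (joiners) {
            Joiner joiner = joiners.get(key);
            Thread current = Thread.currentThread();
            if (joiner == null || joiner.threads.contains(current)
                    || joiner.threads.size() >= joiner.expected) {
                return false;
            }
            joiner.threads.add(current);
            joiners.notifyAll(); // a waiter may now have its full count
            return true;
        }
    }

    static boolean joinWait(Object key, int expected) {
        Joiner joiner = null;
        try {
            synchronized (joiners) {
                joiner = joiners.get(key);
                if (joiner == null || joiner.expected != expected || joiner.waiter != null) {
                    return false;
                }
                joiner.waiter = Thread.currentThread();
                while (joiner.threads.size() < joiner.expected) {
                    joiners.wait();
                }
                joiners.remove(key); // dissociate the Joiner from its key
            }
            for (Thread t : joiner.threads) {
                t.join(); // done outside the lock so other keys are not held up
            }
            return true;
        } catch (InterruptedException e) {
            // assumption: release the claim, restore the interrupt flag, report failure
            synchronized (joiners) {
                if (joiner != null) joiner.waiter = null;
            }
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

      The enlisting thread does not block. It only records itself and carries on to its exit, so the cost of waiting falls entirely on the thread that calls joinWait.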

      So, in my test I now use the following rules to ensure that the listener joins its connection handler threads:
      RULE RecoveryManagerStartStopTest create joiner
      CLASS com.hp.mwtests.ts.arjuna.recovery.RecoveryManagerStartStopTest
      METHOD testStartStop()
      HELPER com.hp.mwtests.ts.arjuna.recovery.RecoveryManagerStartStopTest$JoinHelper
      AT ENTRY
      BIND NOTHING
      IF TRUE
      DO debug("create joiner for listener"),
       createJoin("Listener Connection", 2)
      ENDRULE
      
      RULE listener join wait
      CLASS com.arjuna.ats.internal.arjuna.recovery.Listener
      METHOD run()
      HELPER com.hp.mwtests.ts.arjuna.recovery.RecoveryManagerStartStopTest$JoinHelper
      AT RETURN
      BIND NOTHING
      IF isJoin("Listener Connection", 2)
      DO debug("Listener Connection calling joinWait 2 " + Thread.currentThread()),
       joinWait("Listener Connection", 2),
       debug("Listener Connection called joinWait 2 " + Thread.currentThread())
      ENDRULE
      
      RULE connection join enlist
      CLASS com.arjuna.ats.internal.arjuna.recovery.Listener
      METHOD removeConnection
      HELPER com.hp.mwtests.ts.arjuna.recovery.RecoveryManagerStartStopTest$JoinHelper
      AT RETURN
      BIND NOTHING
      IF joinEnlist("Listener Connection")
      DO debug("Listener Connection joinEnlist " + Thread.currentThread())
      ENDRULE
      


      The test creates 2 connections which lie idle until stopListener is called. They both receive an exception and prepare to exit, calling method removeConnection in the exit callback. The rule attached to this method calls joinEnlist, inserting the connection handler thread into the join list. The rule returns and the thread continues towards its exit.

      When the listener thread returns from its run method the other rule is triggered and it calls joinWait. It cannot return from this call until the connection threads have exited. So the shutdown call will not return from its join on the listener thread until all 3 threads have exited.

      Here's the actual test output:

       [junit] Running com.hp.mwtests.ts.arjuna.recovery.RecoveryManagerStartStopTest
       [junit] rule.debug{RecoveryManagerStartStopTest create rendezvous} : create rendezvous for PR listener run
       [junit] Before recovery manager create
       [junit] Thread count == 1
       [junit] Thread[0] == main
       [junit] org.jboss.byteman.agent.Transformer : Saving transformed bytes to dump/com/arjuna/ats/internal/arjuna/recovery/PeriodicRecovery.class
       [junit] org.jboss.byteman.agent.Transformer : Saving transformed bytes to dump/com/arjuna/ats/internal/arjuna/recovery/Listener.class
       [junit] rule.debug{periodic recovery set up listener for rendezvous} : flagging PR listener
       [junit] Before recovery manager initialize
       [junit] Thread count == 2
       [junit] Thread[0] == main
       [junit] Thread[1] == Listener:4712
       [junit] Before recovery manager start periodic recovery thread
       [junit] Thread count == 2
       [junit] Thread[0] == main
       [junit] Thread[1] == Listener:4712
       [junit] rule.debug{listener rendezvous at run} : listener rendezvous at PR listener run
       [junit] Before recovery manager client create
       [junit] Thread count == 3
       [junit] Thread[0] == main
       [junit] Thread[1] == Listener:4712
       [junit] Thread[2] == Periodic Recovery
       [junit] rule.debug{RecoveryManagerStartStopTest rendezvous before adding clients} : RecoveryManagerStartStopTest rendezvous at PR listener run
       [junit] client atempting to connect to host toby port 4712
       [junit] org.jboss.byteman.agent.Transformer : Saving transformed bytes to dump/com/arjuna/ats/internal/arjuna/recovery/Connection.class
       [junit] rule.debug{RecoveryManagerStartStopTest connection rendezvous at run} : connection rendezvous at doWork
       [junit] connected!!!
       [junit] client atempting to connect to host toby port 4712
       [junit] connected!!!
       [junit] rule.debug{RecoveryManagerStartStopTest rendezvous after client connections started} : RecoveryManagerStartStopTest rendezvous after 2nd addRecoveryClient
       [junit] rule.debug{RecoveryManagerStartStopTest connection rendezvous at run} : connection rendezvous at doWork
       [junit] Before recovery manager terminate
       [junit] Thread count == 7
       [junit] Thread[0] == main
       [junit] Thread[1] == Listener:4712
       [junit] Thread[2] == Periodic Recovery
       [junit] Thread[3] == Recovery Listener Client
       [junit] Thread[4] == Server.Connection:127.0.0.1:50176
       [junit] Thread[5] == Recovery Listener Client
       [junit] Thread[6] == Server.Connection:127.0.0.1:50177
       [junit] 2009-07-28 11:22:50,617 [Server.Connection:127.0.0.1:50176] WARN com.arjuna.ats.arjuna.logging.arjLoggerI18N - [com.arjuna.ats.internal.arjuna.recovery.WorkerService_2] - IOException
       [junit] Recovery Listener Client got empty string from readline() as expected
       [junit] rule.debug{connection join enlist} : Listener Connection joinEnlist Thread[Server.Connection:127.0.0.1:50176,5,main]
       [junit] 2009-07-28 11:22:50,621 [Server.Connection:127.0.0.1:50177] WARN com.arjuna.ats.arjuna.logging.arjLoggerI18N - [com.arjuna.ats.internal.arjuna.recovery.WorkerService_2] - IOException
       [junit] rule.debug{connection join enlist} : Listener Connection joinEnlist Thread[Server.Connection:127.0.0.1:50177,5,main]
       [junit] Recovery Listener Client got empty string from readline() as expected
       [junit] rule.debug{listener join wait} : Listener Connection calling joinWait 2 Thread[Listener:4712,5,main]
       [junit] rule.debug{listener join wait} : Listener Connection called joinWait 2 Thread[Listener:4712,5,main]
       [junit] After recovery manager terminate
       [junit] Thread count == 1
       [junit] Thread[0] == main
       [junit] Tests run: 1, Failures: 0, Errors: 0, Time elapsed: 1.024 sec
      


      I'm intending to add these built-ins to the default helper class so they are available by default to all applications. For now the implementation can be found in class RecoveryManagerStartStopTest in the JBoss repository (look for inner class JoinHelper and its auxiliary class Joiner): http://anonsvn.jboss.org/repos/labs/labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/recovery/RecoveryManagerStartStopTest.java