1 Reply Latest reply on Sep 9, 2008 7:50 AM by timfox

    QueueImpl::deliverAsync

    clebert.suconic

      Looking at deliverAsyn, there is an AtomicBoolean that should be used to only allow one thread being executed at any time. (It should give up the execution on any other thread).

      public void deliverAsync(final Executor executor)
       {
       //Prevent too many executors running at once
      
       if (waitingToDeliver.compareAndSet(false, true))
       {
       executor.execute(deliverRunner);
       }
       }
      


      DeliverRunning is then setting the boolean to false before even starting the deliver.

      private class DeliverRunner implements Runnable
       {
       public void run()
       {
       //Must be set to false *before* executing to avoid race
       waitingToDeliver.set(false);
      
       deliver();
       }
       }
      



      Shouldn't waitingToDeliver boolean be set to false *after* the deliver was called?

      private class DeliverRunner implements Runnable
       {
       public void run()
       {
       try
       {
       deliver();
       }
       finally
       {
       waitingToDeliver.set(false);
       }
       }
       }
      


      I simulated this on a simple test, and I would aways have multiple threads executing my method if set was called before.
      
      
      public class MultiStartTest
      {
       static AtomicBoolean waitingControl = new AtomicBoolean(false);
      
       static AtomicInteger counter = new AtomicInteger(0);
      
       static final int NUMBER_OF_THREADS = 100;
      
       static CountDownLatch latchAlign = new CountDownLatch(NUMBER_OF_THREADS);
      
       static CountDownLatch latchStart = new CountDownLatch(1);
      
       static CountDownLatch latchWait = new CountDownLatch(1);
      
      
      
       public static void execute() throws Exception
       {
       if (waitingControl.compareAndSet(false, true))
       {
      
       // waitingControl.set(false); If you re-enable this line, multiple threads will be executing the critical area
      
       if (counter.addAndGet(1)>1)
       {
       System.out.println("Multiple threads are executing this critical area ( counter = " + counter.get() + ")");
       }
      
       latchWait.await(); // Simulating some work being done.
      
       counter.addAndGet(-1);
       waitingControl.set(false);
       }
      
       }
      
      
       static class TRun extends Thread
       {
       public void run()
       {
       try
       {
       latchAlign.countDown();
       latchStart.await();
      
       MultiStartTest.execute();
       }
       catch (Throwable e)
       {
       e.printStackTrace();
       }
       }
       }
      
      
       public static void main(String arg[])
       {
       try
       {
       ArrayList<TRun> threads = new ArrayList<TRun>();
       for (int i = 0; i < NUMBER_OF_THREADS; i++)
       {
       TRun t = new TRun();
       t.start();
       threads.add(t);
       }
      
       latchAlign.await();
       latchStart.countDown();
      
       latchWait.countDown();
      
       for (Thread t: threads)
       {
       t.join();
       }
      
      
       }
       catch (Exception e)
       {
       e.printStackTrace();
       }
       }
      
      }
      


        • 1. Re: QueueImpl::deliverAsync
          timfox

          If you set the flag to false after calling deliver, then there's a possibility deliver will never get called, and messages not delivered.

          Thread 1 calls deliverasync
          Delivery thread delivers message
          Thread 2 adds message to queue
          Thread2 calls deliverasync
          Already delivering so thread 2 call to deliver is ignored
          Delivery thread sets flag to false
          ==> Message gets "stuck" in queue.