QueueImpl::deliverAsync
clebert.suconic Sep 8, 2008 2:09 PM Looking at deliverAsync, there is an AtomicBoolean that is meant to allow only one thread to execute at any time (any other thread should give up the execution).
public void deliverAsync(final Executor executor) { //Prevent too many executors running at once if (waitingToDeliver.compareAndSet(false, true)) { executor.execute(deliverRunner); } }
DeliverRunner then sets the boolean to false before even starting the delivery.
/**
 * Runnable that clears the {@code waitingToDeliver} flag and then calls
 * {@code deliver()}.
 *
 * The flag is cleared first, so it is already false while {@code deliver()}
 * is running.
 * NOTE(review): presumably the "race" mentioned below is a delivery request
 * arriving while deliver() is in progress - confirm against the callers.
 */
private class DeliverRunner implements Runnable
{
   public void run()
   {
      //Must be set to false *before* executing to avoid race
      waitingToDeliver.set(false);

      deliver();
   }
}
Shouldn't the waitingToDeliver boolean be set to false *after* deliver() has been called?
/**
 * Proposed variant: runs {@code deliver()} first and clears the
 * {@code waitingToDeliver} flag afterwards.
 *
 * The flag is reset in a finally block, so it is cleared whether
 * {@code deliver()} returns normally or throws.
 * NOTE(review): with this ordering the flag stays true for the whole duration
 * of deliver(), so a compareAndSet(false, true) attempted in that window fails
 * - confirm that no delivery request can be lost this way.
 */
private class DeliverRunner implements Runnable
{
   public void run()
   {
      try
      {
         deliver();
      }
      finally
      {
         // Reset only once deliver() has finished (normally or exceptionally)
         waitingToDeliver.set(false);
      }
   }
}
I simulated this in a simple test, and I would always have multiple threads executing my method if set(false) was called before the work instead of after it.
/**
 * Small stand-alone experiment that checks whether an {@link AtomicBoolean}
 * guard lets more than one thread into a critical area at the same time.
 *
 * {@link #main(String[])} starts {@link #NUMBER_OF_THREADS} threads, releases
 * them together, and each thread calls {@link #execute()} once. A message is
 * printed whenever more than one thread is found inside the critical area.
 */
public class MultiStartTest
{
   /** Guard flag: true while a thread owns the critical area. */
   static AtomicBoolean waitingControl = new AtomicBoolean(false);

   /** Number of threads currently inside the critical area. */
   static AtomicInteger counter = new AtomicInteger(0);

   static final int NUMBER_OF_THREADS = 100;

   /** Counted down once by every worker thread before it waits on {@link #latchStart}. */
   static CountDownLatch latchAlign = new CountDownLatch(NUMBER_OF_THREADS);

   /** Released by main so that the waiting workers proceed to execute(). */
   static CountDownLatch latchStart = new CountDownLatch(1);

   /** Awaited inside the critical area to simulate work; released by main. */
   static CountDownLatch latchWait = new CountDownLatch(1);

   /**
    * Enters the critical area if the guard flag can be flipped from false to
    * true; otherwise returns immediately without doing anything.
    *
    * The counter decrement and the guard reset happen in a finally block, so
    * an exception inside the critical area (for example an interrupt while
    * waiting on {@link #latchWait}) cannot leave the guard permanently set.
    *
    * @throws Exception if the thread is interrupted while waiting inside the
    *                   critical area
    */
   public static void execute() throws Exception
   {
      if (!waitingControl.compareAndSet(false, true))
      {
         // Another thread owns the critical area: give up.
         return;
      }

      // waitingControl.set(false); If you re-enable this line, multiple threads will be executing the critical area

      final int inside = counter.incrementAndGet();
      try
      {
         if (inside > 1)
         {
            System.out.println("Multiple threads are executing this critical area ( counter = " + counter.get() + ")");
         }

         latchWait.await(); // Simulating some work being done.
      }
      finally
      {
         // Always leave the critical area, even if the wait above was interrupted.
         counter.decrementAndGet();
         waitingControl.set(false);
      }
   }

   /** Worker thread: signals readiness, waits for the start signal, then calls execute(). */
   static class TRun extends Thread
   {
      @Override
      public void run()
      {
         try
         {
            latchAlign.countDown();
            latchStart.await();
            MultiStartTest.execute();
         }
         catch (InterruptedException e)
         {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            e.printStackTrace();
         }
         catch (Exception e)
         {
            e.printStackTrace();
         }
      }
   }

   /**
    * Starts all worker threads, waits until each has signalled readiness,
    * releases them, releases the simulated work, and joins every thread.
    *
    * @param args ignored
    */
   public static void main(String[] args)
   {
      try
      {
         ArrayList<TRun> threads = new ArrayList<TRun>();

         for (int i = 0; i < NUMBER_OF_THREADS; i++)
         {
            TRun t = new TRun();
            t.start();
            threads.add(t);
         }

         latchAlign.await();
         latchStart.countDown();
         latchWait.countDown();

         for (Thread t : threads)
         {
            t.join();
         }
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt();
         e.printStackTrace();
      }
   }
}