This content has been marked as final.
Show 1 reply
-
1. Re: ManagedExecutorService - how to use priorityQueue?
ironman_br Feb 26, 2016 8:17 AM (in response to haso) I'm testing a priority queue based on this implementation: http://binkley.blogspot.com.br/2009/04/jumping-work-queue-in-executor.html
Using a ManagedThreadFactory, I can create a custom managed ThreadPoolExecutor (I think you can create a custom ScheduledThreadPoolExecutor the same way).
/**
 * Application-scoped bean that owns one {@link PriorityExecutor} for its whole lifetime.
 *
 * <p>The executor is built in {@link #init()} from the injected {@link ManagedThreadFactory}
 * and the pool sizes configured below, and is shut down in {@link #releaseResources()}.
 */
@ApplicationScoped
public class PoolExecutor {

    /** Thread factory injected under the resource name {@code DefaultManagedThreadFactory}. */
    @Resource(name = "DefaultManagedThreadFactory")
    ManagedThreadFactory factory;

    /** Core pool size handed to the {@link PriorityExecutor} constructor. */
    int corePoolSize = 5;

    /** Maximum pool size handed to the {@link PriorityExecutor} constructor. */
    int maxPoolSize = 10;

    // Not read anywhere in this class: the PriorityExecutor constructor used in init()
    // takes no keep-alive argument.
    long keepAliveTime = 5000;

    /** Created in {@link #init()}; stays {@code null} until then. */
    private ExecutorService threadPoolExecutor;

    /** Builds the priority executor once injection has completed. */
    @PostConstruct
    public void init() {
        this.threadPoolExecutor =
                new PriorityExecutor(this.factory, this.corePoolSize, this.maxPoolSize);
    }

    /** Requests an orderly shutdown of the executor; does not wait for running tasks. */
    @PreDestroy
    public void releaseResources() {
        this.threadPoolExecutor.shutdown();
    }

    /**
     * @return the executor created by {@link #init()}
     */
    public ExecutorService getThreadPoolExecutor() {
        return this.threadPoolExecutor;
    }
}
I made a few modifications to PriorityExecutor so that the pool size can be defined via the constructor:
import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * A {@link ThreadPoolExecutor} whose work queue is a {@link PriorityBlockingQueue} ordered by
 * {@code PriorityTask.COMPARATOR}.
 *
 * <p>Tasks that go through {@code submit(...)} are wrapped in a {@link PriorityTask}. The
 * priority is taken from the task when it implements {@link Important}, otherwise it is 0.
 * Every constructor uses a keep-alive of 60 seconds.
 *
 * <p>NOTE(review): the work queue is unbounded. According to the {@link ThreadPoolExecutor}
 * documentation, threads beyond {@code corePoolSize} are only started when the queue rejects
 * a task, so {@code maximumPoolSize} presumably has no practical effect here. Confirm before
 * relying on it.
 */
public class PriorityExecutor extends ThreadPoolExecutor {

    /** Initial capacity of the priority queue backing every instance. */
    private static final int INITIAL_QUEUE_CAPACITY = 11;

    /** Idle keep-alive, in seconds, applied by every constructor. */
    private static final long KEEP_ALIVE_SECONDS = 60L;

    /** Priority given to tasks that do not implement {@link Important}. */
    private static final int DEFAULT_PRIORITY = 0;

    /**
     * Creates an executor with the given pool sizes and the default thread factory and
     * rejection handler of {@link ThreadPoolExecutor}.
     *
     * @param corePoolSize    core pool size passed to {@link ThreadPoolExecutor}
     * @param maximumPoolSize maximum pool size passed to {@link ThreadPoolExecutor}
     */
    public PriorityExecutor(int corePoolSize, int maximumPoolSize) {
        super(corePoolSize, maximumPoolSize, KEEP_ALIVE_SECONDS, SECONDS, newWorkQueue());
    }

    /**
     * Creates an executor with the given pool sizes whose threads come from
     * {@code threadFactory}.
     *
     * @param threadFactory   factory used to create worker threads
     * @param corePoolSize    core pool size passed to {@link ThreadPoolExecutor}
     * @param maximumPoolSize maximum pool size passed to {@link ThreadPoolExecutor}
     */
    public PriorityExecutor(final ThreadFactory threadFactory, int corePoolSize,
            int maximumPoolSize) {
        super(corePoolSize, maximumPoolSize, KEEP_ALIVE_SECONDS, SECONDS, newWorkQueue(),
                threadFactory);
    }

    /**
     * Creates an executor with the given pool sizes and rejection handler.
     *
     * @param corePoolSize    core pool size passed to {@link ThreadPoolExecutor}
     * @param maximumPoolSize maximum pool size passed to {@link ThreadPoolExecutor}
     * @param handler         handler invoked when a task cannot be accepted
     */
    public PriorityExecutor(int corePoolSize, int maximumPoolSize,
            final RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, KEEP_ALIVE_SECONDS, SECONDS, newWorkQueue(),
                handler);
    }

    /**
     * Creates an executor with a core pool size of 0 and a maximum pool size of
     * {@link Integer#MAX_VALUE}.
     *
     * @param threadFactory factory used to create worker threads
     * @param handler       handler invoked when a task cannot be accepted
     */
    public PriorityExecutor(final ThreadFactory threadFactory,
            final RejectedExecutionHandler handler) {
        super(0, Integer.MAX_VALUE, KEEP_ALIVE_SECONDS, SECONDS, newWorkQueue(),
                threadFactory, handler);
    }

    /** Wraps {@code callable} in a {@link PriorityTask} carrying its priority. */
    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
        return new PriorityTask<T>(priorityOf(callable), callable);
    }

    /** Wraps {@code runnable} and its result {@code value} in a {@link PriorityTask}. */
    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
        return new PriorityTask<T>(priorityOf(runnable), runnable, value);
    }

    /** Builds a fresh priority queue ordered by {@code PriorityTask.COMPARATOR}. */
    private static PriorityBlockingQueue<Runnable> newWorkQueue() {
        return new PriorityBlockingQueue<Runnable>(INITIAL_QUEUE_CAPACITY,
                PriorityTask.COMPARATOR);
    }

    /** Returns the task's own priority when it is {@link Important}, else the default. */
    private static int priorityOf(final Object task) {
        return task instanceof Important
                ? ((Important) task).getPriority()
                : DEFAULT_PRIORITY;
    }

    /** Implemented by tasks that want to declare their own scheduling priority. */
    public interface Important {
        /**
         * @return the priority to schedule this task with
         */
        int getPriority();
    }
}
/**
 * A {@link FutureTask} that carries an integer priority and the wall-clock time at which it
 * was created.
 *
 * <p>Ordering is by priority only, with larger priority values sorting first. The timestamp
 * is recorded and exposed through {@link #getTimestamp()} but takes no part in the
 * comparison, so two tasks with the same priority compare as equal.
 *
 * @param <T> result type of the wrapped task
 */
public class PriorityTask<T> extends FutureTask<T> implements Comparable<PriorityTask<T>>, Important {

    /** Shared comparator intended for the executor's work queue. */
    public static final PriorityTaskComparator COMPARATOR = new PriorityTaskComparator();

    private final int priority;
    private final long timestamp;

    /**
     * Wraps a {@link Callable}.
     *
     * @param priority  scheduling priority; larger values sort first
     * @param tCallable the work to run
     */
    public PriorityTask(final int priority, final Callable<T> tCallable) {
        super(tCallable);
        this.priority = priority;
        timestamp = System.currentTimeMillis();
    }

    /**
     * Wraps a {@link Runnable} together with the value to report on completion.
     *
     * @param priority scheduling priority; larger values sort first
     * @param runnable the work to run
     * @param result   value returned by {@code get()} once {@code runnable} completes
     */
    public PriorityTask(final int priority, final Runnable runnable, final T result) {
        super(runnable, result);
        this.priority = priority;
        timestamp = System.currentTimeMillis();
    }

    /**
     * @return the priority given at construction
     */
    @Override
    public int getPriority() {
        return priority;
    }

    /**
     * @return creation time in milliseconds, as reported by {@link System#currentTimeMillis()}
     */
    public long getTimestamp() {
        return timestamp;
    }

    /**
     * Orders tasks so that the one with the larger priority value comes first.
     *
     * @param o the task to compare against
     * @return -1 if this task has the larger priority, 1 if {@code o} has, 0 if they are equal
     */
    @Override
    public int compareTo(final PriorityTask<T> o) {
        // Integer.compare avoids the int overflow that "o.priority - priority" suffers for
        // priorities near Integer.MIN_VALUE / Integer.MAX_VALUE, which inverted the order.
        return Integer.compare(o.priority, priority);
    }

    /**
     * Comparator over queued {@link Runnable}s. Two {@link PriorityTask}s are compared through
     * {@link PriorityTask#compareTo(PriorityTask)}. Any other runnable is ranked by its
     * {@link Important} priority, or 0 when it does not implement {@link Important}.
     */
    private static class PriorityTaskComparator implements Comparator<Runnable> {
        @SuppressWarnings({ "unchecked", "rawtypes" })
        @Override
        public int compare(final Runnable left, final Runnable right) {
            if (left instanceof PriorityTask && right instanceof PriorityTask) {
                return ((PriorityTask) left).compareTo((PriorityTask) right);
            }
            // Runnables passed to execute() bypass newTaskFor and reach the queue unwrapped;
            // rank them instead of failing with ClassCastException.
            return Integer.compare(priorityOf(right), priorityOf(left));
        }

        /** Returns the runnable's declared priority, or 0 when it declares none. */
        private static int priorityOf(final Runnable task) {
            return task instanceof Important ? ((Important) task).getPriority() : 0;
        }
    }
}