1 Reply Latest reply on Feb 26, 2016 8:17 AM by ironman_br

    ManagedExecutorService - how to use priorityQueue?

    haso

      Hello,

      I'm trying to use `ManagedScheduledExecutorService` in my new project. I'm using `Wildfly 9` server with such a configuration:

       

      <managed-scheduled-executor-service name="processMessages" jndi-name="java:jboss/ee/concurrency/scheduler/processMessages" context-service="default" core-threads="15" keepalive-time="432000000"/>
      

       

      context-service has default values.

      I'm obtaining the executor with:

       

       @Resource(lookup="java:jboss/ee/concurrency/scheduler/processMessages")
        private ManagedScheduledExecutorService processMessagesService; 
      

       

      It works fine: I can add and schedule tasks. My question is about the queue and the ordering of tasks. I noticed that the default queue uses a `FIFO` algorithm, but I need to order tasks by some kind of priority.

       

      Changing the default queue to some sort of priority queue would be nice, and then having the task class implement `Comparator`, but how can that be achieved? Can I even do it?

        • 1. Re: ManagedExecutorService - how to use priorityQueue?
          ironman_br

          I'm testing a priority queue based on this implementation: http://binkley.blogspot.com.br/2009/04/jumping-work-queue-in-executor.html

           

          Using a `ManagedThreadFactory`, I can create a custom managed `ThreadPoolExecutor` (I think you can create a custom `ScheduledThreadPoolExecutor` as well):

           

           

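          import java.util.concurrent.ExecutorService;

          import javax.annotation.PostConstruct;
          import javax.annotation.PreDestroy;
          import javax.annotation.Resource;
          import javax.enterprise.concurrent.ManagedThreadFactory;
          import javax.enterprise.context.ApplicationScoped;

          @ApplicationScoped
          public class PoolExecutor {

            private ExecutorService threadPoolExecutor = null;
            int  corePoolSize  =    5;
            int  maxPoolSize   =   10;
            long keepAliveTime = 5000; // not passed on: PriorityExecutor hard-codes a 60-second keep-alive

            // Container-managed factory, so the pool's threads are created by the server
            @Resource(name = "DefaultManagedThreadFactory")
            ManagedThreadFactory factory;

            public ExecutorService getThreadPoolExecutor() {
              return threadPoolExecutor;
            }

            @PostConstruct
            public void init() {
              threadPoolExecutor = new PriorityExecutor(factory, corePoolSize, maxPoolSize);
            }

            // Stop accepting new tasks when the bean is destroyed
            @PreDestroy
            public void releaseResources() {
              threadPoolExecutor.shutdown();
            }

          }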
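
          On the scheduled variant mentioned above: `java.util.concurrent.ScheduledThreadPoolExecutor` has no constructor that accepts a work queue, so a priority queue cannot simply be plugged in there. One possible workaround is to let the scheduler only fire a small trigger and hand the real work to the priority pool when the delay elapses. This is a minimal sketch under assumptions, not a tested WildFly setup: `ScheduledHandOffSketch`, `scheduleHandOff` and `demo` are hypothetical names, and plain `Executors` pools stand in for the injected `ManagedScheduledExecutorService` and for the executor returned by `getThreadPoolExecutor()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: the scheduler decides WHEN, the worker pool decides IN WHICH ORDER.
class ScheduledHandOffSketch {

    /**
     * Schedules a trigger that, once the delay has elapsed, only submits the
     * work to the worker pool and returns immediately.
     */
    static void scheduleHandOff(ScheduledExecutorService scheduler, ExecutorService workers,
                                Runnable work, long delay, TimeUnit unit) {
        // Block body keeps the lambda a Runnable (no ambiguity with Callable).
        scheduler.schedule(() -> { workers.submit(work); }, delay, unit);
    }

    /** Runs one hand-off with stand-in pools; returns true if the work was executed. */
    static boolean demo() {
        // Assumption: in the application these would be the managed scheduler
        // and the priority pool, not pools created via Executors.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService workers = Executors.newSingleThreadExecutor();
        CountDownLatch ran = new CountDownLatch(1);
        try {
            scheduleHandOff(scheduler, workers, ran::countDown, 50, TimeUnit.MILLISECONDS);
            return ran.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdown();
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("work ran: " + demo());
    }
}
```

          With this split, the delay is handled by the scheduler, while the order in which due tasks actually run is decided by whatever queue backs the worker pool.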

           

          I made a few modifications to `PriorityExecutor` so that the pool size can be set via the constructor:

          import static java.util.concurrent.TimeUnit.SECONDS;

          import java.util.concurrent.Callable;
          import java.util.concurrent.PriorityBlockingQueue;
          import java.util.concurrent.RejectedExecutionHandler;
          import java.util.concurrent.RunnableFuture;
          import java.util.concurrent.ThreadFactory;
          import java.util.concurrent.ThreadPoolExecutor;

          public class PriorityExecutor extends ThreadPoolExecutor {

            public PriorityExecutor(int corePoolSize, int maximumPoolSize) {
              super(corePoolSize, maximumPoolSize, 60L, SECONDS,
                  new PriorityBlockingQueue<Runnable>(11, PriorityTask.COMPARATOR));
            }

            public PriorityExecutor(final ThreadFactory threadFactory, int corePoolSize, int maximumPoolSize) {
              super(corePoolSize, maximumPoolSize, 60L, SECONDS,
                  new PriorityBlockingQueue<Runnable>(11, PriorityTask.COMPARATOR), threadFactory);
            }

            public PriorityExecutor(int corePoolSize, int maximumPoolSize, final RejectedExecutionHandler handler) {
              super(corePoolSize, maximumPoolSize, 60L, SECONDS,
                  new PriorityBlockingQueue<Runnable>(11, PriorityTask.COMPARATOR), handler);
            }

            public PriorityExecutor(final ThreadFactory threadFactory, final RejectedExecutionHandler handler) {
              super(0, Integer.MAX_VALUE, 60L, SECONDS,
                  new PriorityBlockingQueue<Runnable>(11, PriorityTask.COMPARATOR), threadFactory, handler);
            }

            // submit() routes tasks through newTaskFor(), which wraps them in a PriorityTask.
            // A Runnable passed straight to execute() reaches the queue unwrapped.
            @Override
            protected <T> RunnableFuture<T> newTaskFor(final Callable<T> callable) {
              if (callable instanceof Important)
                return new PriorityTask<T>(((Important) callable).getPriority(), callable);
              else
                return new PriorityTask<T>(0, callable); // default priority
            }

            @Override
            protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
              if (runnable instanceof Important)
                return new PriorityTask<T>(((Important) runnable).getPriority(), runnable, value);
              else
                return new PriorityTask<T>(0, runnable, value); // default priority
            }

            // Implemented by tasks that want a priority other than 0
            public interface Important {
              int getPriority();
            }

          }
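
          One thing to keep in mind with the constructors above: `ThreadPoolExecutor` only starts threads beyond `corePoolSize` when the queue refuses an offer, and an unbounded `PriorityBlockingQueue` never refuses one. The pool therefore stays at `corePoolSize` whatever `maximumPoolSize` says, and a core size of 0 ends up with a single worker. The sketch below illustrates this with a bare `ThreadPoolExecutor`; `PoolSizingSketch`, `BlockingTask` and `observedPoolSize` are hypothetical names introduced only for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: how many threads does a pool with an unbounded priority queue start?
class PoolSizingSketch {

    /** Task that keeps its worker busy until the latch opens; all instances rank equal. */
    static final class BlockingTask implements Runnable, Comparable<BlockingTask> {
        private final CountDownLatch release;

        BlockingTask(CountDownLatch release) {
            this.release = release;
        }

        @Override
        public void run() {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public int compareTo(BlockingTask other) {
            return 0;
        }
    }

    /** Submits the given number of blocking tasks and reports the resulting pool size. */
    static int observedPoolSize(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60L, TimeUnit.SECONDS,
                new PriorityBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(new BlockingTask(release));
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the blocked workers finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observedPoolSize(2, 4, 6));                 // prints 2
        System.out.println(observedPoolSize(0, Integer.MAX_VALUE, 6)); // prints 1
    }
}
```

          If more parallelism is needed with this kind of queue, raising `corePoolSize` is the setting that actually has an effect.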

           

          import java.util.Comparator;
          import java.util.concurrent.Callable;
          import java.util.concurrent.FutureTask;

          public class PriorityTask<T> extends FutureTask<T>
              implements Comparable<PriorityTask<T>>, PriorityExecutor.Important {
            public static final Comparator<Runnable> COMPARATOR = new PriorityTaskComparator();

            private final int priority;
            private final long timestamp; // creation time; not used by compareTo()

            public PriorityTask(final int priority, final Callable<T> tCallable) {
              super(tCallable);
              this.priority = priority;
              timestamp = System.currentTimeMillis();
            }

            public PriorityTask(final int priority, final Runnable runnable, final T result) {
              super(runnable, result);
              this.priority = priority;
              timestamp = System.currentTimeMillis();
            }

            @Override
            public int getPriority() {
              return priority;
            }

            public long getTimestamp() {
              return timestamp;
            }

            // Higher priority sorts first. Integer.compare avoids the int overflow
            // that subtracting the two priorities could cause.
            @Override
            public int compareTo(final PriorityTask<T> o) {
              return Integer.compare(o.priority, priority);
            }

            // Adapter for the queue, which only knows its elements as Runnable
            private static class PriorityTaskComparator implements Comparator<Runnable> {
              @SuppressWarnings({ "unchecked", "rawtypes" })
              @Override
              public int compare(final Runnable left, final Runnable right) {
                return ((PriorityTask) left).compareTo((PriorityTask) right);
              }
            }
          }
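
          To see the ordering end to end, here is a self-contained sketch that condenses the approach above into one file. It is not the code from this post: `PriorityOrderDemo`, `Job` and `RankedTask` are hypothetical stand-ins, the pool has a single worker so the run order is observable, and a sequence number is added as a tie-break (an addition of this sketch) so that tasks of equal priority run in submission order, which `PriorityBlockingQueue` does not guarantee on its own.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PriorityOrderDemo {

    /** Hypothetical task type that carries a priority alongside its work. */
    static final class Job implements Runnable {
        final int priority;
        private final Runnable body;

        Job(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }
    }

    /** Condensed stand-in for PriorityTask: higher priority first, then submission order. */
    static final class RankedTask<T> extends FutureTask<T> implements Comparable<RankedTask<?>> {
        private static final AtomicLong SEQUENCE = new AtomicLong();
        final int priority;
        final long sequence = SEQUENCE.getAndIncrement();

        RankedTask(int priority, Runnable runnable, T value) {
            super(runnable, value);
            this.priority = priority;
        }

        @Override
        public int compareTo(RankedTask<?> other) {
            int byPriority = Integer.compare(other.priority, priority);
            return byPriority != 0 ? byPriority : Long.compare(sequence, other.sequence);
        }
    }

    /** Single-worker pool whose submit() wraps every Runnable in a RankedTask. */
    static ThreadPoolExecutor newExecutor() {
        return new ThreadPoolExecutor(1, 1, 60L, TimeUnit.SECONDS,
                new PriorityBlockingQueue<Runnable>()) {
            @Override
            protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
                int priority = (runnable instanceof Job) ? ((Job) runnable).priority : 0;
                return new RankedTask<T>(priority, runnable, value);
            }
        };
    }

    /** Queues four jobs behind a blocked worker and returns the order in which they ran. */
    static List<String> runOrder() {
        ThreadPoolExecutor pool = newExecutor();
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Occupy the only worker so that the following jobs have to queue up.
            pool.submit(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.submit(new Job(1, () -> order.add("a")));
            pool.submit(new Job(5, () -> order.add("b")));
            pool.submit(new Job(3, () -> order.add("c")));
            pool.submit(new Job(5, () -> order.add("d")));
            gate.countDown();
            pool.shutdown(); // queued jobs still run after shutdown()
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runOrder()); // prints [b, d, c, a]
    }
}
```

          Note that the jobs go in through `submit()`. In the executor above, `execute()` does not pass through `newTaskFor()`, so a plain `Runnable` handed to `execute()` would sit in the queue unwrapped and may cause a `ClassCastException` once the comparator has to compare it with another queued task.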