如何使用ThreadPoolExecutor和自定义任务实现PriorityBlockingQueue

gre*_*eve 24 java priority-queue executor futuretask threadpool

我经常搜索,但找不到解决问题的方法.

我有自己的类,BaseTask使用a ThreadPoolExecutor来处理任务.
如果我不想要优先级(即使用a PriorityBlockingQueue),这可以正常工作,但当我尝试使用ClassCastException我得到的ThreadPoolExecutor因为FutureTask将我的任务包装到一个FutureTask对象中.
这显然是可以的,因为Comparable它没有实现newTaskFor(),但我将如何继续解决优先级问题?
我读过您可以覆盖ThreadPoolExecutorBaseTask,但我似乎无法在所有发现这个方法...?

我们欢迎所有的建议!

一些代码可以帮助:

在我的BaseFutureTask班上,我有

private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();

private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};

private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
    1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);

private final BaseFutureTask<Result> mFuture;

public BaseTask(int priority) {
    mFuture = new BaseFutureTask<Result>(mWorker, priority);
}

public final BaseTask<Params, Progress, Result> execute(Params... params) {

    /* Some unimportant code here */

    sExecutor.execute(mFuture);
}
Run Code Online (Sandbox Code Playgroud)

BaseThreadPoolExecutor课堂上

@Override
public int compareTo(BaseFutureTask another) {
    long diff = this.priority - another.priority;

    return Long.signum(diff);
}
Run Code Online (Sandbox Code Playgroud)

submit类i中重写3个submit方法...
此类中的构造函数被调用,但没有一个BaseTask方法

dup*_*dup 14

public class ExecutorPriority {

public static void main(String[] args) {

    PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new ComparePriority());

    Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq);
    exe.execute(new RunWithPriority(2) {

        @Override
        public void run() {

            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });
    exe.execute(new RunWithPriority(10) {

        @Override
        public void run() {
            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });

}

private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> {

    @Override
    public int compare(T o1, T o2) {
        return o1.getPriority().compareTo(o2.getPriority());
    }
}
Run Code Online (Sandbox Code Playgroud)

}

你可以猜测RunWithPriority是一个抽象类,它是Runnable并且有一个Integer优先级字段

  • 我真的不明白你的榜样!我是愚蠢还是第一个`Executor ex`从未使用过?大声笑 (3认同)
  • 我不明白为什么这是最受欢迎的答案。`new ComparePriority()` 没有指定泛型类型参数,所以我认为这是一个快速而肮脏的解决方案。 (3认同)

Sta*_*kyy 10

您可以使用这些帮助程序类:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}
Run Code Online (Sandbox Code Playgroud)

这个帮手方法:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

然后用它是这样的:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)


the*_*ger 5

我将尝试用功能齐全的代码来解释这个问题。但在深入代码之前,我想解释一下 PriorityBlockingQueue

PriorityBlockingQueue : PriorityBlockingQueue 是 BlockingQueue 的一个实现。它接受任务及其优先级,并首先提交具有最高优先级的任务执行。如果任何两个任务具有相同的优先级,那么我们需要提供一些自定义逻辑来决定哪个任务先执行。

现在让我们直接进入代码。

驱动程序类:这个类创建一个执行器,它接受任务,然后提交它们以供执行。这里我们创建了两个任务,一个优先级低,另一个优先级高。在这里,我们告诉执行程序最多运行 1 个线程并使用 PriorityBlockingQueue。

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);

    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
}
Run Code Online (Sandbox Code Playgroud)

MyTask 类:MyTask 实现 Runnable 并在构造函数中接受优先级作为参数。当这个任务运行时,它会打印一条消息,然后让线程休眠 1 秒。

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}
Run Code Online (Sandbox Code Playgroud)

MyFutureTask 类:由于我们使用 PriorityBlocingQueue 来保存我们的任务,我们的任务必须包装在 FutureTask 中,并且我们的 FutureTask 实现必须实现 Comparable 接口。Comparable接口比较2个不同任务的优先级,提交优先级最高的任务执行。

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }
Run Code Online (Sandbox Code Playgroud)

优先级:不言自明的优先级。

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}
Run Code Online (Sandbox Code Playgroud)

现在当我们运行这个例子时,我们得到以下输出

The following Runnable is getting executed High
The following Runnable is getting executed Low
Run Code Online (Sandbox Code Playgroud)

尽管我们先提交了 LOW 优先级的任务,然后提交了 HIGH 优先级的任务,但是由于我们使用的是 PriorityBlockingQueue,因此优先级较高的任务将首先执行。


Ric*_*ler 0

看起来他们把它排除在 apache 和谐之外。大约一年前有一个svn 提交日志修复了newTaskFor. 您可能只需重写submit扩展中的函数ThreadPoolExecutor即可创建扩展,FutureTaskComparable. 它们不是很长