如何使用Java 5中的ExecutorService实现任务优先级划分?

Chr*_*s R 43 java concurrency multithreading

我正在实现一个线程池机制,我想在其中执行不同优先级的任务.我想有一个很好的机制,我可以向服务提交一个高优先级的任务,并在其他任务之前安排它.任务的优先级是任务本身的内在属性(我是否表达任务作为CallableRunnable不重要的是我).

现在,从表面上看,我可以使用a PriorityBlockingQueue作为我的任务队列ThreadPoolExecutor,但该队列包含Runnable对象,这可能是也可能不是Runnable我提交给它的任务.而且,如果我已经提交了Callable任务,那么就不清楚它是如何映射的.

有没有办法做到这一点?我真的宁愿不为此而努力,因为我更有可能以这种方式弄错.

(旁白;是的,我知道在这样的事情中,低优先级工作可能会出现饥饿.对于有合理保证公平性的解决方案,可以加分(?!))

Mik*_*ike 16

我已经以合理的方式解决了这个问题,我将在下面对其进行描述,以便将来参考我和其他任何使用Java Concurrent库遇到此问题的人.

使用a PriorityBlockingQueue作为保留任务以便以后执行的手段确实是正确方向的运动.问题是PriorityBlockingQueue必须通常实例化以包含Runnable实例,并且不可能compareToRunnable接口上调用(或类似).

解决问题.在创建Executor时,必须给它一个PriorityBlockingQueue.队列应该进一步给定一个自定义比较器来进行适当的位置排序:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator());
Run Code Online (Sandbox Code Playgroud)

现在,看看CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> {

    @Override
    public int compare(MyType first, MyType second) {
         return comparison;
    }

}
Run Code Online (Sandbox Code Playgroud)

到目前为止,一切看起来都非常直截了当.这里有点粘.我们的下一个问题是处理Executor创建FutureTasks.在Executor中,我们必须如下覆盖newTaskFor:

@Override
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
    //Override the default FutureTask creation and retrofit it with
    //a custom task. This is done so that prioritization can be accomplished.
    return new CustomFutureTask(c);
}
Run Code Online (Sandbox Code Playgroud)

我们正在尝试执行cCallable任务在哪里.现在,我们来看看CustomFutureTask:

public class CustomFutureTask extends FutureTask {

    private CustomTask task;

    public CustomFutureTask(Callable callable) {
        super(callable);
        this.task = (CustomTask) callable;
    }

    public CustomTask getTask() {
        return task;
    }

}
Run Code Online (Sandbox Code Playgroud)

注意getTask方法.我们稍后会用它来抓住CustomFutureTask我们创造的原始任务.

最后,让我们修改我们尝试执行的原始任务:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> {

    private final MyType myType;

    public CustomTask(MyType myType) {
        this.myType = myType;
    }

    @Override
    public MyType call() {
        //Do some things, return something for FutureTask implementation of `call`.
        return myType;
    }

    @Override
    public int compareTo(MyType task2) {
        return new CustomTaskComparator().compare(this.myType, task2.myType);
    }

}
Run Code Online (Sandbox Code Playgroud)

您可以看到我们Comparable在任务中实现委托给实际的Comparatorfor MyType.

你有它,使用Java库为Executor定制优先级!它需要一些弯曲,但它是我能够想出的最干净的.我希望这对某人有帮助!


Ada*_*icz 8

乍一看这似乎可以定义为您的任务的接口,扩展RunnableCallable<T>Comparable.然后ThreadPoolExecutor使用a PriorityBlockingQueue作为队列包装,并且只接受实现接口的任务.

考虑到您的评论,看起来一个选项是扩展ThreadPoolExecutor和覆盖submit()方法.请参阅AbstractExecutorService查看默认值的内容; 他们做的是包裹Runnable还是CallableFutureTaskexecute()它.我可能通过编写一个实现ExecutorService并委托给匿名内部的包装类来完成这项工作ThreadPoolExecutor.将它们包裹在具有您优先权的东西中,以便您Comparator可以获得它.

  • 这也是我的看法,但这就是问题所在; 传递给优先级队列的`Runnable`实例是_not_我直接提交的任务,它们被包装在`java.util.concurrent.FutureTask <V>中,当然它没有被排序办法.如果我使用`execute` - 例如,它不接受`Callable` - 那么它会抛出我自己的对象. (2认同)