使用ExecutorService控制任务执行顺序

Wir*_*tap 31 java concurrency executorservice

我有一个将异步任务委托给线程池的进程.我需要确保按顺序执行某些任务.所以举个例子

任务按顺序到达

任务a1,b1,c1,d1,e1,a2,a3,b2,f1

任务可以按任何顺序执行,除非存在自然依赖性,因此必须按顺序处理a1,a2,a3,方法是分配到同一个线程或阻止这些,直到我知道前一个#任务完成为止.

目前它不使用Java Concurrency包,但我正在考虑改变以充分利用线程管理.

有没有人有类似的解决方案或如何实现这一点的建议

Mik*_*e Q 14

当我在过去完成此操作时,我通常会将一个组件处理,然后将callables/runnables提交给Executor.

就像是.

  • 获得要运行的任务列表,其中一些具有依赖项
  • 创建一个Executor并使用ExecutorCompletionService进行换行
  • 搜索所有任何没有依赖关系的任务,通过完成服务安排它们
  • 轮询完成服务
  • 每项任务完成
    • 将其添加到"已完成"列表中
    • 重新评估任何等待任务到"完成列表",看看它们是否是"依赖完成".如果这样安排他们
    • 冲洗重复,直到提交/完成所有任务

完成服务是一种很好的方式,能够在完成任务时完成任务,而不是试图轮询一堆期货.但是,您可能希望保留在Map<Future, TaskIdentifier>通过完成服务计划任务时填充的内容,这样当完成服务为您提供完整的Future时,您可以确定TaskIdentifier它是什么.

如果您发现自己处于任务仍在等待运行的状态,但没有任何正在运行且无法安排任何内容,那么您就会遇到循环依赖性问题.


小智 12

我编写了自己的Executor,它保证了具有相同密钥的任务的任务排序.它使用具有相同键的订单任务的队列映射.每个键控任务使用相同的键执行下一个任务.

此解决方案不处理RejectedExecutionException或委派的Executor中的其他异常!所以委派的执行者应该是"无限的".

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;

/**
* This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly).
*/
public class OrderingExecutor implements Executor{

    private final Executor delegate;
    private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>();

    public OrderingExecutor(Executor delegate){
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        // task without key can be executed immediately
        delegate.execute(task);
    }

    public void execute(Runnable task, Object key) {
        if (key == null){ // if key is null, execute without ordering
            execute(task);
            return;
        }

        boolean first;
        Runnable wrappedTask;
        synchronized (keyedTasks){
            Queue<Runnable> dependencyQueue = keyedTasks.get(key);
            first = (dependencyQueue == null);
            if (dependencyQueue == null){
                dependencyQueue = new LinkedList<Runnable>();
                keyedTasks.put(key, dependencyQueue);
            }

            wrappedTask = wrap(task, dependencyQueue, key);
            if (!first)
                dependencyQueue.add(wrappedTask);
        }

        // execute method can block, call it outside synchronize block
        if (first)
            delegate.execute(wrappedTask);

    }

    private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
        return new OrderedTask(task, dependencyQueue, key);
    }

    class OrderedTask implements Runnable{

        private final Queue<Runnable> dependencyQueue;
        private final Runnable task;
        private final Object key;

        public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
            this.task = task;
            this.dependencyQueue = dependencyQueue;
            this.key = key;
        }

        @Override
        public void run() {
            try{
                task.run();
            } finally {
                Runnable nextTask = null;
                synchronized (keyedTasks){
                    if (dependencyQueue.isEmpty()){
                        keyedTasks.remove(key);
                    }else{
                        nextTask = dependencyQueue.poll();
                    }
                }
                if (nextTask!=null)
                    delegate.execute(nextTask);
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)


cle*_*tus 3

当您提交RunnableCallable至 时,ExecutorService您会收到Future回报。让依赖于 a1 的线程传递 a1Future并调用Future.get(). 这将阻塞直到线程完成。

所以:

ExecutorService exec = Executor.newFixedThreadPool(5);
Runnable a1 = ...
final Future f1 = exec.submit(a1);
Runnable a2 = new Runnable() {
  @Override
  public void run() {
    f1.get();
    ... // do stuff
  }
}
exec.submit(a2);
Run Code Online (Sandbox Code Playgroud)

等等。

  • 我认为这不适用于固定线程池,因为线程可能会同时在“f1.get()”上阻塞并陷入死锁。 (4认同)