Wir*_*tap 31 java concurrency executorservice
我有一个将异步任务委托给线程池的进程.我需要确保按顺序执行某些任务.所以举个例子
任务按顺序到达
任务a1,b1,c1,d1,e1,a2,a3,b2,f1
任务可以按任何顺序执行,除非存在自然依赖性,因此必须按顺序处理a1,a2,a3,方法是分配到同一个线程或阻止这些,直到我知道前一个#任务完成为止.
目前它不使用Java Concurrency包,但我正在考虑改变以充分利用线程管理.
有没有人有类似的解决方案或如何实现这一点的建议
Mik*_*e Q 14
当我在过去完成此操作时,我通常会将一个组件处理,然后将callables/runnables提交给Executor.
就像是.
完成服务是一种很好的方式,能够在完成任务时完成任务,而不是试图轮询一堆期货.但是,您可能希望保留在Map<Future, TaskIdentifier>通过完成服务计划任务时填充的内容,这样当完成服务为您提供完整的Future时,您可以确定TaskIdentifier它是什么.
如果您发现自己处于任务仍在等待运行的状态,但没有任何正在运行且无法安排任何内容,那么您就会遇到循环依赖性问题.
小智 12
我编写了自己的Executor,它保证了具有相同密钥的任务的任务排序.它使用具有相同键的订单任务的队列映射.每个键控任务使用相同的键执行下一个任务.
此解决方案不处理RejectedExecutionException或委派的Executor中的其他异常!所以委派的执行者应该是"无限的".
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
/**
* This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly).
*/
public class OrderingExecutor implements Executor{
private final Executor delegate;
private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>();
public OrderingExecutor(Executor delegate){
this.delegate = delegate;
}
@Override
public void execute(Runnable task) {
// task without key can be executed immediately
delegate.execute(task);
}
public void execute(Runnable task, Object key) {
if (key == null){ // if key is null, execute without ordering
execute(task);
return;
}
boolean first;
Runnable wrappedTask;
synchronized (keyedTasks){
Queue<Runnable> dependencyQueue = keyedTasks.get(key);
first = (dependencyQueue == null);
if (dependencyQueue == null){
dependencyQueue = new LinkedList<Runnable>();
keyedTasks.put(key, dependencyQueue);
}
wrappedTask = wrap(task, dependencyQueue, key);
if (!first)
dependencyQueue.add(wrappedTask);
}
// execute method can block, call it outside synchronize block
if (first)
delegate.execute(wrappedTask);
}
private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
return new OrderedTask(task, dependencyQueue, key);
}
class OrderedTask implements Runnable{
private final Queue<Runnable> dependencyQueue;
private final Runnable task;
private final Object key;
public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) {
this.task = task;
this.dependencyQueue = dependencyQueue;
this.key = key;
}
@Override
public void run() {
try{
task.run();
} finally {
Runnable nextTask = null;
synchronized (keyedTasks){
if (dependencyQueue.isEmpty()){
keyedTasks.remove(key);
}else{
nextTask = dependencyQueue.poll();
}
}
if (nextTask!=null)
delegate.execute(nextTask);
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
当您提交Runnable或Callable至 时,ExecutorService您会收到Future回报。让依赖于 a1 的线程传递 a1Future并调用Future.get(). 这将阻塞直到线程完成。
所以:
ExecutorService exec = Executor.newFixedThreadPool(5);
Runnable a1 = ...
final Future f1 = exec.submit(a1);
Runnable a2 = new Runnable() {
@Override
public void run() {
f1.get();
... // do stuff
}
}
exec.submit(a2);
Run Code Online (Sandbox Code Playgroud)
等等。
| 归档时间: |
|
| 查看次数: |
17989 次 |
| 最近记录: |