我用a ExecutorService来执行任务.此任务可以递归地创建提交给它的其他任务,ExecutorService这些子任务也可以这样做.
我现在遇到的问题是,在我继续之前,我要等到所有任务完成(即所有任务都完成并且他们没有提交新任务).
我不能ExecutorService.shutdown()在主线程中调用,因为这可以防止新任务被接受ExecutorService.
ExecutorService.awaitTermination()如果shutdown没有被召唤,呼叫似乎什么都不做.
所以我有点卡在这里.ExecutorService要看到所有工人都闲着,这不是很难,是吗?我能想出的唯一不合理的解决方案是直接使用a ThreadPoolExecutor并getPoolSize()偶尔查询它.这样做真的没有更好的方法吗?
我有一个将异步任务委托给线程池的进程.我需要确保按顺序执行某些任务.所以举个例子
任务按顺序到达
任务a1,b1,c1,d1,e1,a2,a3,b2,f1
任务可以按任何顺序执行,除非存在自然依赖性,因此必须按顺序处理a1,a2,a3,方法是分配到同一个线程或阻止这些,直到我知道前一个#任务完成为止.
目前它不使用Java Concurrency包,但我正在考虑改变以充分利用线程管理.
有没有人有类似的解决方案或如何实现这一点的建议
我正在通过ExecutorService运行一些外部方法的多次调用.我希望能够中断这些方法,但遗憾的是它们不会自行检查中断标志.有什么办法可以强制从这些方法中引发异常吗?
我知道从任意位置抛出异常是有潜在危险的,在我的具体情况下,我愿意抓住这个机会并准备应对后果.
"外部方法"我的意思是来自外部库的一些方法,我不能修改它的代码(我可以,但是每当新版本发布时,这将使它成为维护的噩梦).
外部方法计算成本高,不受IO限制,因此它们不响应常规中断,我无法强行关闭通道或套接字等.正如我之前提到的,他们也没有检查中断标志.
代码在概念上类似于:
// my code
public void myMethod() {
Object o = externalMethod(x);
}
// External code
public class ExternalLibrary {
public Object externalMethod(Object) {
innerMethod1();
innerMethod1();
innerMethod1();
}
private void innerMethod1() {
innerMethod2();
// computationally intensive operations
}
private void innerMethod2() {
// computationally intensive operations
}
}
Run Code Online (Sandbox Code Playgroud)
Thread.stop()理论上我会做我想要的,但它不仅被弃用,而且它也只适用于实际线程,而我正在使用执行器任务(也可能与未来任务共享线程,例如在线程池中工作时) .然而,如果找不到更好的解决方案,我将转换我的代码以使用老式的Threads并使用此方法.
我尝试过的另一个选择是myMethod()使用特殊的"Interruptable"注释标记和类似的方法,然后使用AspectJ(我无疑是新手)来捕获所有方法调用 - 类似于:
@Before("call(* *.*(..)) && withincode(@Interruptable * *.*(..))")
public void checkInterrupt(JoinPoint thisJoinPoint) {
if (Thread.interrupted()) …Run Code Online (Sandbox Code Playgroud) 使用ExecutorService返回的时Executors.newSingleThreadExecutor(),如何中断它?
我有关于ThreadPoolExecutor的这个相当简单的问题.我有以下情况:我必须从队列中使用对象,为它们创建适当的工作人员任务并将它们提交给ThreadPoolExecutor.这很简单.但在关闭情况下,许多工作人员可能会排队等待执行.由于其中一个任务可能运行了一个小时,并且我希望相对快速地正常关闭应用程序,我想从ThreadPoolExecutor中丢弃所有排队的任务,而已经处理的任务应该正常完成.
ThreadPoolExecutor文档有一个remove()方法,但只允许删除特定的任务.purge()仅适用于已取消的Future任务.我的想法是清除队列中包含所有排队的任务.ThreadPoolExecutor提供对此内部队列的访问,但文档指出:
方法getQueue()允许访问工作队列以进行监视和调试.强烈建议不要将此方法用于任何其他目的.
所以抓住这个队列并清除它不是一个选择.此外,该文档的片段说:
当大量排队的任务被取消时,两个提供的方法remove(java.lang.Runnable)和purge()可用于协助存储回收.
怎么样?当然,我可以维护我提交给执行程序的所有任务的列表,在关闭的情况下,我遍历所有条目并使用remove()方法将它们从ThreadPoolExecutor中删除......但是...来吧,这是一个浪费记忆力和维护这份清单的麻烦.(例如,删除已执行的任务)
我感谢任何提示或解决方案!
我的代码片段:
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
Task t = new Task(response,inputToPass,pTypes,unit.getInstance(),methodName,unit.getUnitKey());
Future<SCCallOutResponse> fut = executor.submit(t);
response = fut.get(unit.getTimeOut(),TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// if the task is still running, a TimeOutException will occur while fut.get()
cat.error("Unit " + unit.getUnitKey() + " Timed Out");
response.setVote(SCCallOutConsts.TIMEOUT);
} catch (InterruptedException e) {
cat.error(e);
} catch (ExecutionException e) {
cat.error(e);
} finally {
executor.shutdown();
}
Run Code Online (Sandbox Code Playgroud)
我应该如何处理InterruptedException和ExecutionException代码?
在什么情况下,抛出这些异常?
java multithreading future executorservice interrupted-exception
我将一堆可运行的对象放入ExecutorService:
// simplified content of main method
ExecutorService threadPool = Executors.newCachedThreadPool();
for(int i = 0; i < workerCount; i++) {
threadPool.execute(new Worker());
}
Run Code Online (Sandbox Code Playgroud)
我希望我的程序/流程在所有工人完成后立即停止.但根据我的日志,它需要另外20-30秒才能发生.工人们没有分配任何资源,事实上,他们现在什么都不做.
不要误会我的意思,这对我来说不是一个至关重要的问题,我只是想了解发生了什么,我想知道这是不是正常行为.
TL:ExecutorService executorService = Executors.newFixedThreadPool(8);
调试中的DR 运行并发,但在正常运行时它启动并发,但后来在单线程中运行.
我有一些代码,我开始4个不同的任务ExecutorService.其中两个任务应该几乎立即完成,另外两个应该运行一段时间.
这些任务以秒为单位返回执行时间Future<Double>.
此代码负责任务执行和测量:
public Future<Double> measure(int[] arr, ProcessIntArray processIntArray, ExecutorService es) {
Callable<Double> task = () -> {
long start = System.nanoTime();
processIntArray.process(arr);
long end = System.nanoTime();
return (end - start) / 1000000000.0;
};
return es.submit(task);
}
Run Code Online (Sandbox Code Playgroud)
稍后,在启动这些任务后,我将按照先前运行的相同输入大小的执行时间顺序打印它们.
Future<Double> bubbleSortTime = measure(bubbleSortArray, Solution::bubbleSort, executorService);
Future<Double> insertionSortTime = measure(insertionSortArray, Solution::insertionSort, executorService);
Future<Double> quickSortTime = measure(quickSortArray, Solution::quickSort, executorService);
Future<Double> mergeSortTime = measure(mergeSortArray, Solution::mergeSort, executorService);
System.out.println();
System.out.println("array size: " + size);
System.out.println("quick …Run Code Online (Sandbox Code Playgroud) 我在多线程环境中经历了不同的并发模型(http://tutorials.jenkov.com/java-concurrency/concurrency-models.html)
本文重点介绍了三种并发模型.
并行工人
第一个并发模型就是我所说的并行工作模型.传入的工作分配给不同的工作人员.
流水线
工人们正在举办类似的工人在工厂的流水线.每个工人只执行完整工作的一部分.当该部分完成时,工人将工作转发给下一个工人.
每个工作者都在自己的线程中运行,并且不与其他工作者共享任何状态.这有时也称为无共享并发模型.
功能并行
函数并行的基本思想是使用函数调用实现程序.函数可以被视为彼此发送消息的" 代理 "或" 参与者 ",就像在流水线并发模型(AKA反应或事件驱动系统)中一样.当一个函数调用另一个函数时,这类似于发送消息.
现在我想为这三个概念映射java API支持
并行工作者:它是ExecutorService,ThreadPoolExecutor,CountDownLatch API吗?
装配线:将事件发送到JMS等消息传递系统并使用队列和主题的消息传递概念.
功能并行:ForkJoinPool在某种程度上和java 8流.与溪流相比,ForkJoin池很容易理解.
我是否正确映射这些并发模型?如果没有,请纠正我.
java multithreading executorservice countdownlatch forkjoinpool
我有ConcurrentLinkedDeque,我正在使用同步推送/弹出元素,我有一些异步任务,从堆栈中取一个元素,如果这个元素有邻居它正在推动它堆栈.
示例代码:
private ConcurrentLinkedDeque<Item> stack = new ConcurrentLinkedDeque<>();
private ExecutorService exec = Executors.newFixedThreadPool(5);
while ((item = stack.pollFirst()) != null) {
if (item == null) {
} else {
Runnable worker = new Solider(this, item);
exec.execute(worker);
}
}
class Solider{
public void run(){
if(item.hasNeighbors){
for(Item item:item.neighbors){
stack.push(item)
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
我想在while循环中有另外的声明来回答这个问题 - "Executor中的任何任务都在工作吗?"
executorservice ×10
java ×10
concurrency ×4
aop ×1
aspectj ×1
executor ×1
forkjoinpool ×1
future ×1
java-threads ×1
jvm ×1
threadpool ×1