假设我有一个完整的任务队列,我需要提交给执行程序服务.我希望他们一次处理一个.我能想到的最简单的方法是:
但是,我试图完全避免阻塞.如果我有10,000个这样的队列,需要一次处理一个任务,我将耗尽堆栈空间,因为它们中的大多数将保持被阻塞的线程.
我想要的是提交一个任务并提供一个在任务完成时调用的回调.我将使用该回叫通知作为发送下一个任务的标志.(functionaljava和jetlang显然使用了这种非阻塞算法,但我无法理解他们的代码)
如何使用JDK的java.util.concurrent,而不是编写自己的执行器服务?
(向我提供这些任务的队列本身可以阻止,但这是一个需要解决的问题)
我想创建一个ThreadPoolExecutor
当它达到其最大大小并且队列已满时,该submit()
方法在尝试添加新任务时阻塞.我是否需要为此实现自定义RejectedExecutionHandler
,或者是否存在使用标准Java库执行此操作的方法?
执行者看起来像一个干净的抽象.您何时想直接使用Thread而不是依赖更强大的执行程序?
在将此标记为重复之前,请仔细阅读该问题.
下面是伪代码的片段.我的问题是 - 以下代码是否没有打败并行异步处理的概念?
我问这个的原因是因为在下面的代码中,主线程将提交一个要在不同线程中执行的任务.在队列中提交任务后,它会阻止Future.get()方法为任务返回值.我宁愿在主线程中执行任务,而不是提交到不同的线程并等待结果.通过在新线程中执行任务我获得了什么?
我知道你可以等待有限的时间等,但如果我真的关心结果呢?如果要执行多个任务,问题会变得更糟.在我看来,我们只是同步地完成工作.我知道Guava库提供了非阻塞侦听器接口.但我很想知道我对Future.get()API的理解是否正确.如果它是正确的,为什么Future.get()设计为阻止从而打败整个并行处理过程?
注 - 为了记录,我使用JAVA 6
public static void main(String[] args){
private ExectorService executorService = ...
Future future = executorService.submit(new Callable(){
public Object call() throws Exception {
System.out.println("Asynchronous Callable");
return "Callable Result";
}
});
System.out.println("future.get() = " + future.get());
}
Run Code Online (Sandbox Code Playgroud) 我试图使用ThreadPoolExecutor执行许多任务.以下是一个假设的例子:
def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
threadPoolExecutor.execute(runnable)
Run Code Online (Sandbox Code Playgroud)
问题是我很快得到了java.util.concurrent.RejectedExecutionException,因为任务数量超过了工作队列的大小.但是,我正在寻找的所需行为是让主线程阻塞,直到队列中有空间.完成此任务的最佳方法是什么?
假设我有以下代码:
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(myRunnable);
Run Code Online (Sandbox Code Playgroud)
现在,如果myRunnable
抛出一个RuntimeExcpetion
,我怎么能抓住它?一种方法是提供我自己的ThreadFactory
实现,newSingleThreadExecutor()
并uncaughtExceptionHandler
为其中的Thread
s 设置自定义.另一种方法是换myRunnable
行Runnable
包含try-catch -block 的本地(匿名).也许还有其他类似的解决方法.但是......不知怎的,这感觉很脏,我觉得不应该这么复杂.有清洁的解决方案吗?
是否有可能为Executors执行的任务设置优先级?我在JCIP中发现了一些关于它可能的陈述,但我找不到任何例子,我找不到任何与文档相关的内容.
来自JCIP:
执行策略指定任务执行的"内容,位置,时间和方式",包括:
- ...
- 应该以什么顺序执行任务(FIFO,LIFO,优先顺序)?
- ...
UPD:我意识到我不确切地问我想问什么.我真正想要的是:
如何使用/模拟thread.setPriority()
执行程序框架设置线程优先级(即什么)?
我正在开发一个Java项目,我需要异步运行多个任务.我被引导相信Executor是我做这件事的最佳方式,所以我很熟悉它.(可以获得报酬!)然而,我不清楚最好的方法是完成我想要做的事情.
为了争论,让我说我有两个任务在运行.预计两者都不会终止,并且两者都应该在应用程序的生命周期内运行.我正在尝试编写一个主包装类,以便:
现在,应该注意的是,两个任务的实现都会将代码包装run()
在一个永远不会运行完成的无限循环中,并且try/catch块应该处理所有运行时异常而不会中断循环.我正试图增加另一层确定性; 如果我或跟随我的人做了一些愚蠢的事情来挫败这些保护措施并停止任务,那么应用程序需要做出适当的反应.
是否有最佳实践来解决这个问题,那些比我更有经验的人会推荐?
FWIW,我已经掀起了这个测试类:
public class ExecTest {
private static ExecutorService executor = null;
private static Future results1 = null;
private static Future results2 = null;
public static void main(String[] args) {
executor = Executors.newFixedThreadPool(2);
while(true) {
try {
checkTasks();
Thread.sleep(1000);
}
catch (Exception e) {
System.err.println("Caught exception: " + e.getMessage());
}
}
}
private static void checkTasks() throws Exception{
if (results1 == null || results1.isDone() || results1.isCancelled()) {
results1 = executor.submit(new Test1());
} …
Run Code Online (Sandbox Code Playgroud) 为什么,为什么不java.util.concurrent
提供其ExecutorService
s 的队列长度指标?最近我发现自己做了这样的事情:
ExecutorService queue = Executors.newSingleThreadExecutor();
AtomicInteger queueLength = new AtomicInteger();
...
public void addTaskToQueue(Runnable runnable) {
if (queueLength.get() < MAX_QUEUE_LENGTH) {
queueLength.incrementAndGet(); // Increment queue when submitting task.
queue.submit(new Runnable() {
public void run() {
runnable.run();
queueLength.decrementAndGet(); // Decrement queue when task done.
}
});
} else {
// Trigger error: too long queue
}
}
Run Code Online (Sandbox Code Playgroud)
哪个工作正常,但......我认为这应该作为一部分实现ExecutorService
.这是一个愚蠢的错误,容易携带一个与实际队列分开的计数器,计数器应该指示它的长度(让我想起C数组).但是,ExecutorService
s是通过静态工厂方法获得的,因此无法简单地扩展优秀的单线程执行器并添加队列计数器.所以我该怎么做:
我有以下问题,我想知道究竟发生了什么.我使用Java的ScheduledExecutorService每五分钟运行一次任务.它工作得很好.执行者彻底改变了我在Java中进行线程编程的方式.
现在,我浏览了Java Doc,了解有关在计划任务因未处理的异常而失败但无法找到任何内容时的行为.
下一个计划任务是否仍会运行?如果存在未处理的异常,则计划的执行程序会停止计划任务吗?有人能指出有关这个简单问题的信息吗?
非常感谢.