我正在尝试编写一个解决方案,其中单个线程生成可以并行执行的I/O密集型任务.每个任务都有重要的内存数据.所以我希望能够限制暂时待处理的任务数量.
如果我像这样创建ThreadPoolExecutor:
ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(maxQueue));
Run Code Online (Sandbox Code Playgroud)
然后在队列填满并且所有线程都已忙时executor.submit(callable)抛出RejectedExecutionException.
executor.submit(callable)当队列已满且所有线程都忙时,我该怎么做才能阻塞?
编辑:我试过这个:
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
Run Code Online (Sandbox Code Playgroud)
它有点实现了我想要实现的效果但是以一种不雅的方式(基本上被拒绝的线程在调用线程中运行,因此这阻止了调用线程提交更多).
编辑:(提问后5年)
对于阅读此问题及其答案的任何人,请不要将接受的答案作为一个正确的解决方案.请仔细阅读所有答案和评论.
我试图使用ThreadPoolExecutor执行许多任务.以下是一个假设的例子:
def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
threadPoolExecutor.execute(runnable)
Run Code Online (Sandbox Code Playgroud)
问题是我很快得到了java.util.concurrent.RejectedExecutionException,因为任务数量超过了工作队列的大小.但是,我正在寻找的所需行为是让主线程阻塞,直到队列中有空间.完成此任务的最佳方法是什么?
我尝试过创建和执行ThreadPoolExecutor
int poolSize = 2;
int maxPoolSize = 3;
ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);
Run Code Online (Sandbox Code Playgroud)
如果我连续尝试7日,8日......任务
threadPool.execute(task);
Run Code Online (Sandbox Code Playgroud)
在队列达到最大大小后,
它开始抛出"RejectedExecutionException".意味着我失去了添加这些任务.
在这里,如果BlockingQueue缺少任务,那么它的作用是什么?意味着它为什么不等待?
从BlockingQueue的定义
一个队列,它还支持在检索元素时等待队列变为非空的操作,并在存储元素时等待队列中的空间可用.
为什么我们不能使用linkedlist(正常队列实现而不是阻塞队列)?
Sun Java(1.6)ScheduledThreadPoolExecutor是ThreadPoolExecutor内部扩展,它的实现DelayQueue是一个无界队列.我需要的是一个ScheduledThreadpoolExecutor有界队列即它在队列中的积累,这样,当队列中的任务超过了限度,它开始拒绝进一步提交的任务,防止JVM走出去的记忆任务的限制.
令人惊讶的是,谷歌或stackoverflow没有指出我正在讨论这个问题的任何结果.有没有这样的东西可用我错过了?如果没有,我如何实现ScheduledThreadpoolExecutor以最佳方式提供我期望的功能?
我有下面的代码片段,运行正常.但问题是它立即创建并在执行程序队列上放置了2000多个任务.
我需要检查执行程序队列中已有的任务是否已完成,然后再给它更多任务.它不必是精确的,即如果队列剩下<10个任务,则再添加50个任务.
因此执行程序任务队列没有那么多挂起任务,这也将允许shutdown()及时工作,否则即使被调用,执行程序仍将首先尝试完成其队列中的所有2000个任务.
完成此任务的最佳方法是什么?谢谢
executor = Executors.newFixedThreadPool(numThreads);
while(some_condition==true)
{
//if(executor < 10 tasks pending) <---- how do i do this?
//{
for(int k=0;k<20;k++)
{
Runnable worker = new MyRunnable();
executor.execute(worker);
}
//}
//else
//{
// wait(3000);
//}
}
Run Code Online (Sandbox Code Playgroud)
使用信号量更新:
private final Semaphore semaphore = new Semaphore(10)
executor = new ThreadPoolExecutorWithSemaphoreFromJohnExample();
while(some_condition==true)
{
Runnable worker = new MyRunnable();
//So at this point if semaphore is full, then while loop would PAUSE(??) until
//semaphore frees up again.
executor.execute(worker);
}
Run Code Online (Sandbox Code Playgroud)