我已经沮丧了一段时间,其默认行为ThreadPoolExecutor
支持ExecutorService
我们这么多人使用的线程池.引用Javadocs:
如果有多个corePoolSize但运行的maximumPoolSize线程少于maximumPoolSize,则只有在队列已满时才会创建新线程.
这意味着如果您使用以下代码定义线程池,它将永远不会启动第二个线程,因为它LinkedBlockingQueue
是无限制的.
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Run Code Online (Sandbox Code Playgroud)
只有当您有一个有界队列并且队列已满时才会启动核心编号之上的任何线程.我怀疑大量的初级Java多线程程序员并不知道这种行为ThreadPoolExecutor
.
现在我有一个特定的用例,这不是最佳的.我正在寻找方法,而不是编写我自己的TPE课程来解决它.
我的要求是针对可能不可靠的第三方回拨的Web服务.
newFixedThreadPool(...)
大量的线程,大多数都处于休眠状态.newCachedThreadPool()
.在更多线程启动之前,如何解决ThreadPoolExecutor
队列需要限制和填充的限制?如何让它在排队任务之前启动更多线程?
编辑:
@Flavio提出了使用ThreadPoolExecutor.allowCoreThreadTimeOut(true)
核心线程超时并退出的好处.我考虑过这一点,但我仍然想要核心线程功能.如果可能的话,我不希望池中的线程数降到核心大小以下.
我有一个单线程生成器,它创建一些任务对象,然后将其添加到ArrayBlockingQueue
(具有固定大小).
我也开始了一个多线程的消费者.这是一个固定的线程池(Executors.newFixedThreadPool(threadCount);
).然后我向这个threadPool提交了一些ConsumerWorker入口,每个ConsumerWorker都对上面提到的ArrayBlockingQueue实例进行了引用.
每个这样的工作人员都会take()
在队列中执行并处理任务.
我的问题是,当没有更多的工作要做时,让工人知道的最佳方法是什么.换句话说,如何告诉Workers,生产者已经完成了对队列的添加,从这一点开始,每个工作人员在看到Queue为空时应该停止.
我现在得到的是一个设置,我的Producer初始化了一个回调,当他完成它的工作(向队列中添加东西)时会触发回调.我还保留了我创建并提交给ThreadPool的所有ConsumerWorkers的列表.当Producer Callback告诉我生产者已完成时,我可以告诉每个工人.此时,他们应该只是继续检查队列是否为空,当它变为空时它们应该停止,从而允许我优雅地关闭ExecutorService线程池.就是这样的
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement …
Run Code Online (Sandbox Code Playgroud) new SynchronousQueue()
new LinkedBlockingQueue(1)
Run Code Online (Sandbox Code Playgroud)
有什么不同?什么时候应该使用SynchronousQueue
对LinkedBlockingQueue
产能1?
特别是,我正在寻找一个阻塞队列.C++ 11中有这样的东西吗?如果没有,我的其他选择是什么?我真的不想再去自己的线程了.方式太容易出错.
我正在开发一个多线程项目,我需要生成多个线程来测量我的客户端代码的端到端性能,因为我正在进行负载和性能测试.所以我创建了以下使用的代码ExecutorService
.
以下是代码ExecutorService
:
public class MultithreadingExample {
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(20);
for (int i = 0; i < 100; i++) {
executor.submit(new NewTask());
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
}
}
class NewTask implements Runnable {
@Override
public void run() {
//Measure the end to end latency of my client code
}
}
Run Code Online (Sandbox Code Playgroud)
问题陈述:
现在我正在阅读互联网上的一些文章.我发现也有ThreadPoolExecutor
.所以我很困惑我应该使用哪一个.
如果我将以上代码替换为:
ExecutorService executor = Executors.newFixedThreadPool(20);
for (int i = 0; i < 100; i++) …
Run Code Online (Sandbox Code Playgroud) java multithreading executorservice blockingqueue threadpoolexecutor
当喜欢LinkedBlockingQueue
过ArrayBlockingQueue
?
其数据结构中使用LinkedBlockingQueue
和ArrayBlockingQueue
时:
虽然有类似的问题,但它没有强调哪个应该是首选的事实?
链接:
我有一个阻塞队列的对象.
我想写一个阻塞的线程,直到队列中有一个对象.与BlockingQueue.take()提供的功能类似.
但是,由于我不知道我是否能够成功处理对象,我想只是peek()而不是删除对象.我想删除该对象只有我能够成功处理它.
所以,我想要一个阻塞的peek()函数.目前,peek()只是在队列为空时根据javadoc返回.
我错过了什么吗?还有其他方法可以实现此功能吗?
编辑:
如果我只是使用了一个线程安全队列并且偷看和睡觉了?
public void run() {
while (!exit) {
while (queue.size() != 0) {
Object o = queue.peek();
if (o != null) {
if (consume(o) == true) {
queue.remove();
} else {
Thread.sleep(10000); //need to backoff (60s) and try again
}
}
}
Thread.sleep(1000); //wait 1s for object on queue
}
}
Run Code Online (Sandbox Code Playgroud)
请注意,我只有一个消费者线程和一个(单独的)生产者线程.我想这不如使用BlockingQueue有效......任何评论都赞赏.
我在一个非常简单的生产者 - 消费者场景中使用java.util.concurrent.BlockingQueue.例如,这个伪代码描述了消费者部分:
class QueueConsumer implements Runnable {
@Override
public void run() {
while(true)
{
try {
ComplexObject complexObject = myBlockingQueue.take();
//do something with the complex object
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
到现在为止还挺好.在阻塞队列的javadoc中,我读到:
BlockingQueue本质上不支持任何类型的"关闭"或"关闭"操作,以指示不再添加任何项目.这些功能的需求和使用倾向于依赖于实现.例如,一种常见的策略是生产者插入特殊的流末端或毒物对象,这些对象在被消费者采用时会相应地进行解释.
不幸的是,由于使用的泛型和ComplexObject的性质,将"毒物对象"推入队列并非易事.所以这个"常用策略"在我的场景中并不是很方便.
我的问题是:我可以用什么其他好的策略/模式来"关闭"队列?
谢谢!
在无限循环中使用队列中的值时 - 更有效:
1)阻止队列,直到通过take()获得值
while (value = queue.take()) { doSomething(value); }
Run Code Online (Sandbox Code Playgroud)
2)睡眠n毫秒并检查物品是否可用
while (true) {
if ((value = queue.poll()) != null) { doSomething(value); }
Thread.sleep(1000);
}
Run Code Online (Sandbox Code Playgroud)