Lac*_*lev 27 java multithreading blockingqueue
我在一个非常简单的生产者 - 消费者场景中使用java.util.concurrent.BlockingQueue.例如,这个伪代码描述了消费者部分:
class QueueConsumer implements Runnable {
@Override
public void run() {
while(true)
{
try {
ComplexObject complexObject = myBlockingQueue.take();
//do something with the complex object
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
到现在为止还挺好.在阻塞队列的javadoc中,我读到:
BlockingQueue本质上不支持任何类型的"关闭"或"关闭"操作,以指示不再添加任何项目.这些功能的需求和使用倾向于依赖于实现.例如,一种常见的策略是生产者插入特殊的流末端或毒物对象,这些对象在被消费者采用时会相应地进行解释.
不幸的是,由于使用的泛型和ComplexObject的性质,将"毒物对象"推入队列并非易事.所以这个"常用策略"在我的场景中并不是很方便.
我的问题是:我可以用什么其他好的策略/模式来"关闭"队列?
谢谢!
jdm*_*hal 19
如果你有一个消费者线程的句柄,你可以打断它.使用您提供的代码,这将杀死消费者.我不希望制片人有这个; 它可能不得不以某种方式回调程序控制器让它知道它已经完成.然后控制器将中断消费者线程.
在遵守中断之前,您始终可以完成工作.例如:
class QueueConsumer implements Runnable {
@Override
public void run() {
while(!(Thread.currentThread().isInterrupted())) {
try {
final ComplexObject complexObject = myBlockingQueue.take();
this.process(complexObject);
} catch (InterruptedException e) {
// Set interrupted flag.
Thread.currentThread().interrupt();
}
}
// Thread is getting ready to die, but first,
// drain remaining elements on the queue and process them.
final LinkedList<ComplexObject> remainingObjects;
myBlockingQueue.drainTo(remainingObjects);
for(ComplexObject complexObject : remainingObjects) {
this.process(complexObject);
}
}
private void process(final ComplexObject complexObject) {
// Do something with the complex object.
}
}
Run Code Online (Sandbox Code Playgroud)
我实际上更喜欢以某种方式毒害队列.如果你想杀死线程,请让线程自杀.
(很高兴看到有人处理InterruptedException
得当.)
这里似乎有一些关于处理中断的争论.首先,我希望大家阅读这篇文章:http://www.ibm.com/developerworks/java/library/j-jtp05236.html
现在,在理解没有人真正阅读过的情况下,这就是交易.一个线程只会InterruptedException
在中断时收到一个当前阻塞的线程.在这种情况下,Thread.interrupted()
将返回false
.如果它没有阻塞,它将不会收到此异常,而是Thread.interrupted()
会返回true
.因此,您的环路保护应该绝对,无论如何,检查Thread.interrupted()
,或以其他方式冒险错过线程的中断.
因此,既然你正在检查Thread.interrupted()
什么,并且你被迫捕获InterruptedException
(并且即使你没有被强迫也应该处理它),你现在有两个代码区域来处理相同的事件,线程中断.处理此问题的一种方法是将它们规范化为一个条件,这意味着布尔状态检查可以抛出异常,或者异常可以设置布尔状态.我选择了以后的.
编辑:请注意,静态Thread#interrupted方法清除当前线程的中断状态.
Jas*_*n S 11
使这个简单的另一个想法:
class ComplexObject implements QueueableComplexObject
{
/* the meat of your complex object is here as before, just need to
* add the following line and the "implements" clause above
*/
@Override public ComplexObject asComplexObject() { return this; }
}
enum NullComplexObject implements QueueableComplexObject
{
INSTANCE;
@Override public ComplexObject asComplexObject() { return null; }
}
interface QueueableComplexObject
{
public ComplexObject asComplexObject();
}
Run Code Online (Sandbox Code Playgroud)
然后BlockingQueue<QueueableComplexObject>
用作队列.当你想结束队列的处理时,做queue.offer(NullComplexObject.INSTANCE)
.在消费者方面,做
boolean ok = true;
while (ok)
{
ComplexObject obj = queue.take().asComplexObject();
if (obj == null)
ok = false;
else
process(obj);
}
/* interrupt handling elided: implement this as you see fit,
* depending on whether you watch to swallow interrupts or propagate them
* as in your original post
*/
Run Code Online (Sandbox Code Playgroud)
没有instanceof
必要,你不必构建一个ComplexObject
可能昂贵/困难的假货,具体取决于它的实现.
另一种方法是将您正在进行的处理包装起来ExecutorService
,并让ExecutorService
自己控制是否将作业添加到队列中.
基本上,您可以利用ExecutorService.shutdown()
,当被调用时,不允许执行程序处理任何其他任务.
我不确定你当前是如何向QueueConsumer
你的例子中提交任务的.我假设您有某种submit()
方法,并在示例中使用了类似的方法.
import java.util.concurrent.*;
class QueueConsumer {
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public void shutdown() {
executor.shutdown(); // gracefully shuts down the executor
}
// 'Result' is a class you'll have to write yourself, if you want.
// If you don't need to provide a result, you can just Runnable
// instead of Callable.
public Future<Result> submit(final ComplexObject complexObject) {
if(executor.isShutdown()) {
// handle submitted tasks after the executor has been told to shutdown
}
return executor.submit(new Callable<Result>() {
@Override
public Result call() {
return process(complexObject);
}
});
}
private Result process(final ComplexObject complexObject) {
// Do something with the complex object.
}
}
Run Code Online (Sandbox Code Playgroud)
这个例子只是java.util.concurrent
套装提供的一个袖口插图; 可能会对它进行一些优化(例如,QueueConsumer
因为它自己的类可能甚至不是必需的;你可以只提供ExecutorService
生产者提交任务的任何东西).
挖掘java.util.concurrent
包(从上面的一些链接开始).您可能会发现它为您尝试执行的操作提供了很多很好的选择,您甚至不必担心调整工作队列.
制作毒物对象的另一种可能性:使其成为该类的特定实例.通过这种方式,您不必捣乱子类型或搞砸通用.
缺点:如果生产者和消费者之间存在某种序列化障碍,这将无效.
public class ComplexObject
{
public static final POISON_INSTANCE = new ComplexObject();
public ComplexObject(whatever arguments) {
}
// Empty constructor for creating poison instance.
private ComplexObject() {
}
}
class QueueConsumer implements Runnable {
@Override
public void run() {
while(!(Thread.currentThread().interrupted())) {
try {
final ComplexObject complexObject = myBlockingQueue.take();
if (complexObject == ComplexObject.POISON_INSTANCE)
return;
// Process complex object.
} catch (InterruptedException e) {
// Set interrupted flag.
Thread.currentThread().interrupt();
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
17921 次 |
最近记录: |