如何中断在take()上阻塞的BlockingQueue?

MCS*_*MCS 81 java concurrency interrupt blocking

我有一个从a获取对象的类,BlockingQueue并通过take()在连续循环中调用来处理它们.在某些时候,我知道不会有更多的对象被添加到队列中.如何中断该take()方法以阻止其阻塞?

这是处理对象的类:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

这是使用此类处理对象的方法:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}
Run Code Online (Sandbox Code Playgroud)

Chr*_*ill 66

如果中断线程不是一个选项,另一个是在队列上放置一个"标记"或"命令"对象,MyObjHandler会识别这个对象,并打破循环.

  • 这也被称为"Poison Pill Shutdown"方法,并在"Java Concurrency in Practice"中详细讨论,特别是第155-156页. (35认同)

eri*_*son 13

BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();
Run Code Online (Sandbox Code Playgroud)

但是,如果执行此操作,则队列中仍有项目等待处理时,线程可能会中断.您可能需要考虑使用poll而不是take,这将允许处理线程超时并在等待一段时间没有新输入时终止.

  • @MCS - 请注意未来访问者您的方法是黑客攻击,不应在生产代码中复制,原因有三个.总是首选找到挂钩关闭的实际方法.使用`Thread.sleep()`作为正确钩子的替代方法是不可接受的.在其他实现中,其他线程可能将事物放入队列中,而while循环可能永远不会结束. (2认同)

dbw*_*dbw 13

很晚但希望这也有助于其他我面对类似的问题并使用上面pollerickson提出的方法进行一些小的改动,

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}
Run Code Online (Sandbox Code Playgroud)

这解决了这两个问题

  1. 标记了BlockingQueue它,以便它知道它不必等待元素
  2. 没有在两者之间中断,因此仅当处理队列中的所有项目并且没有剩余项目要添加时,处理块才终止

  • 使`Finished`变量`volatile`以保证线程之间的可见性.请参见http://stackoverflow.com/a/106787 (2认同)
  • 如果我没有弄错,队列中的最后一个元素将不会被处理.从队列中取出最后一个元素时,finished为true且队列为空,因此在处理完最终元素之前它将返回.添加第三个条件if(Finished && queue.isEmpty()&& obj == null) (2认同)