thk*_*ala 7 java multithreading
在我的一个Java 6应用程序中,我有一个线程,它为主线程提供数据,同时还从数据库中预取更多记录.它使用ArrayBlockingQueue队列作为FIFO缓冲区,其主循环是这些行:
while (!Thread.interrupted()) {
if (source.hasNext()) {
try {
queue.put(source.next())
} catch (InterruptedException e) {
break;
}
} else {
break;
}
}
Run Code Online (Sandbox Code Playgroud)
在循环终止后,有一些代码可以进行一些清理,例如中毒队列和释放任何资源,但这几乎就是它的全部内容.
目前,没有从主线程到馈线线程的直接通信:馈线线程使用适当的选项进行设置,然后单独使用阻塞队列来控制数据流.
当主线程在队列已满时需要关闭进纸器时,会出现问题.由于没有直接控制通道,因此shutdown方法使用Thread接口连接到interrupt()馈线线程.不幸的是,在大多数情况下put(),尽管被中断,但是馈线线程仍然被阻挡- 没有异常被抛出.
从interrupt()文档和队列实现源代码的简要细读,在我看来,经常put()阻止不使用JVM的任何可中断设施.更具体地说,在我当前的JVM(OpenJDK 1.6b22)上,它会阻塞本sun.misc.Unsafe.park()机方法.也许它使用螺旋锁或其他东西,但无论如何,这似乎属于以下情况:
如果以前的条件都不成立,则将设置该线程的中断状态.
设置了状态标志,但线程仍然被阻塞,put()并且不再进行迭代,以便可以检查该标志.结果?一个不会死的僵尸线程!
我对这个问题的理解是正确的,还是我错过了什么?
解决此问题的可能方法有哪些?现在我只能想到两个解决方案:
一个.poll()在队列上调用了很多次来取消阻塞馈线线程:从我看到的内容看起来很丑陋而且不太可靠,但它大部分都有效.
湾 使用offer()超时方法而不是put()允许线程在可接受的时间范围内检查其中断状态.
除非我遗漏了某些内容,否则这是对Java中BlockingQueue实现的一个有点未记录的警告.有似乎是它的一些迹象表明,当文档,如建议中毒队列关闭工作线程,但我找不到任何明确的参考.
编辑:
好的,上面的解决方案(a)有更多,呃,剧烈的变化:ArrayBlockingQueue.clear().我认为这应该始终有效,即使它不完全是优雅的定义......
我认为你的问题有两个可能的原因.
如破坏门铃法则所述,您可能无法正确处理中断.在那里你会发现:
当我们调用可能导致InterruptedException的代码时,我们应该怎么做?不要马上掏出电池!通常,该问题有两个答案:
从您的方法中重新抛出InterruptedException.这通常是最简单和最好的方法.它由新的java.util.concurrent.*包使用,它解释了为什么我们现在不断接触这个异常.
抓住它,设置中断状态,返回.如果您在调用可能导致异常的代码的循环中运行,则应将状态设置回中断状态.
例如:Run Code Online (Sandbox Code Playgroud)while (!Thread.currentThread().isInterrupted()) { // do something try { TimeUnit.SECONDS.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } }
无论是source.hasNext()或source.next()正在消耗并丢弃中断状态.请参阅下面添加了我如何解决此问题.
我相信打断一个线程ArrayBlockingqueue.put() 是一个有效的解决方案.
添加
我用一个CloseableBlockingQueue可以从阅读器端关闭的问题解决了问题2 .通过这种方式,一旦关闭,所有put呼叫都将快捷方式.然后,您可以从编写器检查closed队列的标志.
// A blocking queue I can close from the pull end.
// Please only use put because offer does not shortcut on close.
// <editor-fold defaultstate="collapsed" desc="// Exactly what it says on the tin.">
class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> {
// Flag indicates closed state.
private volatile boolean closed = false;
// All blocked threads. Actually this is all threads that are in the process
// of invoking a put but if put doesn't block then they disappear pretty fast.
// NB: Container is O(1) for get and almost O(1) (depending on how busy it is) for put.
private final Container<Thread> blocked;
// Limited size.
public CloseableBlockingQueue(int queueLength) {
super(queueLength);
blocked = new Container<Thread>(queueLength);
}
/**
* *
* Shortcut to do nothing if closed.
*
* Track blocked threads.
*/
@Override
public void put(E e) throws InterruptedException {
if (!closed) {
Thread t = Thread.currentThread();
// Hold my node on the stack so removal can be trivial.
Container.Node<Thread> n = blocked.add(t);
try {
super.put(e);
} finally {
// Not blocked anymore.
blocked.remove(n, t);
}
}
}
/**
*
* Shortcut to do nothing if closed.
*/
@Override
public E poll() {
E it = null;
// Do nothing when closed.
if (!closed) {
it = super.poll();
}
return it;
}
/**
*
* Shortcut to do nothing if closed.
*/
@Override
public E poll(long l, TimeUnit tu) throws InterruptedException {
E it = null;
// Do nothing when closed.
if (!closed) {
it = super.poll(l, tu);
}
return it;
}
/**
*
* isClosed
*/
boolean isClosed() {
return closed;
}
/**
*
* Close down everything.
*/
void close() {
// Stop all new queue entries.
closed = true;
// Must unblock all blocked threads.
// Walk all blocked threads and interrupt them.
for (Thread t : blocked) {
//log("! Interrupting " + t.toString());
// Interrupt all of them.
t.interrupt();
}
}
@Override
public String toString() {
return blocked.toString();
}
}
Run Code Online (Sandbox Code Playgroud)
您还需要无Container锁和O(1)put/get(虽然它不是严格的集合).它使用Ring幕后.
public class Container<T> implements Iterable<T> {
// The capacity of the container.
final int capacity;
// The list.
AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();
// Constructor
public Container(int capacity) {
this.capacity = capacity;
// Construct the list.
Node<T> h = new Node<T>();
Node<T> it = h;
// One created, now add (capacity - 1) more
for (int i = 0; i < capacity - 1; i++) {
// Add it.
it.next = new Node<T>();
// Step on to it.
it = it.next;
}
// Make it a ring.
it.next = h;
// Install it.
head.set(h);
}
// Empty ... NOT thread safe.
public void clear() {
Node<T> it = head.get();
for (int i = 0; i < capacity; i++) {
// Trash the element
it.element = null;
// Mark it free.
it.free.set(true);
it = it.next;
}
// Clear stats.
resetStats();
}
// Add a new one.
public Node<T> add(T element) {
// Get a free node and attach the element.
return getFree().attach(element);
}
// Find the next free element and mark it not free.
private Node<T> getFree() {
Node<T> freeNode = head.get();
int skipped = 0;
// Stop when we hit the end of the list
// ... or we successfully transit a node from free to not-free.
while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
skipped += 1;
freeNode = freeNode.next;
}
if (skipped < capacity) {
// Put the head as next.
// Doesn't matter if it fails. That would just mean someone else was doing the same.
head.set(freeNode.next);
} else {
// We hit the end! No more free nodes.
throw new IllegalStateException("Capacity exhausted.");
}
return freeNode;
}
// Mark it free.
public void remove(Node<T> it, T element) {
// Remove the element first.
it.detach(element);
// Mark it as free.
if (!it.free.compareAndSet(false, true)) {
throw new IllegalStateException("Freeing a freed node.");
}
}
// The Node class. It is static so needs the <T> repeated.
public static class Node<T> {
// The element in the node.
private T element;
// Are we free?
private AtomicBoolean free = new AtomicBoolean(true);
// The next reference in whatever list I am in.
private Node<T> next;
// Construct a node of the list
private Node() {
// Start empty.
element = null;
}
// Attach the element.
public Node<T> attach(T element) {
// Sanity check.
if (this.element == null) {
this.element = element;
} else {
throw new IllegalArgumentException("There is already an element attached.");
}
// Useful for chaining.
return this;
}
// Detach the element.
public Node<T> detach(T element) {
// Sanity check.
if (this.element == element) {
this.element = null;
} else {
throw new IllegalArgumentException("Removal of wrong element.");
}
// Useful for chaining.
return this;
}
@Override
public String toString() {
return element != null ? element.toString() : "null";
}
}
// Provides an iterator across all items in the container.
public Iterator<T> iterator() {
return new UsedNodesIterator<T>(this);
}
// Iterates across used nodes.
private static class UsedNodesIterator<T> implements Iterator<T> {
// Where next to look for the next used node.
Node<T> it;
int limit = 0;
T next = null;
public UsedNodesIterator(Container<T> c) {
// Snapshot the head node at this time.
it = c.head.get();
limit = c.capacity;
}
public boolean hasNext() {
if (next == null) {
// Scan to the next non-free node.
while (limit > 0 && it.free.get() == true) {
it = it.next;
// Step down 1.
limit -= 1;
}
if (limit != 0) {
next = it.element;
}
}
return next != null;
}
public T next() {
T n = null;
if ( hasNext () ) {
// Give it to them.
n = next;
next = null;
// Step forward.
it = it.next;
limit -= 1;
} else {
// Not there!!
throw new NoSuchElementException ();
}
return n;
}
public void remove() {
throw new UnsupportedOperationException("Not supported.");
}
}
@Override
public String toString() {
StringBuilder s = new StringBuilder();
Separator comma = new Separator(",");
// Keep counts too.
int usedCount = 0;
int freeCount = 0;
// I will iterate the list myself as I want to count free nodes too.
Node<T> it = head.get();
int count = 0;
s.append("[");
// Scan to the end.
while (count < capacity) {
// Is it in-use?
if (it.free.get() == false) {
// Grab its element.
T e = it.element;
// Is it null?
if (e != null) {
// Good element.
s.append(comma.sep()).append(e.toString());
// Count them.
usedCount += 1;
} else {
// Probably became free while I was traversing.
// Because the element is detached before the entry is marked free.
freeCount += 1;
}
} else {
// Free one.
freeCount += 1;
}
// Next
it = it.next;
count += 1;
}
// Decorate with counts "]used+free".
s.append("]").append(usedCount).append("+").append(freeCount);
if (usedCount + freeCount != capacity) {
// Perhaps something was added/freed while we were iterating.
s.append("?");
}
return s.toString();
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3020 次 |
| 最近记录: |