sta*_*off 8 java concurrency multithreading notify livelock
当我偶然发现一些我不理解的行为时,我试图使用Java同步"原语"(synchronized,wait(),notify())来实现类似于Java的有界BlockingQueue接口.
我创建了一个能够存储1个元素的队列,创建两个等待从队列中获取值的线程,启动它们,然后尝试将两个值放入主线程中的同步块中的队列中.它大部分时间都可以工作,但有时候等待值的两个线程似乎开始相互唤醒并且不让主线程进入同步块.
这是我的(简化)代码:
import java.util.LinkedList;
import java.util.Queue;
public class LivelockDemo {
private static final int MANY_RUNS = 10000;
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < MANY_RUNS; i++) { // to increase the probability
final MyBoundedBlockingQueue ctr = new MyBoundedBlockingQueue(1);
Thread t1 = createObserver(ctr, i + ":1");
Thread t2 = createObserver(ctr, i + ":2");
t1.start();
t2.start();
System.out.println(i + ":0 ready to enter synchronized block");
synchronized (ctr) {
System.out.println(i + ":0 entered synchronized block");
ctr.addWhenHasSpace("hello");
ctr.addWhenHasSpace("world");
}
t1.join();
t2.join();
System.out.println();
}
}
public static class MyBoundedBlockingQueue {
private Queue<Object> lst = new LinkedList<Object>();;
private int limit;
private MyBoundedBlockingQueue(int limit) {
this.limit = limit;
}
public synchronized void addWhenHasSpace(Object obj) throws InterruptedException {
boolean printed = false;
while (lst.size() >= limit) {
printed = __heartbeat(':', printed);
notify();
wait();
}
lst.offer(obj);
notify();
}
// waits until something has been set and then returns it
public synchronized Object getWhenNotEmpty() throws InterruptedException {
boolean printed = false;
while (lst.isEmpty()) {
printed = __heartbeat('.', printed); // show progress
notify();
wait();
}
Object result = lst.poll();
notify();
return result;
}
// just to show progress of waiting threads in a reasonable manner
private static boolean __heartbeat(char c, boolean printed) {
long now = System.currentTimeMillis();
if (now % 1000 == 0) {
System.out.print(c);
printed = true;
} else if (printed) {
System.out.println();
printed = false;
}
return printed;
}
}
private static Thread createObserver(final MyBoundedBlockingQueue ctr,
final String name) {
return new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(name + ": saw " + ctr.getWhenNotEmpty());
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
}, name);
}
}
Run Code Online (Sandbox Code Playgroud)
这是我在"阻止"时看到的内容:
(skipped a lot)
85:0 ready to enter synchronized block
85:0 entered synchronized block
85:2: saw hello
85:1: saw world
86:0 ready to enter synchronized block
86:0 entered synchronized block
86:2: saw hello
86:1: saw world
87:0 ready to enter synchronized block
............................................
..........................................................................
..................................................................................
(goes "forever")
Run Code Online (Sandbox Code Playgroud)
但是,如果我将addWhenHasSpace和getWhenNotEmpty方法的while(...)循环中的notify()调用更改为notifyAll(),它将"始终"通过.
我的问题是:为什么在这种情况下,notify()和notifyAll()方法之间的行为有所不同,为什么notify()的行为是这样的呢?
我希望这两种方法在这种情况下的行为方式相同(两个线程WAITING,一个BLOCKED),因为:
或者也许我一共做错了什么?
使用内在锁定似乎存在某种公平/闯入 - 可能是由于某些优化。我猜测,本机代码会检查当前线程是否已通知它将要等待的监视器并允许其获胜。
替换synchronized为ReentrantLock,它应该按您的预期工作。这里的不同之处在于如何ReentrantLock处理已通知的锁的等待者。
更新:
有趣的发现在这里。main你所看到的是线程进入之间的竞争
synchronized (ctr) {
System.out.println(i + ":0 entered synchronized block");
ctr.addWhenHasSpace("hello");
ctr.addWhenHasSpace("world");
}
Run Code Online (Sandbox Code Playgroud)
而另外两个线程则进入各自的synchronized区域。如果主线程至少在两者之一之前没有进入其同步区域,您将遇到您所描述的这种活锁输出。
notify似乎正在发生的情况是,如果两个消费者线程都首先命中同步块,它们将在和 之间相互进行乒乓球运动wait。可能会出现这样的情况:当线程被阻塞时,JVM 会将正在等待的线程优先级赋予监视器。
| 归档时间: |
|
| 查看次数: |
462 次 |
| 最近记录: |