使用ConcurrentLinkedQueue的Java线程问题

mb3*_*900 1 java concurrency multithreading producer-consumer

我有以下代码片段的问题.它旨在处理添加到事件队列(ConcurrentLinkedQueue)的事件(通过对processEvent方法的调用提供).事件被添加到事件队列中并在run方法中定期处理.

一切都很好.但有时在调用processEvent方法之后,当一个事件被添加到队列时,运行部件无法看到有新事件.

什么是错的?除了使用String常量作为锁定的明显错误之外?

import java.util.concurrent.ConcurrentLinkedQueue;

public class MyCommunicator implements Runnable {

private ConcurrentLinkedQueue<MyEvent> eventQueue = null;

private boolean stopped = false;

private String lock = "";
private Thread thread = null;

public MyCommunicator() {

    eventQueue = new ConcurrentLinkedQueue<MyEvent>();
}

public void start() {
    thread = new Thread(this, "MyCommunicatorThread");
    thread.start();
}

public void stop() {
    stopped = true;
    synchronized (lock) {
        lock.notifyAll();
    }
    eventQueue.clear();
}

public void run() {
    while (!stopped) {
        try {

            MyEvent event = null;
            while (!stopped && ((event = eventQueue.peek()) != null)) {
                sendEvent(event);
                eventQueue.poll();
            }

            if (!stopped) {
                synchronized (lock) {
                    lock.wait(10000L);
                }
            }
        }

        catch (Exception e) {

        }
    }
}

/**
 * START EVENT JOB - ADD A NEW EVENT TO BE PROCESSED
 */
public void processEvent(MyEvent event) {
    eventQueue.offer(event);
    synchronized (lock) {
        lock.notifyAll();
    }
}

/**
 * END EVENT JOB
 */
private void sendEvent(MyEvent event) {
    // do send event job
}

}
Run Code Online (Sandbox Code Playgroud)

Bri*_*ach 11

你为什么使用锁和通知?

使用LinkedBlockingQueue代替并省去所有麻烦.

那个超时的poll()意志将完成你想要做的一切.


编辑:关于当前代码;

您需要定义"未能看到有新事件".您的run()方法每10秒查看一次队列; 如果队列中有什么东西,它会"看到它"并将其拉出来.

  • 如果你的意思是"它只是在通知后立即看到它,只有10秒钟",那么这很容易回答,因为你有一个很容易导致这种情况发生的竞争条件.当该线程在完成检查/处理队列和获取锁之间时,可以将某些东西插入到队列中.如果没有超时,wait()您将阻止直到插入下一个事件.如果该stop()方法在此期间正在调用,则会丢失队列中的任何事件.使用LinkedBlockingQueue而不是所有不必要的锁定和通知解决了这个问题.这不是一个"简单"的解决方案,它是这个用例和问题的正确解决方案.

  • 如果情况并非如此,那么您只是不在队列中插入任何内容,而问题在于您未在此处发布的代码.不知道有关代码的任何一个猜测是,你试图插入一个空MyEventeventQueue.offer(event).既然你没有尝试/赶上offer()你就不会知道它.忽略所有异常而不检查返回值既不是一个好主意或实践.

  • 第三种可能性是你有一些其他代码锁定在相同的内部字符串文字引用上会导致此代码挂起.你提到它但我会在这里重申 - 这是一件非常糟糕的事情,特别是考虑到它是空字符串.该java.util.concurrent软件包提供真正的条件,如果你坚持在这里使用它们.请注意,这仍然无法消除您在有时错过10秒事件时所遇到的竞争条件,但它至少会更清晰.为了消除你的竞争条件,你需要抛弃并发队列以获得常规队列,并在访问它之前获取锁(以及获取插入锁).这将同步您的线程,因为插入器将被阻止插入,除非此线程正在等待锁定条件.在同一块代码中混合使用锁和无锁的线程同步方法通常会导致这些问题.


Jed*_*ith 5

你有一个所谓的错过信号.您轮询队列,然后在监视器上等待(获取锁定).生产者线程添加事件然后调用notifyAll()(获取锁定).happens-before事件排队/轮询与条件等待/通知之间没有关系.

因此,线程A可以在空时轮询,然后尝试获取锁,同时线程B添加一个元素并获取锁,通知所有等待线程然后释放锁.线程A然后获取锁并等待它,但信号已被遗漏.

当您使用锁仅用于信令时,您可能会考虑另一种机制,例如像Doug Lea的新jdk7 Phaser这样的可重复使用的锁存,或者只是BlockingQueue直接使用.

另外,我们有几个ReusableLatchBooleanLatch单个读取器线程或PhasedLatch多方支持.