java并发:多生产者一个消费者

Ran*_*ize 5 java concurrency consumer producer

我有一种情况,其中不同的线程填充队列(生产者)和一个消费者从该队列中检索元素.我的问题是,当从队列中检索到其中一个元素时,会遗漏一些(丢失信号?).生产者代码是:

class Producer implements Runnable {

    private Consumer consumer;

    Producer(Consumer consumer) { this.consumer = consumer; }

    @Override
public void run() {
    consumer.send("message");
  }
}
Run Code Online (Sandbox Code Playgroud)

它们是通过以下方式创建和运行的:

ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
  executor.execute(new Producer(consumer));
}
Run Code Online (Sandbox Code Playgroud)

消费者代码是:

class Consumer implements Runnable {

private Queue<String> queue = new ConcurrentLinkedQueue<String>();

void send(String message) {
    synchronized (queue) {
        queue.add(message);
        System.out.println("SIZE: " + queue.size());
        queue.notify();
    }
}

@Override
public void run() {
    int counter = 0;
    synchronized (queue) {
    while(true) {
        try {
            System.out.println("SLEEP");
                queue.wait(10);
        } catch (InterruptedException e) {
                Thread.interrupted();
        }
        System.out.println(counter);
        if (!queue.isEmpty()) {             
            queue.poll();
            counter++;
        }
    }
    }
}

}
Run Code Online (Sandbox Code Playgroud)

当代码运行时,我有时会添加20个元素并检索20个元素,但在其他情况下,检索到的元素少于20.任何想法如何解决?

cha*_*sis 10

我建议您使用BlockingQueue而不是Queue.LinkedBlockingDeque可能是一个很好的候选人.

您的代码如下所示:

void send(String message) {
    synchronized (queue) {
        queue.put(message);
        System.out.println("SIZE: " + queue.size());
    }
}
Run Code Online (Sandbox Code Playgroud)

然后你需要

queue.take()
Run Code Online (Sandbox Code Playgroud)

在您的消费者线程上

这个想法是.take()将阻塞,直到一个项目在队列中可用,然后返回一个项目(这是我认为你的实现受到影响的地方:轮询时缺少通知)..put()负责为您执行所有通知.无需等待/通知.

  • 为什么要围绕“queue.put”进行同步? (2认同)