使用队列的生产者/消费者线程

Gar*_*eth 56 java queue multithreading producer-consumer

我想创建一些Producer/Consumer线程应用程序.但我不确定在两者之间实现队列的最佳方法是什么.

所以我有两个想法(两者都可能是完全错误的).我想知道哪个更好,如果它们都吮吸那么什么是实现队列的最佳方式.这主要是我在这些例子中实现的队列,我很关心.我正在扩展一个内部类的Queue类,并且是线程安全的.以下是两个示例,每个示例包含4个类.

主类 -

public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 
Run Code Online (Sandbox Code Playgroud)

消费者类 -

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

制片人类 -

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

队列类 -

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}
Run Code Online (Sandbox Code Playgroud)

要么

主类 -

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
} 
Run Code Online (Sandbox Code Playgroud)

消费者类 -

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

制片人类 -

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

队列类 -

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}
Run Code Online (Sandbox Code Playgroud)

去!

cle*_*tus 76

Java 5+拥有此类所需的所有工具.你会想:

  1. 将所有制作人放在一起ExecutorService;
  2. 将所有消费者放在另一个消费者中ExecutorService;
  3. 如有必要,使用a进行两者之间的通信BlockingQueue.

我说"如果有必要"(3),因为根据我的经验,这是一个不必要的步骤.您所做的就是向消费者执行者服务提交新任务.所以:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
Run Code Online (Sandbox Code Playgroud)

所以producers直接提交到consumers.

  • Cletus是有钱获取更多信息,以帮助澄清"从哪里开始"http://java.sun.com/docs/books/tutorial/essential/concurrency/ (2认同)
  • 值得注意的是,如果“消费者”进程可能以需要重新处理数据的方式失败,则值得引入一个BlockingQueue,以便处于错误状态的消费者可以将数据扔回队列中供其他消费者使用重新处理。 (2认同)

Enn*_*oji 17

好吧,正如其他人所说,最好的办法是使用java.util.concurrent包.我强烈推荐"Java Concurrency in Practice".这本书很棒,几乎涵盖了你需要知道的一切.

至于你的特定实现,正如我在评论中提到的,不要从构造函数启动线程 - 它可能是不安全的.

抛开这一点,第二个实现似乎更好.您不希望将队列放在静态字段中.你可能只是失去了灵活性.

如果你想继续你自己的实现(为了学习目的,我猜?),start()至少提供一个方法.您应该构造对象(您可以实例化Thread对象),然后调用start()以启动该线程.

编辑:ExecutorService有自己的队列,所以这可能会令人困惑..这是让你开始的东西.

public class Main {
    public static void main(String[] args) {
        //The numbers are just silly tune parameters. Refer to the API.
        //The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));

        //No need to bound the queue for this executor.
        //Use utility method instead of the complicated Constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);   
    }
}

class Produce implements Runnable {
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }
}

class Consume implements Runnable {
    private final Pancake cake;

    public Consume(Pancake cake){
        this.cake = cake;
    }

    @Override
    public void run() {
        cake.eat();
    }
}
Run Code Online (Sandbox Code Playgroud)

进一步编辑:对于制作人而言,while(true)您可以做以下事情:

@Override
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        //do stuff
    }
}
Run Code Online (Sandbox Code Playgroud)

这样您就可以通过调用来关闭执行程序.shutdownNow().如果您使用while(true),它将不会关闭.

还要注意,Producer仍然容易受到攻击RuntimeExceptions(即一个人RuntimeException会停止处理)


fly*_*ire 8

你正在重新发明轮子.

如果你需要持久性和其他企业功能使用JMS(我建议使用ActiveMq).

如果您需要快速的内存中队列,请使用java的Queue的一种强制性.

如果您需要支持java 1.4或更早版本,请使用Doug Lea优秀的发包.

  • 在面试时你仍然可以被要求实施Producer Consumer :) (5认同)

Rav*_*abu 8

我已经扩展了cletus提出的工作代码示例的答案.

  1. 一个ExecutorService(pes)接受Producer任务.
  2. 一个ExecutorService(ces)接受Consumer任务.
  3. 无论ProducerConsumer股票BlockingQueue.
  4. 多个Producer任务生成不同的数字.
  5. 任何Consumer任务都可以消耗由此生成的数字Producer

码:

import java.util.concurrent.*;

public class ProducerConsumerWithES {
    public static void main(String args[]){
         BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

         ExecutorService pes = Executors.newFixedThreadPool(2);
         ExecutorService ces = Executors.newFixedThreadPool(2);

         pes.submit(new Producer(sharedQueue,1));
         pes.submit(new Producer(sharedQueue,2));
         ces.submit(new Consumer(sharedQueue,1));
         ces.submit(new Consumer(sharedQueue,2));
         // shutdown should happen somewhere along with awaitTermination
         / * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
         pes.shutdown();
         ces.shutdown();
    }
}
class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}
Run Code Online (Sandbox Code Playgroud)

输出:

Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Consumed: 11:by thread:1
Produced:12:by thread:1
Consumed: 22:by thread:1
Consumed: 21:by thread:2
Produced:23:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Consumed: 23:by thread:2
Produced:24:by thread:2
Consumed: 13:by thread:1
Produced:14:by thread:1
Consumed: 24:by thread:2
Produced:25:by thread:2
Consumed: 14:by thread:1
Produced:15:by thread:1
Consumed: 25:by thread:2
Consumed: 15:by thread:1
Run Code Online (Sandbox Code Playgroud)

注意.如果您不需要多个生产者和消费者,请保留单个生产者和消费者.我添加了多个生产者和消费者,以在多个生产者和消费者之间展示BlockingQueue的功能.