Gar*_*eth 56 java queue multithreading producer-consumer
我想创建一些Producer/Consumer线程应用程序.但我不确定在两者之间实现队列的最佳方法是什么.
所以我有两个想法(两者都可能是完全错误的).我想知道哪个更好,如果它们都吮吸那么什么是实现队列的最佳方式.这主要是我在这些例子中实现的队列,我很关心.我正在扩展一个内部类的Queue类,并且是线程安全的.以下是两个示例,每个示例包含4个类.
主类 -
public class SomeApp
{
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
consumer = new Consumer();
producer = new Producer();
}
}
Run Code Online (Sandbox Code Playgroud)
消费者类 -
public class Consumer implements Runnable
{
public Consumer()
{
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = QueueHandler.dequeue();
//do some stuff with the object
}
}
}
Run Code Online (Sandbox Code Playgroud)
制片人类 -
public class Producer implements Runnable
{
public Producer()
{
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
QueueHandler.enqueue(new Object());
}
}
}
Run Code Online (Sandbox Code Playgroud)
队列类 -
public class QueueHandler
{
//This Queue class is a thread safe (written in house) class
public static Queue<Object> readQ = new Queue<Object>(100);
public static void enqueue(Object object)
{
//do some stuff
readQ.add(object);
}
public static Object dequeue()
{
//do some stuff
return readQ.get();
}
}
Run Code Online (Sandbox Code Playgroud)
要么
主类 -
public class SomeApp
{
Queue<Object> readQ;
private Consumer consumer;
private Producer producer;
public static void main (String args[])
{
readQ = new Queue<Object>(100);
consumer = new Consumer(readQ);
producer = new Producer(readQ);
}
}
Run Code Online (Sandbox Code Playgroud)
消费者类 -
public class Consumer implements Runnable
{
Queue<Object> queue;
public Consumer(Queue<Object> readQ)
{
queue = readQ;
Thread consumer = new Thread(this);
consumer.start();
}
public void run()
{
while(true)
{
//get an object off the queue
Object object = queue.dequeue();
//do some stuff with the object
}
}
}
Run Code Online (Sandbox Code Playgroud)
制片人类 -
public class Producer implements Runnable
{
Queue<Object> queue;
public Producer(Queue<Object> readQ)
{
queue = readQ;
Thread producer = new Thread(this);
producer.start();
}
public void run()
{
while(true)
{
//add to the queue some sort of unique object
queue.enqueue(new Object());
}
}
}
Run Code Online (Sandbox Code Playgroud)
队列类 -
//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{
public QueueHandler(int size)
{
super(size); //All I'm thinking about now is McDonalds.
}
public void enqueue(Object object)
{
//do some stuff
readQ.add();
}
public Object dequeue()
{
//do some stuff
return readQ.get();
}
}
Run Code Online (Sandbox Code Playgroud)
去!
cle*_*tus 76
Java 5+拥有此类所需的所有工具.你会想:
ExecutorService;ExecutorService;BlockingQueue.我说"如果有必要"(3),因为根据我的经验,这是一个不必要的步骤.您所做的就是向消费者执行者服务提交新任务.所以:
final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
Run Code Online (Sandbox Code Playgroud)
所以producers直接提交到consumers.
Enn*_*oji 17
好吧,正如其他人所说,最好的办法是使用java.util.concurrent包.我强烈推荐"Java Concurrency in Practice".这本书很棒,几乎涵盖了你需要知道的一切.
至于你的特定实现,正如我在评论中提到的,不要从构造函数启动线程 - 它可能是不安全的.
抛开这一点,第二个实现似乎更好.您不希望将队列放在静态字段中.你可能只是失去了灵活性.
如果你想继续你自己的实现(为了学习目的,我猜?),start()至少提供一个方法.您应该构造对象(您可以实例化Thread对象),然后调用start()以启动该线程.
编辑:ExecutorService有自己的队列,所以这可能会令人困惑..这是让你开始的东西.
public class Main {
public static void main(String[] args) {
//The numbers are just silly tune parameters. Refer to the API.
//The important thing is, we are passing a bounded queue.
ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));
//No need to bound the queue for this executor.
//Use utility method instead of the complicated Constructor.
ExecutorService producer = Executors.newSingleThreadExecutor();
Runnable produce = new Produce(consumer);
producer.submit(produce);
}
}
class Produce implements Runnable {
private final ExecutorService consumer;
public Produce(ExecutorService consumer) {
this.consumer = consumer;
}
@Override
public void run() {
Pancake cake = Pan.cook();
Runnable consume = new Consume(cake);
consumer.submit(consume);
}
}
class Consume implements Runnable {
private final Pancake cake;
public Consume(Pancake cake){
this.cake = cake;
}
@Override
public void run() {
cake.eat();
}
}
Run Code Online (Sandbox Code Playgroud)
进一步编辑:对于制作人而言,while(true)您可以做以下事情:
@Override
public void run(){
while(!Thread.currentThread().isInterrupted()){
//do stuff
}
}
Run Code Online (Sandbox Code Playgroud)
这样您就可以通过调用来关闭执行程序.shutdownNow().如果您使用while(true),它将不会关闭.
还要注意,Producer仍然容易受到攻击RuntimeExceptions(即一个人RuntimeException会停止处理)
我已经扩展了cletus提出的工作代码示例的答案.
ExecutorService(pes)接受Producer任务.ExecutorService(ces)接受Consumer任务.Producer和Consumer股票BlockingQueue.Producer任务生成不同的数字.Consumer任务都可以消耗由此生成的数字Producer码:
import java.util.concurrent.*;
public class ProducerConsumerWithES {
public static void main(String args[]){
BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
ExecutorService pes = Executors.newFixedThreadPool(2);
ExecutorService ces = Executors.newFixedThreadPool(2);
pes.submit(new Producer(sharedQueue,1));
pes.submit(new Producer(sharedQueue,2));
ces.submit(new Consumer(sharedQueue,1));
ces.submit(new Consumer(sharedQueue,2));
// shutdown should happen somewhere along with awaitTermination
/ * https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */
pes.shutdown();
ces.shutdown();
}
}
class Producer implements Runnable {
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
this.threadNo = threadNo;
this.sharedQueue = sharedQueue;
}
@Override
public void run() {
for(int i=1; i<= 5; i++){
try {
int number = i+(10*threadNo);
System.out.println("Produced:" + number + ":by thread:"+ threadNo);
sharedQueue.put(number);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
class Consumer implements Runnable{
private final BlockingQueue<Integer> sharedQueue;
private int threadNo;
public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
this.sharedQueue = sharedQueue;
this.threadNo = threadNo;
}
@Override
public void run() {
while(true){
try {
int num = sharedQueue.take();
System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
} catch (Exception err) {
err.printStackTrace();
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
输出:
Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Consumed: 11:by thread:1
Produced:12:by thread:1
Consumed: 22:by thread:1
Consumed: 21:by thread:2
Produced:23:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Consumed: 23:by thread:2
Produced:24:by thread:2
Consumed: 13:by thread:1
Produced:14:by thread:1
Consumed: 24:by thread:2
Produced:25:by thread:2
Consumed: 14:by thread:1
Produced:15:by thread:1
Consumed: 25:by thread:2
Consumed: 15:by thread:1
Run Code Online (Sandbox Code Playgroud)
注意.如果您不需要多个生产者和消费者,请保留单个生产者和消费者.我添加了多个生产者和消费者,以在多个生产者和消费者之间展示BlockingQueue的功能.