制片人/消费者多线程

Rob*_*ino 17 java multithreading

背景

缺少学校的钱,我正在收费的夜班工作,并使用互联网教自己一些编码技能,希望明天更好的工作或我做的一些应用程序的在线销售.漫长的夜晚,很少有客户.

我正在处理多线程作为一个主题,因为我遇到了很多使用它的文献中的代码(例如Android SDK),但我仍然觉得它很模糊.

精神

我在这一点上的方法是:尝试编写我能想到的最基本的多线程示例代码,将我的头撞到墙上一点点,然后看看我是否可以伸展我的大脑以适应一些新颖的思维方式.我正在暴露自己的极限,希望超越它们.随意批评,挑剔,并指出做我想做的事情的更好方法.

目的

  • Get some advice on how to do the above, based on my efforts so far (code provided)

练习

这是我定义的范围:

定义

创建两个类,它们在数据对象的生成和消耗方面协同工作.One Thread创建对象并将它们传递到共享空间以供另一个接收和使用.让我们调用生产线程Producer,消费线程Consumer和共享空间SharedSpace.产生对象以供另一方消费的行为可以通过类比这种情况来同化:

`Producer`    (a busy mum making chocolate-covered cakes for his child, up to a limit)
`Consumer`    (a hungry child waiting to eat all cakes the mum makes, until told to stop)
`SharedSpace` (a kitchen table on which the cakes are put as soon as they become ready)
`dataValue`   (a chocolate-dripping cake which MUST be eaten immediately or else...)
Run Code Online (Sandbox Code Playgroud)

为了简化运动,我决定以允许妈妈被烹饪的孩子吃了蛋糕.她将等待孩子完成他的蛋糕并立即制作另一个,达到一定的限度,以获得良好的养育.这个练习的本质是练习Thread信号,而不是实现任何并发.相反,我专注于完美的序列化,没有民意调查或"我可以去吗?" 检查.我想我将不得不编写后续练习,其中母亲和孩子接下来并行"工作".

途径

  • 让我的类实现Runnable接口,以便它们拥有自己的代码入口点

  • 使用我的类作为Thread对象的构造函数参数,这些对象是从程序的main入口点 实例化并启动的

  • 确保main程序不会在Thread之前通过Thread.join()终止

  • 设置限制为Producer将为其创建数据的次数Consumer

  • 同意将用于表示数据生成结束的标记Produce

  • 记录共享资源和数据生成/消耗事件上的锁的获取,包括最终签署工作线程

  • SharedSpace从程序中创建一个对象,main并在开始之前将其传递给每个工作者

  • privateSharedSpace内部为每个worker 存储对象的引用

  • Consumer在生成任何数据之前,提供防范和消息来描述准备好消费的状况

  • Producer在给定次数的迭代之后停止

  • Consumer在读取sentinel值后停止


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class Consumer extends Threaded {
  public Consumer(SharedSpace sharedSpace) {
    super(sharedSpace);
  }
  @Override
  public void run() {
    super.run();
    int consumedData = 0;
    while (consumedData != -1) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        consumedData = sharedSpace.dataValue;
        if (consumedData == 0) {
          try {
            logger.info("Data production has not started yet. "
                + "Releasing lock on sharedSpace, "
                + "until notification that it has begun.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        } else if (consumedData == -1) {
          logger.info("Consumed: END (end of data production token).");
        } else {
          logger.info("Consumed: {}.", consumedData);
          logger.info("Waking up producer to continue data production.");
          sharedSpace.notify();
          try {
            logger.info("Releasing lock on sharedSpace "
                + "until notified of new data availability.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        }
      }
    }
    logger.info("Signing off.");
  }
}
class Producer extends Threaded {
  private static final int N_ITERATIONS = 10;
  public Producer(SharedSpace sharedSpace) {
    super(sharedSpace);
  }
  @Override
  public void run() {
    super.run();
    int nIterations = 0;
    while (nIterations <= N_ITERATIONS) {
      synchronized (sharedSpace) {
        logger.info("Acquired lock on sharedSpace.");
        nIterations++;
        if (nIterations <= N_ITERATIONS) {
          sharedSpace.dataValue = nIterations;
          logger.info("Produced: {}", nIterations);
        } else {
          sharedSpace.dataValue = -1;
          logger.info("Produced: END (end of data production token).");
        }
        logger.info("Waking up consumer for data consumption.");
        sharedSpace.notify();
        if (nIterations <= N_ITERATIONS) {
          try {
            logger.info("Releasing lock on sharedSpace until notified.");
            sharedSpace.wait();
          } catch (InterruptedException interruptedException) {
            logger.error(interruptedException.getStackTrace().toString());
          }
        }
      }
    }
    logger.info("Signing off.");
  }
}
class SharedSpace {
  volatile int dataValue = 0;
}
abstract class Threaded implements Runnable {
  protected Logger logger;
  protected SharedSpace sharedSpace;
  public Threaded(SharedSpace sharedSpace) {
    this.sharedSpace = sharedSpace;
    logger = LoggerFactory.getLogger(this.getClass());
  }
  @Override
  public void run() {
    logger.info("Started.");
    String workerName = getClass().getName();
    Thread.currentThread().setName(workerName);
  }
}
public class ProducerConsumer {
  public static void main(String[] args) {
    SharedSpace sharedSpace = new SharedSpace();
    Thread producer = new Thread(new Producer(sharedSpace), "Producer");
    Thread consumer = new Thread(new Consumer(sharedSpace), "Consumer");
    producer.start();
    consumer.start();
    try {
      producer.join();
      consumer.join();
    } catch (InterruptedException interruptedException) {
      interruptedException.printStackTrace();
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

执行日志


Consumer - Started.
Consumer - Acquired lock on sharedSpace.
Consumer - Data production has not started yet. Releasing lock on sharedSpace, until notification that it has begun.
Producer - Started.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 1
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 1.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 2
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 2.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 3
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 3.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 4
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 4.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 5
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 5.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 6
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 6.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 7
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 7.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 8
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 8.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 9
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 9.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: 10
Producer - Waking up consumer for data consumption.
Producer - Releasing lock on sharedSpace until notified.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: 10.
Consumer - Waking up producer to continue data production.
Consumer - Releasing lock on sharedSpace until notified of new data availability.
Producer - Acquired lock on sharedSpace.
Producer - Produced: END (end of data production token).
Producer - Waking up consumer for data consumption.
Producer - Signing off.
Consumer - Acquired lock on sharedSpace.
Consumer - Consumed: END (end of data production token).
Consumer - Signing off.
Run Code Online (Sandbox Code Playgroud)

  • 以上是正确的吗?(例如,它使用正确的语言工具,正确的方法,它是否包含任何愚蠢的代码,...)

但它"看起来正确"?

即使输出"看起来很好",我也要问正确性,因为你无法想象在我的测试"一次"而不是"另一次"中出现错误的次数(例如当消费者首先开始时,生产者永远不会放弃在制作哨兵等之后).我学会了不要从"成功的运行"中宣称正确性.相反,我对伪并行代码非常怀疑!(根据定义,这个甚至不是平行的!0

扩展答案

一个很好的问题只关注one requested piece of advice上述问题(如上所述),但如果您愿意,请随时提及您对答案中以下其他主题的任何见解:

  • 我在下次尝试编码时如何测试并行代码?

  • 哪些工具可以帮助我进行开发和调试?考虑我使用Eclipse

  • 如果我允许Producer继续生产,那么这种方法是否会改变,每次生产需要一些不同的时间,同时Consumer消耗任何可用的东西?锁定是否必须移动到其他地方?信号需要从这种等待/通知范式改变吗?

  • 这种做事方法是否过时,我宁愿学习别的东西吗?从这个收费站,我不知道"在Java的现实世界中发生了什么"

下一步

  • 我应该从哪里出发?我已经看到了某个地方提到的"期货"的概念,但我可以使用一个有编号的主题列表按顺序进行,教育性地订购,并链接到相关的学习资源

蒂诺中国

Gra*_*ray 6

以上是正确的吗?

我看到的唯一问题是@Tudor和@Bhaskar提到的问题.无论何时在等待条件时测试条件,都必须使用while循环.然而,这更多是关于与多个生产者和消费者的竞争条件.可能会发生虚假的唤醒,但竞争条件更有可能发生.请参阅我的主题页面.

是的,您只有1个生产者和1个消费者,但您可以尝试为多个消费者扩展代码或将代码复制到另一个方案.

我学会了不要从"成功的运行"中宣称正确性.相反,我对伪并行代码非常怀疑!

本能很好.

我在下次尝试编码时如何测试并行代码?

这很难.扩展它是一种方式.添加多个生产者和消费者,看看是否有问题.在具有不同数量/类型的处理器的多个体系结构上运行.您最好的防御将是代码正确性.紧密同步,用好BlockingQueue,ExecutorService等课程,让您的亲密简单/清洁剂.

没有简单的答案.测试多线程代码非常困难.

哪些工具可以帮助我进行开发和调试?

就一般情况而言,我会研究像Emma这样的覆盖工具,这样您就可以确保您的单元测试覆盖了所有代码.

在多线程代码测试方面,了解如何读取kill -QUIT线程转储并查看Jconsole内部运行的线程.像YourKit这样的Java分析器也可能有所帮助.

如果我允许生产者继续生产,那么这种方法会改变吗?每次生产需要一些不同的时间......

我不这么认为.消费者将永远等待制片人.也许我不理解这个问题?

这种做事方法是否过时,我宁愿学习别的东西吗?从这个收费站,我不知道"在Java的现实世界中发生了什么"

接下来要了解这些ExecutorService课程.它们处理大部分new Thread()样式代码 - 特别是当您处理使用线程执行的许多异步任务时.这是一个教程.

我应该从哪里出发?

再次,ExecutorService.我假设你已经阅读了这个开始的文档.正如@Bhaskar所提到的,Java Concurrency in Practice是一本很好的圣经.


以下是有关您的代码的一般注释:

  • SharedSpaceThreaded班似乎是一个人为的方式来做到这一点.如果你正在玩基类之类的话就好了.但总的来说,我从不使用这样的模式.生产者和消费者通常使用BlockingQueue类似的工具,LinkedBlockingQueue在这种情况下,同步和volatile有效负载将由您负责.此外,我倾向于将共享信息注入到对象构造函数中,而不是从基类中获取它.

  • 通常,如果我使用synchronized它是在一个private final字段上.我经常创建一个private final Object lockObject = new Object();for锁,除非我已经使用了一个对象.

  • 小心巨大的synchronized块并将日志消息放在synchronized部分内.日志通常synchronized对文件系统执行IO,这可能非常昂贵.synchronized如果可能的话,你应该有一个小的,非常紧的块.

  • 您定义consumedData循环外部.我会在赋值时定义它,然后使用a break从循环中保释,如果是的话== -1.确保尽可能限制局部变量范围.

  • 您的日志消息将主导您的代码性能.这意味着当您删除它们时,您的代码将以完全不同的方式执行.当您使用它调试问题时,这非常重要.当您迁移到具有不同CPU /核心的不同体系结构时,性能(很可能)也会发生变化.

  • 你可能知道这一点,但是当你打电话sharedSpace.notify();时,这只意味着如果当前正在通知另一个线程sharedSpace.wait();.如果它不是别的,那么它将错过通知.仅供参考.

  • 做一个有点奇怪if (nIterations <= N_ITERATIONS),然后再下3行else.复制notify()将更好地简化分支.

  • int nIterations = 0;然后你有一个while内部的++.这是一个for循环的配方:

    for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
    
    Run Code Online (Sandbox Code Playgroud)

这是一个更紧凑的代码版本.这只是我如何写它的一个例子.同样,除了缺失之外while,您的版本似乎没有任何问题.

public class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;
    public Consumer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    }
    @Override
    public void run() {
       while (true) {
          int consumedData = queue.take();
          if (consumedData ==  Producer.FINAL_VALUE) {
              logger.info("Consumed: END (end of data production token).");
              break;
          }
          logger.info("Consumed: {}.", consumedData);
       }
       logger.info("Signing off.");
    }
}

public class Producer implements Runnable {
    public static final int FINAL_VALUE = -1;
    private final BlockingQueue<Integer> queue;
    public Producer(BlockingQueue<Integer> queue) {
       this.queue = queue;
    }
    @Override
    public void run() {
       for (int nIterations = 0; nIterations <= N_ITERATIONS; nIterations++) {
          logger.info("Produced: {}", nIterations);
          queue.put(nIterations);
       }
       queue.put(FINAL_VALUE);
       logger.info("Produced: END (end of data production token).");
       logger.info("Signing off.");
    }
}

public class ProducerConsumer {
    public static void main(String[] args) {
       // you can add an int argument to the LinkedBlockingQueue constructor
       // to only allow a certain number of items in the queue at one time
       BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
       Thread producer = new Thread(new Producer(queue), "Producer");
       Thread consumer = new Thread(new Consumer(queue), "Consumer");
       // start and join go here
    }
}
Run Code Online (Sandbox Code Playgroud)