Scala理解的未来:顺序与并行

pvl*_*bzn 3 parallel-processing concurrency scala future

在这里,我们有SeqPar一个包含task例程的对象,该例程是一个简单的模拟Future,它打印出一些调试信息并返回Future[Int]类型。

问题是:为什么experiment1允许experiment2总是并行运行而总是并行运行?

object SeqPar {
  def experiment1: Int = {
    val f1 = task(1)
    val f2 = task(2)
    val f3 = task(3)

    val computation = for {
      r1 <- f1
      r2 <- f2
      r3 <- f3
    } yield (r1 + r2 + r3)

    Await.result(computation, Duration.Inf)
  }

  def experiment2: Int = {
    val computation = for {
      r1 <- task(1)
      r2 <- task(2)
      r3 <- task(3)
    } yield (r1 + r2 + r3)

    Await.result(computation, Duration.Inf)
  }

  def task(i: Int): Future[Int] = {
    Future {
      println(s"task=$i thread=${Thread.currentThread().getId} time=${System.currentTimeMillis()}")
      i * i
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

当我运行experiment1它打印出来:

task=3 thread=24 time=1541326607613
task=1 thread=22 time=1541326607613
task=2 thread=21 time=1541326607613
Run Code Online (Sandbox Code Playgroud)

虽然experiment2

task=1 thread=21 time=1541326610653
task=2 thread=20 time=1541326610653
task=3 thread=21 time=1541326610654
Run Code Online (Sandbox Code Playgroud)

观察到差异的原因是什么?我确实知道这种for理解会消沉,f1.flatMap(r1 => f2.flatMap(r2 => f3.map(r3 => r1 + r2 + r3)))但是我仍然遗漏了一个观点,为什么一个人可以并行运行,而另一个人却不能并行运行。

Tox*_*ris 6

这是一个什么样的效果Future(…),并flatMap做到:

  • val future = Future(task) 开始并行运行任务
  • future.flatMap(result => task)安排运行taskfuture完成

请注意,future.flatMap(result => task)future完成之前无法并行开始运行任务,因为要运行task,我们需要result,仅在future完成时可用。

现在,让我们看看您的example1

def experiment1: Int = {
  // construct three independent tasks and start running them
  val f1 = task(1)
  val f2 = task(2)
  val f3 = task(3)

  // construct one complicated task that is ...
  val computation =
    // ... waiting for f1 and then ...
    f1.flatMap(r1 =>
      // ... waiting for f2 and then ...
      f2.flatMap(r2 =>
        // ... waiting for f3 and then ...
        f3.map(r3 =>
          // ... adding some numbers.
          r1 + r2 + r3)))

  // now actually trigger all the waiting
  Await.result(computation, Duration.Inf)
}
Run Code Online (Sandbox Code Playgroud)

因此example1,由于所有三个任务都花费相同的时间并同时开始,因此我们可能只需要在等待时阻塞即可f1。当我们到处等待时f2,其结果应该已经存在。

现在example2有什么不同?

def experiment2: Int = {
  // construct one complicated task that is ...
  val computation =
    // ... starting task1 and then waiting for it and then ...
    task(1).flatMap(r1 =>
      // ... starting task2 and then waiting for it and then ...
      task(2).flatMap(r2 =>
        // ... starting task3 and then waiting for it and then ...
        task(3).map(r3 =>
          // ... adding some numbers.
          r1 + r2 + r3)))

  // now actually trigger all the waiting and the starting of tasks
  Await.result(computation, Duration.Inf)
}
Run Code Online (Sandbox Code Playgroud)

在此示例中,我们甚至没有task(2)在等待task(1)完成之前进行构造,因此任务无法并行运行。

因此,在使用Scala进行编程时Future,您必须通过在喜欢的example1代码和喜欢的代码之间正确选择来控制并发性example2。或者,您可以研究可以对并发提供更明确控制的库。