嵌套 Future.sequence 按顺序执行包含的 Futures

nir*_*fri 6 concurrency scala concurrent.futures

我有一个 future( doFour) 被执行并将结果传递给一个平面图。在平面图中,我再执行两个未来(doOnedoTwo)函数,期望它们并行运行,但我看到它们按顺序运行(2.13)。斯卡斯蒂

为什么并行doOnedoTwo不是并行执行?

我怎样才能让它们并行运行?

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object Test {
  def doOne(): Future[Unit] = Future {
    println("startFirst");      Thread.sleep(3000);     println("stopFirst")
  }

  def doTwo(): Future[Unit] = Future {
    println("startSecond");      Thread.sleep(1000);      println("stopSecond")
  }


  def doFour(): Future[Unit] = Future {
    println("do 4");     Thread.sleep(1000);     println("done 4")

  }


  def main(args: Array[String]) {


    val resOut = doFour().flatMap { a =>

      val futureOperations = Seq(doOne(), doTwo())

      val res = Future.sequence(futureOperations)
      res
    }

    val stream = Await.result(resOut, Duration.Inf)
  }
}
Run Code Online (Sandbox Code Playgroud)

Tim*_*Tim 7

A 一Future被创建就可以执行。所以这一行创建了两个Futures可能被执行的:

val futureOperations = Seq(doOne(), doTwo())
Run Code Online (Sandbox Code Playgroud)

调用Future.sequence将创建一个新的Future等待每个期货依次完成,但它们都已经可以在代码中的这一点执行。

val res = Future.sequence(futureOperations)
Run Code Online (Sandbox Code Playgroud)

如果您希望Futures按顺序启动,则需要使用map/flatMap

val res = doOne().map( _ => doTwo())
Run Code Online (Sandbox Code Playgroud)

在完成doTwo之前不会调用此代码doOne(如果doOne失败则根本不会调用)

这在您的示例中似乎没有发生的原因是您正在调用阻塞操作,Future该操作阻塞了一个线程,否则该线程将用于执行其他Futures。所以虽然有两个Futures 可供执行,但实际上一次只有一个被执行。

如果您将代码标记为blocking正常工作:

import scala.concurrent.blocking

def doOne(): Future[Unit] = Future {
  blocking{println("startFirst");      Thread.sleep(3000);     println("stop First")}
}

def doTwo(): Future[Unit] = Future {
  blocking{println("startSecond");      Thread.sleep(1000);      println("stop Second")}
}
Run Code Online (Sandbox Code Playgroud)

有关为什么不同版本的默认行为不同的详细信息,以及为什么您永远不应该对独立Futures的相对执行顺序进行假设,请参阅评论部分。

  • 有一个关于默认“BatchingExecutor”更改的代码注释(不会进入 API);它解释了阻塞行为以及奇怪的顺序:https://github.com/scala/scala/blob/v2.13.2/src/library/scala/concurrent/BatchingExecutor.scala#L50 PR 讨论 https:// github.com/scala/scala/pull/7470 (3认同)
  • @Tim“Future 一旦创建就开始执行”——这实际上是不正确的。我认为你的意思是,“Future 一旦被创建就可以执行。” 这很重要,因为这种差异正是问题所在!`doTwo` 适合立即运行,但(在 2.13 上)直到一段时间后才运行。 (2认同)