Eri*_*lun 14 parallel-processing reduce scala future fold
考虑以下:
import scala.concurrent._
import scala.concurrent.duration.Duration.Inf
import scala.concurrent.ExecutionContext.Implicits.global
def slowInt(i: Int) = { Thread.sleep(200); i }
def slowAdd(x: Int, y: Int) = { Thread.sleep(100); x + y }
def futures = (1 to 20).map(i => future(slowInt(i)))
def timeFuture(fn: => Future[_]) = {
val t0 = System.currentTimeMillis
Await.result(fn, Inf)
println((System.currentTimeMillis - t0) / 1000.0 + "s")
}
Run Code Online (Sandbox Code Playgroud)
以下两个print~2.5s:
// Use Future.reduce directly (Future.traverse is no different)
timeFuture { Future.reduce(futures)(slowAdd) }
// First wait for all results to come in, convert to Future[List], and then map the List[Int]
timeFuture { Future.sequence(futures).map(_.reduce(slowAdd)) }
Run Code Online (Sandbox Code Playgroud)
据我所知,其原因是它Future.reduce/traverse
是通用的,因此使用关联运算符不会运行得更快,但是,有一种简单的方法来定义一个计算,其中折叠/减少将在至少2时开始值是可用的(或者在1的情况下fold
),因此当列表中的某些项目仍在生成时,已经生成的值已经在计算中了?
Scalaz有一个 future 的实现,其中包括一个chooseAny
组合器,它接受 future 的集合并返回第一个完整元素和其余 future 的元组的 future:
def chooseAny[A](h: Future[A], t: Seq[Future[A]]): Future[(A, Seq[Future[A]])]
Run Code Online (Sandbox Code Playgroud)
Twitter 的 futures 实现称之为select
. 标准库不包含它(但请参阅上面 Som Snytt 指出的Viktor Klang 的实现)。我将在这里使用 Scalaz 的版本,但翻译应该很简单。
让操作按您希望的方式运行的一种方法是将两个已完成的项目从列表中拉出,将它们的总和的未来推回列表中,然后递归(请参阅此要点以获取完整的工作示例):
def collapse[A](fs: Seq[Future[A]])(implicit M: Monoid[A]): Future[A] =
Nondeterminism[Future].chooseAny(fs).fold(Future.now(M.zero))(
_.flatMap {
case (hv, tf) =>
Nondeterminism[Future].chooseAny(tf).fold(Future.now(hv))(
_.flatMap {
case (hv2, tf2) => collapse(Future(hv |+| hv2) +: tf2)
}
)
}
)
Run Code Online (Sandbox Code Playgroud)
在你的情况下,你会这样称呼:
timeFuture(
collapse(futures)(
Monoid.instance[Int]((a, b) => slowAdd(a, b), 0)
)
)
Run Code Online (Sandbox Code Playgroud)
在我的双核笔记本电脑上,它的运行时间只需 1.6 秒多一点,因此它可以按预期工作(即使所花费的时间有所slowInt
不同,也将继续执行您想要的操作)。