scala中的并发map/foreach

Dav*_*haw 16 concurrency functional-programming scala

我有一个迭代vals: Iterable[T]和一个长期运行的功能,没有任何相关的副作用:f: (T => Unit).现在这应用于vals显而易见的方式:

vals.foreach(f)
Run Code Online (Sandbox Code Playgroud)

我希望f同时完成调用(在合理的限制内).Scala基础库中某处有明显的功能吗?就像是:

Concurrent.foreach(8 /* Number of threads. */)(vals, f)
Run Code Online (Sandbox Code Playgroud)

虽然f运行时间相当长,但它足够短,我不希望为每个调用调用一个线程的开销,所以我正在寻找基于线程池的东西.

Kei*_*ven 17

2009年的许多答案仍然使用旧的scala.actors.Futures._,它们不再是新的Scala.虽然Akka是首选方式,但更易读的方法是使用并行(.par)集合:

vals.foreach { v => f(v) }

vals.par.foreach { v => f(v) }

或者,使用parMap可能看起来更简洁,但需要记住要记住导入通常的Scalaz*.像往常一样,Scala中有不止一种方法可以做同样的事情!


Apo*_*isp 13

ScalazparMap.您可以按如下方式使用它:

import scalaz.Scalaz._
import scalaz.concurrent.Strategy.Naive
Run Code Online (Sandbox Code Playgroud)

这将为每个函子(包括Iterable)提供一个parMap方法,所以你可以这样做:

vals.parMap(f)
Run Code Online (Sandbox Code Playgroud)

您还可以得到parFlatMap,parZipWith等等.


Dan*_*wak 10

我喜欢这个Futures答案.但是,虽然它将同时执行,但它也将异步返回,这可能不是你想要的.正确的方法如下:

import scala.actors.Futures._

vals map { x => future { f(x) } } foreach { _() }
Run Code Online (Sandbox Code Playgroud)

  • 要小心`vals`是一个严格的集合 - 如果它是懒惰的(并且在Scala 2.7中它包括`Range`类),在foreach`需要每一个之前不会创建未来,并且不会发生任何事情在平行下. (8认同)