与scala并行创建地图的最佳方法是什么?

yur*_*ura 3 parallel-processing scala scala-collections

假设我有一个应该转换为Map的集合,但不是像map方法那样一对一的方式.

var map = collection.mutable.HashMap()
for (p <- dataList.par) {
  if(cond1(p)) {
    map += (p, true)
  } else {
    // do nothing
  }
}
Run Code Online (Sandbox Code Playgroud)

我想出了几个解决方案,想知道什么是最好的.

  1. map.synchronize { map += (p, true) }

  2. 使用actor更新地图.但我不知道如何等待所有演员任务完成

  3. yield Some(p) or None然后跑foreach { case Some(p) => map += (p, true)}.但是,如果第一个迭代器来自并行集合,我不知道如何使它成为顺序.

Did*_*ont 5

不确定实际上会表现最好,但这应该使条件的评估平行:

import scala.collection._
val map: mutable.Map[Int, Boolean] 
  = dataList.par.collect{case p if cond1(p) => (p, true)}(breakOut)
Run Code Online (Sandbox Code Playgroud)

(使用可变映射,因为它是您的代码所做的,但这不是必需的).

上下文必须给出预期结果的类型(因此: mutable.Map[Int, Boolean])breakOut才能工作.

编辑: breakOutscala.collection.breakOut.返回集合(此处collect)的集合操作采用隐式参数bf: CanBuildFrom[SourceCollectionType, ElementType, ResultType].CanBuildFroms安排了库提供的隐式,以便返回最佳的ResultType,最好是最接近源集合类型.传递breakOut代替此隐式参数,以便CanBuildFrom可以选择另一个结果类型.无论源集合类型如何,breakOut选择的是什么CanBuildFrom.但是,有许多可用的含义,没有优先权规则.这就是为什么结果类型必须由上下文给出,以便可以选择其中一个含义.

总而言之,当breakOut代替隐式参数传递时,结果将构建为上下文中预期的类型.


axe*_*l22 5

breakOut在对方的回答中提到解析到一个建设者工厂期望类型的集合map.预期的类型mapmutable.Map[Int, Boolean].

由于构建器工厂由顺序集合提供,因此collect不会并行执行:

scala> val cond1: Int => Boolean = _ % 2 == 0
cond1: Int => Boolean = <function1>

scala> val dataList = 1 to 10
dataList: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val map: mutable.Map[Int,Boolean] = dataList.par.collect{case p if cond1(p) => println(Thread.currentThread); (p, true)}(breakOut)
Thread[Thread-8,5,main]
Thread[Thread-8,5,main]
Thread[Thread-8,5,main]
Thread[Thread-8,5,main]
Thread[Thread-8,5,main]
map: scala.collection.mutable.Map[Int,Boolean] = Map(10 -> true, 8 -> true, 4 -> true, 6 -> true, 2 -> true)
Run Code Online (Sandbox Code Playgroud)

你可以从线程名称中看到 - 线程应该包含一个名字ForkJoin-something.

正确的方式

正确的方法应该是首先使用breakOut期望的类型是并行映射,以便collect并行收益:

scala> val map: parallel.mutable.ParMap[Int,Boolean] = dataList.par.collect{case p if cond1(p) => println(Thread.currentThread);(p, true)}(breakOut)
Thread[Thread-9,5,main]
Thread[Thread-9,5,main]
Thread[Thread-9,5,main]
Thread[Thread-9,5,main]
Thread[Thread-9,5,main]
map: scala.collection.parallel.mutable.ParMap[Int,Boolean] = ParHashMap(10 -> true, 8 -> true, 4 -> true, 6 -> true, 2 -> true)
Run Code Online (Sandbox Code Playgroud)

然后调用seq结果collect,因为seq总是如此O(1).

更新:刚刚检查 - 这似乎与trunk有效,但与2.9.1.final无关.

补丁

但是,正如您所看到的,这不起作用,因为它是一个错误,并将在下一版本的Scala中修复.解决方法:

scala> val map: parallel.mutable.ParMap[Int, Boolean] = dataList.par.collect{case p if cond1(p) => println(Thread.currentThread);(p, true)}.map(x => x)(breakOut)
Thread[ForkJoinPool-1-worker-7,5,main]
Thread[ForkJoinPool-1-worker-3,5,main]
Thread[ForkJoinPool-1-worker-0,5,main]
Thread[ForkJoinPool-1-worker-8,5,main]
Thread[ForkJoinPool-1-worker-1,5,main]
map: scala.collection.parallel.mutable.ParMap[Int,Boolean] = ParHashMap(10 -> true, 8 -> true, 4 -> true, 6 -> true, 2 -> true)

scala> val sqmap = map.seq
sqmap: scala.collection.mutable.Map[Int,Boolean] = Map(10 -> true, 8 -> true, 4 -> true, 6 -> true, 2 -> true)
Run Code Online (Sandbox Code Playgroud)

注意到最终map将按顺序完成.

或者,如果只是parallel.ParMap你可以,你可以这样做:

scala> val map: Map[Int, Boolean] = dataList.par.collect{case p if cond1(p) => println(Thread.currentThread);(p, true)}.toMap.seq
Thread[ForkJoinPool-1-worker-2,5,main]
Thread[ForkJoinPool-1-worker-3,5,main]
Thread[ForkJoinPool-1-worker-7,5,main]
Thread[ForkJoinPool-1-worker-1,5,main]
Thread[ForkJoinPool-1-worker-8,5,main]
map: scala.collection.Map[Int,Boolean] = Map(10 -> true, 6 -> true, 2 -> true, 8 -> true, 4 -> true)
Run Code Online (Sandbox Code Playgroud)