yur*_*ura 3 parallel-processing scala scala-collections
假设我有一个应该转换为Map的集合,但不是像map方法那样一对一的方式.
var map = collection.mutable.HashMap()
for (p <- dataList.par) {
if(cond1(p)) {
map += (p, true)
} else {
// do nothing
}
}
Run Code Online (Sandbox Code Playgroud)
我想出了几个解决方案,想知道什么是最好的.
map.synchronize { map += (p, true) }
使用actor更新地图.但我不知道如何等待所有演员任务完成
yield Some(p) or None然后跑foreach { case Some(p) => map += (p, true)}.但是,如果第一个迭代器来自并行集合,我不知道如何使它成为顺序.不确定实际上会表现最好,但这应该使条件的评估平行:
import scala.collection._
val map: mutable.Map[Int, Boolean]
= dataList.par.collect{case p if cond1(p) => (p, true)}(breakOut)
Run Code Online (Sandbox Code Playgroud)
(使用可变映射,因为它是您的代码所做的,但这不是必需的).
上下文必须给出预期结果的类型(因此: mutable.Map[Int, Boolean])breakOut才能工作.
编辑: breakOut是scala.collection.breakOut.返回集合(此处collect)的集合操作采用隐式参数bf: CanBuildFrom[SourceCollectionType, ElementType, ResultType].CanBuildFroms安排了库提供的隐式,以便返回最佳的ResultType,最好是最接近源集合类型.传递breakOut代替此隐式参数,以便CanBuildFrom可以选择另一个结果类型.无论源集合类型如何,breakOut选择的是什么CanBuildFrom.但是,有许多可用的含义,没有优先权规则.这就是为什么结果类型必须由上下文给出,以便可以选择其中一个含义.
总而言之,当breakOut代替隐式参数传递时,结果将构建为上下文中预期的类型.
将breakOut在对方的回答中提到解析到一个建设者工厂期望类型的集合map.预期的类型map是mutable.Map[Int, Boolean].
由于构建器工厂由顺序集合提供,因此collect不会并行执行:
scala> val cond1: Int => Boolean = _ % 2 == 0
cond1: Int => Boolean = <function1>
scala> val dataList = 1 to 10
dataList: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val map: mutable.Map[Int,Boolean] = dataList.par.collect{case p if cond1(p) => println(Thread.currentThread); (p, true)}(breakOut)
Thread[Thread-8,5,main]
Thread[Thread-8,5,main]
Thread[Thread-8,5,main]
Thread[Thread-8,5,main]
Thread[Thread-8,5,main]
map: scala.collection.mutable.Map[Int,Boolean] = Map(10 -> true, 8 -> true, 4 -> true, 6 -> true, 2 -> true)
Run Code Online (Sandbox Code Playgroud)
你可以从线程名称中看到 - 线程应该包含一个名字ForkJoin-something.
正确的方法应该是首先使用breakOut期望的类型是并行映射,以便collect并行收益:
scala> val map: parallel.mutable.ParMap[Int,Boolean] = dataList.par.collect{case p if cond1(p) => println(Thread.currentThread);(p, true)}(breakOut)
Thread[Thread-9,5,main]
Thread[Thread-9,5,main]
Thread[Thread-9,5,main]
Thread[Thread-9,5,main]
Thread[Thread-9,5,main]
map: scala.collection.parallel.mutable.ParMap[Int,Boolean] = ParHashMap(10 -> true, 8 -> true, 4 -> true, 6 -> true, 2 -> true)
Run Code Online (Sandbox Code Playgroud)
然后调用seq结果collect,因为seq总是如此O(1).
更新:刚刚检查 - 这似乎与trunk有效,但与2.9.1.final无关.
但是,正如您所看到的,这不起作用,因为它是一个错误,并将在下一版本的Scala中修复.解决方法:
scala> val map: parallel.mutable.ParMap[Int, Boolean] = dataList.par.collect{case p if cond1(p) => println(Thread.currentThread);(p, true)}.map(x => x)(breakOut)
Thread[ForkJoinPool-1-worker-7,5,main]
Thread[ForkJoinPool-1-worker-3,5,main]
Thread[ForkJoinPool-1-worker-0,5,main]
Thread[ForkJoinPool-1-worker-8,5,main]
Thread[ForkJoinPool-1-worker-1,5,main]
map: scala.collection.parallel.mutable.ParMap[Int,Boolean] = ParHashMap(10 -> true, 8 -> true, 4 -> true, 6 -> true, 2 -> true)
scala> val sqmap = map.seq
sqmap: scala.collection.mutable.Map[Int,Boolean] = Map(10 -> true, 8 -> true, 4 -> true, 6 -> true, 2 -> true)
Run Code Online (Sandbox Code Playgroud)
注意到最终map将按顺序完成.
或者,如果只是parallel.ParMap你可以,你可以这样做:
scala> val map: Map[Int, Boolean] = dataList.par.collect{case p if cond1(p) => println(Thread.currentThread);(p, true)}.toMap.seq
Thread[ForkJoinPool-1-worker-2,5,main]
Thread[ForkJoinPool-1-worker-3,5,main]
Thread[ForkJoinPool-1-worker-7,5,main]
Thread[ForkJoinPool-1-worker-1,5,main]
Thread[ForkJoinPool-1-worker-8,5,main]
map: scala.collection.Map[Int,Boolean] = Map(10 -> true, 6 -> true, 2 -> true, 8 -> true, 4 -> true)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1250 次 |
| 最近记录: |