如果我有这个:
val a = Array(...)
Run Code Online (Sandbox Code Playgroud)
我写
a.par.map(e => someFunc(e))
Run Code Online (Sandbox Code Playgroud)
生成的集合是否与非并行集合的顺序相同?
在集合映射中查找结果的习惯用法如下:
list.view.map(f).find(p)
Run Code Online (Sandbox Code Playgroud)
其中list是一个List[A],f是一个A => B,并且p是一个B => Boolean.
是否可以使用view并行集合?我问,因为我得到一些非常奇怪的结果:
Welcome to Scala version 2.9.1.final (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0).
Type in expressions to have them evaluated.
Type :help for more information.
scala> val f : Int => Int = i => {println(i); i + 10}
f: Int => Int = <function1>
scala> val list = (1 to 10).toList
list: List[Int] = List(1, 2, 3, 4, …Run Code Online (Sandbox Code Playgroud) 我一直在寻找新的Scala 2.9并行系列,我希望放弃一大堆类似东西的狡猾的业余版本.特别是,我想用自己的东西替换默认实现基础的fork join pool(例如,通过actor分配网络任务评估的东西).我的理解是,这只是应用Scala的"可堆叠修改"范式的问题,但是集合库非常令人生畏,我不确定哪些位需要修改!
一些具体问题:
ForkJoinTasks?FutureThreadPoolTasks.我如何构建一个使用这个特性的集合而不是ForkJoinTasks?AdaptiveWorkStealingTasks并以某种方式实例化使用这个新特征的集合实例?(作为参考,上面提到的所有特征都在Tasks.scala中定义.)
特别是代码示例非常受欢迎!
Scala是否提供了一种执行并行映射操作的方法作为标准语言的一部分?
例如,给定:
scala> val a = List((1,2), (3,4), (3,6))
a: List[(Int, Int)] = List((1,2), (3,4), (3,6))
Run Code Online (Sandbox Code Playgroud)
我可以:
scala> a.map(tup => tup._1 + tup._2)
res0: List[Int] = List(3, 7, 9)
Run Code Online (Sandbox Code Playgroud)
但是,据我所知,这将按顺序将提供的函数映射到列表对象上.是否有内置的方法将函数应用于单独的线程(或等效的)中的每个元素,然后将结果收集到结果列表中?
我使用Parallel.Foreach来填充外部ConcurrentBag.我也尝试使用一个常见的List,一切正常.
我很幸运或者我错过了ConcurrentBag的特殊范围?
假设我有一个有点大(几百万个项目,或者说)字符串列表.运行这样的东西是个好主意:
val updatedList = myList.par.map(someAction).toList
Run Code Online (Sandbox Code Playgroud)
或者在运行之前将列表分组是一个更好的主意...par.map(,如下所示:
val numberOfCores = Runtime.getRuntime.availableProcessors
val updatedList =
myList.grouped(numberOfCores).toList.par.map(_.map(someAction)).toList.flatten
Run Code Online (Sandbox Code Playgroud)
更新:由于someAction是相当昂贵的(比较grouped,toList等等)
像这样,Scala 2.12中的并行集合可以直接导入
import scala.collection.parallel.immutable.ParVector
val pv = new ParVector[Int]
Run Code Online (Sandbox Code Playgroud)
但是为什么在Scala 2.13软件包中scala.collection.parallel似乎丢失了?
给定了一个非常大的collection.parallel.mutable.ParHashMap(或任何其他并行集合)实例,如果找到一个给定的,比如50个匹配的数量,那么如何中止过滤并行扫描?
尝试在线程安全的"外部"数据结构中累积中间匹配或保持外部AtomicInteger的结果计数在4个内核上比使用常规collection.mutable.HashMap慢2到3倍,并将单个内核挂在100 %.
我知道Par*集合中的查找或存在确实在"内部"中止.有没有办法概括这个以找到多个结果?
下面是这似乎仍然是2至3倍慢上〜79,000项的ParHashMap的代码,也有馅的问题更比的maxResults(成果转化的结果CHM这可能是由于线程后被抢占incrementAndGet但在此之前突破这允许其他线程添加更多元素).更新:似乎减速是由于工作者线程在counter.incrementAndGet()上竞争,这当然违背了整个并行扫描的目的:-(
def find(filter: Node => Boolean, maxResults: Int): Iterable[Node] =
{
val counter = new AtomicInteger(0)
val results = new ConcurrentHashMap[Key, Node](maxResults)
import util.control.Breaks._
breakable
{
for ((key, node) <- parHashMap if filter(node))
{
results.put(key, node)
val total = counter.incrementAndGet()
if (total > maxResults) break
}
}
results.values.toArray(new Array[Node](results.size))
}
Run Code Online (Sandbox Code Playgroud) 我采取List[Int]和要搜索的值x,其中x * 10 > 500并行.因此,如果列表包含任何值51或更大,exists则应返回true.
def f(x: Int) = {
println("calculating for " + x)
Thread.sleep(100 - x)
println("finished " + x)
x * 10
}
val res = List.range(1, 100).par.exists(f(_) > 500)
Run Code Online (Sandbox Code Playgroud)
结果如下:
calculating for 1
calculating for 25
calculating for 50
calculating for 75
calculating for 13
finished 75 // <-- first valid result found: 75 * 10 > 500
finished 50
calculating for 51 // but it …Run Code Online (Sandbox Code Playgroud) 根据关于并行收集和在互联网上搜索的论文,并行集合应该与视图一起工作,但我不清楚它们之间的区别
coll.par.view.someChainedIterations
Run Code Online (Sandbox Code Playgroud)
和
coll.view.par.someChainedIterations
Run Code Online (Sandbox Code Playgroud)
似乎coll.view.par放弃了集合的视图:
scala> val coll = 1 to 3
coll: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3)
scala> coll.view.par
res2: scala.collection.parallel.ParSeq[Int] = ParArray(1, 2, 3)
scala> coll.par.view
res3: java.lang.Object with scala.collection.parallel.ParSeqView[Int,scala.collection.parallel.immutable.ParSeq[Int],scala.collection.immutable.Seq[Int]] = $anon$1(1, 2, 3)
Run Code Online (Sandbox Code Playgroud)
但我不知道为什么.它是一个功能还是一个bug?
parallel-processing scala scala-collections parallel-collections