标签: parallel-collections

Scala的并行系列会保证订购吗?

如果我有这个:

val a = Array(...)
Run Code Online (Sandbox Code Playgroud)

我写

a.par.map(e => someFunc(e))
Run Code Online (Sandbox Code Playgroud)

生成的集合是否与非并行集合的顺序相同?

parallel-processing scala parallel-collections

21
推荐指数
2
解决办法
1571
查看次数

可以将视图与并行集合一起使用吗?

在集合映射中查找结果的习惯用法如下:

list.view.map(f).find(p)
Run Code Online (Sandbox Code Playgroud)

其中list是一个List[A],f是一个A => B,并且p是一个B => Boolean.

是否可以使用view并行集合?我问,因为我得到一些非常奇怪的结果:

Welcome to Scala version 2.9.1.final (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0).
Type in expressions to have them evaluated.
Type :help for more information.

scala> val f : Int => Int = i => {println(i); i + 10}
f: Int => Int = <function1>

scala> val list = (1 to 10).toList
list: List[Int] = List(1, 2, 3, 4, …
Run Code Online (Sandbox Code Playgroud)

scala view parallel-collections

20
推荐指数
1
解决办法
551
查看次数

如何为Scala 2.9并行集合替换fork连接池?

我一直在寻找新的Scala 2.9并行系列,我希望放弃一大堆类似东西的狡猾的业余版本.特别是,我想用自己的东西替换默认实现基础的fork join pool(例如,通过actor分配网络任务评估的东西).我的理解是,这只是应用Scala的"可堆叠修改"范式的问题,但是集合库非常令人生畏,我不确定哪些位需要修改!

一些具体问题:

  1. 标准并行实现通过代码中的fork连接池进行交互是否正确ForkJoinTasks
  2. 我看到有另一种特质,FutureThreadPoolTasks.我如何构建一个使用这个特性的集合而不是ForkJoinTasks
  3. 我是否可以编写另一个替代方法(也许是一个混合的相应样板类,AdaptiveWorkStealingTasks并以某种方式实例化使用这个新特征的集合实例?

(作为参考,上面提到的所有特征都在Tasks.scala中定义.)

特别是代码示例非常受欢迎!

parallel-processing scala scala-2.9 parallel-collections

19
推荐指数
2
解决办法
1637
查看次数

并行映射操作?

Scala是否提供了一种执行并行映射操作的方法作为标准语言的一部分?

例如,给定:

scala> val a = List((1,2), (3,4), (3,6))
a: List[(Int, Int)] = List((1,2), (3,4), (3,6))
Run Code Online (Sandbox Code Playgroud)

我可以:

scala> a.map(tup => tup._1 + tup._2)
res0: List[Int] = List(3, 7, 9)
Run Code Online (Sandbox Code Playgroud)

但是,据我所知,这将按顺序将提供的函数映射到列表对象上.是否有内置的方法将函数应用于单独的线程(或等效的)中的每个元素,然后将结果收集到结果列表中?

concurrency scala parallel-collections

18
推荐指数
2
解决办法
1万
查看次数

ConcurrentBag比List好吗?

我使用Parallel.Foreach来填充外部ConcurrentBag.我也尝试使用一个常见的List,一切正常.

我很幸运或者我错过了ConcurrentBag的特殊范围?

.net parallel-processing parallel-collections

12
推荐指数
2
解决办法
6960
查看次数

运行`... par.map(直接在大型列表上?)是一个好主意吗?

假设我有一个有点大(几百万个项目,或者说)字符串列表.运行这样的东西是个好主意:

val updatedList = myList.par.map(someAction).toList
Run Code Online (Sandbox Code Playgroud)

或者在运行之前将列表分组是一个更好的主意...par.map(,如下所示:

val numberOfCores = Runtime.getRuntime.availableProcessors
val updatedList = 
  myList.grouped(numberOfCores).toList.par.map(_.map(someAction)).toList.flatten
Run Code Online (Sandbox Code Playgroud)

更新:由于someAction是相当昂贵的(比较grouped,toList等等)

scala parallel-collections

12
推荐指数
2
解决办法
1516
查看次数

在Scala 2.13中缺少导入scala.collection.parallel

像这样,Scala 2.12中的并行集合可以直接导入

import scala.collection.parallel.immutable.ParVector
val pv = new ParVector[Int]
Run Code Online (Sandbox Code Playgroud)

但是为什么在Scala 2.13软件包中scala.collection.parallel似乎丢失了?

scala parallel-collections scala-2.13

12
推荐指数
1
解决办法
403
查看次数

在找到所需数量的结果时,在早期中止时过滤Scala的并行集合

给定了一个非常大的collection.parallel.mutable.ParHashMap(或任何其他并行集合)实例,如果找到一个给定的,比如50个匹配的数量,那么如何中止过滤并行扫描?

尝试在线程安全的"外部"数据结构中累积中间匹配或保持外部AtomicInteger的结果计数在4个内核上比使用常规collection.mutable.HashMap慢2到3倍,并将单个内核挂在100 %.

我知道Par*集合中的查找存在确实在"内部"中止.有没有办法概括这个以找到多个结果?

下面是这似乎仍然是2至3倍慢上〜79,000项的ParHashMap的代码,也有馅的问题的maxResults(成果转化的结果CHM这可能是由于线程后被抢占incrementAndGet但在此之前突破这允许其他线程添加更多元素).更新:似乎减速是由于工作者线程在counter.incrementAndGet()上竞争,这当然违背了整个并行扫描的目的:-(

def find(filter: Node => Boolean, maxResults: Int): Iterable[Node] =
{
  val counter = new AtomicInteger(0)
  val results = new ConcurrentHashMap[Key,  Node](maxResults)

  import util.control.Breaks._

  breakable
  {
    for ((key, node) <- parHashMap if filter(node))
    {
      results.put(key, node)
      val total = counter.incrementAndGet()
      if (total > maxResults) break
    }
  }

  results.values.toArray(new Array[Node](results.size))
}
Run Code Online (Sandbox Code Playgroud)

parallel-processing scala parallel-collections

10
推荐指数
1
解决办法
527
查看次数

了解并行存在并找到

我采取List[Int]和要搜索的值x,其中x * 10 > 500并行.因此,如果列表包含任何值51或更大,exists则应返回true.

def f(x: Int) = {
  println("calculating for " + x)
  Thread.sleep(100 - x)
  println("finished " + x)
  x * 10
}

val res = List.range(1, 100).par.exists(f(_) > 500)
Run Code Online (Sandbox Code Playgroud)

结果如下:

calculating for 1
calculating for 25
calculating for 50
calculating for 75
calculating for 13
finished 75          // <-- first valid result found: 75 * 10 > 500
finished 50
calculating for 51   // but it …
Run Code Online (Sandbox Code Playgroud)

parallel-processing scala parallel-collections

9
推荐指数
1
解决办法
556
查看次数

使用X.par.view和X.view.par构建Scala并行视图?

根据关于并行收集和在互联网上搜索的论文,并行集合应该与视图一起工作,但我不清楚它们之间的区别

coll.par.view.someChainedIterations
Run Code Online (Sandbox Code Playgroud)

coll.view.par.someChainedIterations
Run Code Online (Sandbox Code Playgroud)

似乎coll.view.par放弃了集合的视图:

scala> val coll = 1 to 3
coll: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3)

scala> coll.view.par
res2: scala.collection.parallel.ParSeq[Int] = ParArray(1, 2, 3)

scala> coll.par.view
res3: java.lang.Object with scala.collection.parallel.ParSeqView[Int,scala.collection.parallel.immutable.ParSeq[Int],scala.collection.immutable.Seq[Int]] = $anon$1(1, 2, 3)
Run Code Online (Sandbox Code Playgroud)

但我不知道为什么.它是一个功能还是一个bug?

parallel-processing scala scala-collections parallel-collections

8
推荐指数
1
解决办法
355
查看次数