How do I set the number of threads for a parallel collection?

use*_*837 7 parallel-processing scala

I can run Scala's foreach in parallel like this:

val N = 100
(0 until N).par.foreach(i => {
   // do something
})

But how do I set the number of threads? I want something like this:

val N = 100
val NThreads = 5
(0 until N).par.foreach(NThreads, i => {
   // do something
})

cur*_*ous 15

Every parallel collection keeps a tasksupport object, which holds a reference to the thread pool implementation that runs its tasks.

So you can set the level of parallelism by pointing the collection's tasksupport at a new thread pool with as many threads as you need. For example:

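def f(numOfThreads: Int, n: Int) = {
  import scala.collection.parallel._
  val coll = (0 to n).par
  // Back this collection with a pool of numOfThreads threads.
  // Since Scala 2.12, scala.concurrent.forkjoin.ForkJoinPool is a deprecated
  // alias for java.util.concurrent.ForkJoinPool, which can be used directly.
  coll.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(numOfThreads))
  coll.foreach(i => {
    // do something
  })
}

f(2, 100)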
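The question asked for something like foreach(NThreads, ...). A minimal sketch of such a helper, built on the tasksupport approach above, might look like this. It assumes Scala 2.13 with the scala-parallel-collections module on the classpath, and parForeach is a hypothetical name, not a library method. Unlike f above, it shuts its pool down once the loop finishes, so repeated calls do not leave idle pools behind.

```scala
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.atomic.AtomicLong
import scala.collection.parallel.CollectionConverters._
import scala.collection.parallel.ForkJoinTaskSupport

object ParForeachSketch {
  // Hypothetical helper: runs f over xs in parallel on a dedicated pool
  // whose target parallelism is nThreads.
  def parForeach[A](xs: Seq[A], nThreads: Int)(f: A => Unit): Unit = {
    val pool = new ForkJoinPool(nThreads)
    try {
      val coll = xs.par
      coll.tasksupport = new ForkJoinTaskSupport(pool)
      coll.foreach(f) // returns only after every element has been processed
    } finally {
      pool.shutdown() // this pool is not reused, so release its threads
    }
  }

  def main(args: Array[String]): Unit = {
    val sum = new AtomicLong(0)
    parForeach(0 until 100, nThreads = 5) { i =>
      sum.addAndGet(i) // thread-safe accumulation across workers
    }
    println(sum.get) // 4950
  }
}
```

Creating a pool per call is fine for occasional use; for frequent calls, sharing one pool (as the next answer recommends) avoids the cost of starting threads each time.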

For more on configuring parallel collections, see http://docs.scala-lang.org/overviews/parallel-collections/configuration.html


Avs*_*riy 8

The official Scala documentation shows how to change a parallel collection's task support:

import scala.collection.parallel._
val pc = mutable.ParArray(1, 2, 3)
pc.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
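On Scala 2.13, where parallel collections ship as the separate scala-parallel-collections module and ForkJoinTaskSupport expects a java.util.concurrent.ForkJoinPool, the same configuration might be written as the sketch below. The object name DocsExample is illustrative. The parallelismLevel line only shows that the new task support reports the pool's target parallelism, and the pool is kept in a val so it can be shut down afterwards.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.mutable.ParArray

object DocsExample {
  def main(args: Array[String]): Unit = {
    val pool = new ForkJoinPool(2) // target parallelism of 2
    val pc = ParArray(1, 2, 3)
    pc.tasksupport = new ForkJoinTaskSupport(pool)

    println(pc.tasksupport.parallelismLevel) // 2, taken from the pool
    println(pc.map(_ * 2).seq.toList)        // List(2, 4, 6)

    pool.shutdown() // no further work will be submitted to this pool
  }
}
```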

It also mentions:

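The execution context task support is set to each parallel collection by default, so parallel collections reuse the same fork-join pool as the future API.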

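This means you should create a single pool and reuse it. The following approach leaks resources, because every call creates a new ForkJoinPool that is never shut down: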

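import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
// Scala 2.13+: also import scala.collection.parallel.CollectionConverters._ for .par

def calculate(collection: Seq[Int]): Seq[Int] = {
  val parallel = collection.par
  // A brand-new pool on every call, never shut down
  parallel.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(5))
  parallel.map(_ * 2).seq
}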

The correct approach is to reuse an existing pool:

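import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
// Scala 2.13+: also import scala.collection.parallel.CollectionConverters._ for .par

// One pool, created once and shared by every call
val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(5))

def calculate(collection: Seq[Int]): Seq[Int] = {
  val parallel = collection.par
  parallel.tasksupport = taskSupport
  parallel.map(_ * 2).seq
}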
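A self-contained sketch of the shared-pool approach, assuming Scala 2.13 with the scala-parallel-collections module. SharedPool, SharedPoolDemo and the shutdown method are illustrative names, not part of any library. The pool is created once, every call to calculate reuses it, and it is shut down a single time when the application no longer needs it.

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.CollectionConverters._
import scala.collection.parallel.ForkJoinTaskSupport

object SharedPool {
  // Created once and shared by every call to calculate.
  private val pool = new ForkJoinPool(5)
  private val taskSupport = new ForkJoinTaskSupport(pool)

  def calculate(collection: Seq[Int]): Seq[Int] = {
    val parallel = collection.par
    parallel.tasksupport = taskSupport // reuse the shared pool; no new threads per call
    parallel.map(_ * 2).seq
  }

  // Call once when the application is done; later calls to calculate
  // would be rejected by the terminated pool.
  def shutdown(): Unit = pool.shutdown()
}

object SharedPoolDemo {
  def main(args: Array[String]): Unit = {
    println(SharedPool.calculate(Seq(1, 2, 3))) // first call
    println(SharedPool.calculate(1 to 5))       // second call, same pool
    SharedPool.shutdown()
  }
}
```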