JZe*_*eta 13 scala scala-collections scala-2.10
在2.10之前的Scala中,我可以在defaultForkJoinPool中设置并行性(如此答案scala parallel collections of parallelism).在Scala 2.10中,该API不再存在.有充分证据表明,我们可以通过分配给它的taskSupport属性在单个集合(http://docs.scala-lang.org/overviews/parallel-collections/configuration.html)上设置并行性.
但是,我在我的代码库中使用并行集合,并且不希望在每个集合实例化中添加额外的两行.有没有办法配置全局默认线程池大小,以便someCollection.par.map(f(_))自动使用默认线程数?
And*_*kin 17
我知道问题是一个多月了,但我刚才有同样的问题.谷歌搜索没有帮助,我找不到任何在新API中看起来中等的东西.
按照此处的建议设置-Dscala.concurrent.context.maxThreads = n:在Scala 2.10中设置所有集合的并行度级别?看起来根本没有任何影响,但我不确定我是否正确使用它(我在没有明确安装'scala'的环境中使用'java'运行我的应用程序,这可能是原因).
我不知道为什么scala-people从适当的包对象中删除了这个必要的setter.
但是,通常可以使用反射来解决不完整/奇怪的界面:
def setParallelismGlobally(numThreads: Int): Unit = {
val parPkgObj = scala.collection.parallel.`package`
val defaultTaskSupportField = parPkgObj.getClass.getDeclaredFields.find{
_.getName == "defaultTaskSupport"
}.get
defaultTaskSupportField.setAccessible(true)
defaultTaskSupportField.set(
parPkgObj,
new scala.collection.parallel.ForkJoinTaskSupport(
new scala.concurrent.forkjoin.ForkJoinPool(numThreads)
)
)
}
Run Code Online (Sandbox Code Playgroud)
对于那些不熟悉Scala更加模糊的功能的人,这里有一个简短的解释:
scala.collection.parallel.`package`
Run Code Online (Sandbox Code Playgroud)
使用defaultTaskSupport变量访问包对象(它看起来有点像Java的静态变量,但它实际上是包对象的成员变量).标识符需要反引号,因为package它是保留关键字.然后我们得到我们想要的私有final字段(getField("defaultTaskSupport")由于某种原因不起作用?...),告诉它是可访问的,以便能够修改它,然后替换它的值我们自己的ForkJoinTaskSupport.
我还不了解创建并行集合的确切机制,但Combiner特性的源代码表明defaultTaskSupport的值应以某种方式渗透到并行集合.
请注意,问题在质量上与更古老的问题相同:"我的代码库中包含Math.random(),如何将种子设置为固定数字以进行调试?" (参见例如:在Math.random上设置种子()).在这两种情况下,我们都有一些全局"静态"变量,我们在百万个不同的地方隐式使用它们,我们想要改变它,但是这个变量没有设置器=>我们使用反射.
丑陋如地狱,但似乎工作得很好.如果需要限制线程总数,请不要忘记垃圾收集器在单独的线程上运行.
| 归档时间: |
|
| 查看次数: |
2150 次 |
| 最近记录: |