并行Scala流的内存消耗

Ral*_*lph 6 parallel-processing memory-management scala

我编写了一个Scala(2.9.1-1)应用程序,需要处理来自数据库查询的数百万行.我正在使用前面一个问题的答案中显示的技术转换ResultSet为a :Stream

class Record(...)

val resultSet = statement.executeQuery(...)

new Iterator[Record] {
  def hasNext = resultSet.next()
  def next = new Record(resultSet.getString(1), resultSet.getInt(2), ...)
}.toStream.foreach { record => ... }
Run Code Online (Sandbox Code Playgroud)

这非常有效.

由于foreach闭包的主体非常占用CPU,并且作为函数式编程的实用性的证明,如果我在.par之前添加一个foreach,则闭包并行运行而不需要其他工作,除了确保闭合的主体是线程安全的(它是以函数样式编写的,除了打印到线程安全日志之外没有可变数据).

但是,我担心内存消耗.是.par导致整个结果集在RAM中加载,或做并联运行负荷只有尽可能多的行,因为它有活动线程?我已经将4G分配给了JVM(64位-Xmx4g)但是将来我会在更多的行上运行它,并担心我最终会得到一个内存不足.

是否有更好的模式以功能方式进行这种并行处理?我一直在向同事们展示这个应用程序,作为函数式编程和多核机器价值的一个例子.

Nic*_*las 4

如果您查看 的scaladocStream,您会注意到 的定义类parParallelizable特征...并且,如果您查看此特征的源代码,您会注意到它从原始集合中获取每个元素并将它们放入到组合器中,因此,您将把每一行加载到ParSeq

  def par: ParRepr = {
    val cb = parCombiner
    for (x <- seq) cb += x
    cb.result
  }

  /** The default `par` implementation uses the combiner provided by this method
   *  to create a new parallel collection.
   *
   *  @return  a combiner for the parallel collection of type `ParRepr`
   */
  protected[this] def parCombiner: Combiner[A, ParRepr]
Run Code Online (Sandbox Code Playgroud)

一个可能的解决方案是显式并行计算,例如借助参与者。例如,您可以从 akka 文档中查看此示例,这可能对您的上下文有所帮助。