并行迭代文件中的行(Scala)?

sch*_*mmd 35 parallel-processing concurrency scala

我知道Scala中的并行集合.它们很方便!但是,我想迭代一个文件的行,这个文件对于并行的内存来说太大了.例如,我可以创建线程并在扫描仪上设置锁定,但如果我可以运行以下代码,那将会很棒:

Source.fromFile(path).getLines.par foreach { line =>
Run Code Online (Sandbox Code Playgroud)

然而不幸的是

error: value par is not a member of Iterator[String]
Run Code Online (Sandbox Code Playgroud)

在这里完成一些并行性的最简单方法是什么?现在,我将阅读一些行并且并行处理它们.

Jos*_*man 32

您可以使用分组轻松地将迭代器切片为可以加载到内存中的块,然后并行处理.

val chunkSize = 128 * 1024
val iterator = Source.fromFile(path).getLines.grouped(chunkSize)
iterator.foreach { lines => 
    lines.par.foreach { line => process(line) }
}
Run Code Online (Sandbox Code Playgroud)

在我看来,这样的事情是最简单的方法.

  • 它不应该因为迭代器上的操作通常尽可能地懒惰.例如,map将从原始迭代器生成一个新的迭代器,而不会遍历底层集合.为了验证,我实际上查看了最新的scala源代码.对迭代器进行分组会生成一个GroupedIterator类,该类对流进行一些内部缓冲以将其分组为块.它不会强制遍历整个迭代器. (3认同)

Dan*_*mon 10

我会把它作为一个单独的答案,因为它与我的上一个根本不同(它实际上有效)

这是使用actor的解决方案的大纲,这基本上是Kim Stebel的评论所描述的.有两个actor类,一个FileReader actor,可以根据需要从文件中读取各行,以及几个Worker actor.工作人员都向读取器发送行请求,并在从文件中读取行并行处理行.

我在这里使用Akka演员,但使用其他实现基本上是相同的想法.

case object LineRequest
case object BeginProcessing

class FileReader extends Actor {

  //reads a single line from the file or returns None if EOF
  def getLine:Option[String] = ...

  def receive = {
    case LineRequest => self.sender.foreach{_ ! getLine} //sender is an Option[ActorRef]
  }
}

class Worker(reader: ActorRef) extends Actor {

  def process(line:String) ...

  def receive = {
    case BeginProcessing => reader ! LineRequest
    case Some(line) => {
      process(line)
      reader ! LineRequest
    }
    case None => self.stop
  }
}

val reader = actorOf[FileReader].start    
val workers = Vector.fill(4)(actorOf(new Worker(reader)).start)
workers.foreach{_ ! BeginProcessing}
//wait for the workers to stop...
Run Code Online (Sandbox Code Playgroud)

这样,一次不超过4个(或者你有多少工人)未处理的行在内存中.