sch*_*mmd 35 parallel-processing concurrency scala
我知道Scala中的并行集合.它们很方便!但是,我想迭代一个文件的行,这个文件对于并行的内存来说太大了.例如,我可以创建线程并在扫描仪上设置锁定,但如果我可以运行以下代码,那将会很棒:
Source.fromFile(path).getLines.par foreach { line =>
Run Code Online (Sandbox Code Playgroud)
然而不幸的是
error: value par is not a member of Iterator[String]
Run Code Online (Sandbox Code Playgroud)
在这里完成一些并行性的最简单方法是什么?现在,我将阅读一些行并且并行处理它们.
Jos*_*man 32
您可以使用分组轻松地将迭代器切片为可以加载到内存中的块,然后并行处理.
val chunkSize = 128 * 1024
val iterator = Source.fromFile(path).getLines.grouped(chunkSize)
iterator.foreach { lines =>
lines.par.foreach { line => process(line) }
}
Run Code Online (Sandbox Code Playgroud)
在我看来,这样的事情是最简单的方法.
Dan*_*mon 10
我会把它作为一个单独的答案,因为它与我的上一个根本不同(它实际上有效)
这是使用actor的解决方案的大纲,这基本上是Kim Stebel的评论所描述的.有两个actor类,一个FileReader actor,可以根据需要从文件中读取各行,以及几个Worker actor.工作人员都向读取器发送行请求,并在从文件中读取行并行处理行.
我在这里使用Akka演员,但使用其他实现基本上是相同的想法.
case object LineRequest
case object BeginProcessing
class FileReader extends Actor {
//reads a single line from the file or returns None if EOF
def getLine:Option[String] = ...
def receive = {
case LineRequest => self.sender.foreach{_ ! getLine} //sender is an Option[ActorRef]
}
}
class Worker(reader: ActorRef) extends Actor {
def process(line:String) ...
def receive = {
case BeginProcessing => reader ! LineRequest
case Some(line) => {
process(line)
reader ! LineRequest
}
case None => self.stop
}
}
val reader = actorOf[FileReader].start
val workers = Vector.fill(4)(actorOf(new Worker(reader)).start)
workers.foreach{_ ! BeginProcessing}
//wait for the workers to stop...
Run Code Online (Sandbox Code Playgroud)
这样,一次不超过4个(或者你有多少工人)未处理的行在内存中.
| 归档时间: |
|
| 查看次数: |
7673 次 |
| 最近记录: |