Scala,读取文件,处理行并使用并发(akka),异步API(nio2)将输出写入新文件

ale*_*exm 4 io concurrency file-io scala akka

1:我遇到了一个试图处理大文本文件的问题--10Gigs +

单线程解决方案如下:

val writer = new PrintWriter(new File(output.getOrElse("output.txt")));
for(line <- scala.io.Source.fromFile(file.getOrElse("data.txt")).getLines())
{
  writer.println(DigestUtils.HMAC_SHA_256(line))
}
writer.close()
Run Code Online (Sandbox Code Playgroud)

2:我尝试使用并发处理

val futures = scala.io.Source.fromFile(file.getOrElse("data.txt")).getLines
               .map{ s => Future{ DigestUtils.HMAC_SHA_256(s) } }.to
val results = futures.map{ Await.result(_, 10000 seconds) }
Run Code Online (Sandbox Code Playgroud)

这会导致GC开销限制超出异常(有关stacktrace,请参阅附录A)

3:我尝试使用Akka IO和AsynchronousFileChannel的组合以下https://github.com/drexin/akka-io-file我能够使用FileSlurp以字节块的形式读取文件但是找不到要读取的解决方案按行要求的文件.

任何帮助将不胜感激.谢谢.

附录A.

[error] (run-main) java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.nio.CharBuffer.wrap(Unknown Source)
        at sun.nio.cs.StreamDecoder.implRead(Unknown Source)
        at sun.nio.cs.StreamDecoder.read(Unknown Source)
        at java.io.InputStreamReader.read(Unknown Source)
        at java.io.BufferedReader.fill(Unknown Source)
        at java.io.BufferedReader.readLine(Unknown Source)
        at java.io.BufferedReader.readLine(Unknown Source)
        at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.s
cala:67)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:
48)
        at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:7
16)
        at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:6
92)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at com.test.Twitterhashconcurrentcli$.doConcurrent(Twitterhashconcu
rrentcli.scala:35)
        at com.test.Twitterhashconcurrentcli$delayedInit$body.apply(Twitter
hashconcurrentcli.scala:62)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:
12)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.generic.TraversableForwarder$class.foreach(Traversab
leForwarder.scala:32)
        at scala.App$class.main(App.scala:71)
Run Code Online (Sandbox Code Playgroud)

cmb*_*ter 14

这里的诀窍是避免将所有数据一次性读入内存.如果您迭代并向工作人员发送行,则会冒这个风险,因为发送给actor是异步的,因此您可能会将所有数据读入内存,并且它将位于actor的邮箱中,可能会导致OOM异常.更好的高级方法是使用单个主操作员和下面的子工作池来进行处理.这里的技巧是在主文件中使用一个惰性流(如Iterator返回的scala.io.Source.fromX),然后在工作程序中使用工作拉动模式,以防止他们的邮箱填满数据.然后,当迭代器不再有任何行时,主服务器会自行停止,这将停止工作者(如果需要,您也可以使用此点关闭actor系统,如果这是你真正想做的事情).

这是一个非常粗略的轮廓.请注意,我还没有测试过这个:

import akka.actor._
import akka.routing.RoundRobinLike
import akka.routing.RoundRobinRouter
import scala.io.Source
import akka.routing.Broadcast

object FileReadMaster{
  case class ProcessFile(filePath:String)
  case class ProcessLines(lines:List[String], last:Boolean = false)
  case class LinesProcessed(lines:List[String], last:Boolean = false)

  case object WorkAvailable
  case object GimmeeWork
}

class FileReadMaster extends Actor{
  import FileReadMaster._

  val workChunkSize = 10
  val workersCount = 10

  def receive = waitingToProcess

  def waitingToProcess:Receive = {
    case ProcessFile(path) =>
      val workers = (for(i <- 1 to workersCount) yield context.actorOf(Props[FileReadWorker])).toList
      val workersPool = context.actorOf(Props.empty.withRouter(RoundRobinRouter(routees = workers)))
      val it = Source.fromFile(path).getLines
      workersPool ! Broadcast(WorkAvailable)
      context.become(processing(it, workersPool, workers.size))

      //Setup deathwatch on all
      workers foreach (context watch _)
  }

  def processing(it:Iterator[String], workers:ActorRef, workersRunning:Int):Receive = {
    case ProcessFile(path) => 
      sender ! Status.Failure(new Exception("already processing!!!"))


    case GimmeeWork if it.hasNext =>
      val lines = List.fill(workChunkSize){
        if (it.hasNext) Some(it.next)
        else None
      }.flatten

      sender ! ProcessLines(lines, it.hasNext)

      //If no more lines, broadcast poison pill
      if (!it.hasNext) workers ! Broadcast(PoisonPill)

    case GimmeeWork =>
      //get here if no more work left

    case LinesProcessed(lines, last) =>
      //Do something with the lines

    //Termination for last worker
    case Terminated(ref)  if workersRunning == 1 =>
      //Done with all work, do what you gotta do when done here

    //Terminared for non-last worker
    case Terminated(ref) =>
      context.become(processing(it, workers, workersRunning - 1))

  }
}

class FileReadWorker extends Actor{
  import FileReadMaster._

  def receive = {
    case ProcessLines(lines, last) => 
      sender ! LinesProcessed(lines.map(_.reverse), last)
      sender ! GimmeeWork

    case WorkAvailable =>
      sender ! GimmeeWork
  }
}
Run Code Online (Sandbox Code Playgroud)

这个想法是主人迭代文件的内容并将一大堆工作发送给一个童工池.文件处理开始时,主人告诉所有孩子工作可用.然后每个孩子继续请求工作,直到不再有工作为止.当主人检测到文件被读完时,它会向孩子们播放一个毒丸,让他们完成任何未完成的工作,然后停止.当所有孩子都停下来时,主人可以完成所需的任何清理工作.

同样,根据我的想法,这是非常粗略的.如果我在任何地区离开,请告诉我,我可以修改答案.