Scala中的并行文件处理

Mic*_*ael 6 scala actor akka

假设我需要并行处理给定文件夹中的文件.在Java中,我将创建一个FolderReader线程来读取文件夹和FileProcessor线程池中的文件名.FolderReader读取文件名并将文件处理函数(Runnable)提交给池执行程序.

在Scala中,我看到两个选项:

  • 创建一个FileProcessoractor 池并安排一个文件处理函数Actors.Scheduler.
  • 在读取文件名时为每个文件名创建一个actor.

是否有意义?什么是最好的选择?

Dav*_*ith 10

根据你正在做的事情,它可能很简单

for(file<-files.par){
   //process the file
}
Run Code Online (Sandbox Code Playgroud)


Edm*_*984 3

我建议尽全力远离线程。幸运的是,我们有更好的抽象来处理下面发生的事情,在你的情况下,在我看来,你不需要使用参与者(虽然你可以),但你可以使用一个更简单的抽象,称为期货。它们是 Akka 开源库的一部分,我认为将来也将成为 Scala 标准库的一部分。

Future[T] 只是将来会返回 T 的东西。

运行 future 所需的只是拥有一个隐式 ExecutionContext,您可以从 java 执行器服务派生它。然后你将能够享受优雅的 API 以及 future 是一个 monad 的事实,可以将集合转换为 future 集合,收集结果等等。我建议你看看http://doc.akka.io/docs/akka/2.0.1/scala/futures.html

object TestingFutures {
  implicit val executorService = Executors.newFixedThreadPool(20)
  implicit val executorContext = ExecutionContext.fromExecutorService(executorService)

  def testFutures(myList:List[String]):List[String]= {

    val listOfFutures : Future[List[String]] = Future.traverse(myList){
      aString => Future{
                        aString.reverse
                       }
     }
    val result:List[String] = Await.result(listOfFutures,1 minute)
    result

  }
}
Run Code Online (Sandbox Code Playgroud)

这里发生了很多事情:

  • 我使用Future.traverse它作为第一个参数接收,作为 M[T]<:Traversable[T]第二个参数 aT => Future[T]或者如果你更喜欢 aFunction1[T,Future[T]]并返回 Future[M[T]]
  • 我正在使用该Future.apply方法创建类型的匿名类Future[T]

关注 Akka future 还有很多其他原因。

  • Futures 可以被映射,因为它们是 monad,即你可以链接 Futures 执行:

    Future { 3 }.map { _ * 2 }.map { _.toString }

  • Futures 有回调:future.onComplete、onSuccess、onFailure 和Then 等。

  • Future不仅支持遍历,还支持理解