Scala中的异步IO与期货

F.X*_*.X. 67 io multithreading scala future

假设我从一些URL下载了一个(可能很大的)图像列表.我正在使用Scala,所以我要做的是:

import scala.actors.Futures._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val fimages: List[Future[...]] = urls.map (url => future { download url })

// Do something (display) when complete
fimages.foreach (_.foreach (display _))
Run Code Online (Sandbox Code Playgroud)

我对Scala有点新意,所以对我来说这看起来仍然有些神奇:

  • 这是正确的方法吗?任何替代品,如果不是?
  • 如果我要下载100个图像,这会一次创建100个线程,还是会使用线程池?
  • 最后一条指令(display _)是否会在主线程上执行,如果没有,我该如何确定呢?

谢谢你的建议!

Hea*_*ler 130

在Scala 2.10中使用Futures.他们是Scala团队,Akka团队和Twitter之间的联合工作,以实现更加标准化的未来API和跨框架使用的实现.我们刚刚发布了一份指南:http://docs.scala-lang.org/overviews/core/futures.html

除了完全非阻塞(默认情况下,尽管我们提供了管理阻塞操作的能力)和可组合之外,Scala的2.10期货还带有一个隐式线程池来执行您的任务,以及一些管理超时的实用程序.

import scala.concurrent.{future, blocking, Future, Await, ExecutionContext.Implicits.global}
import scala.concurrent.duration._

// Retrieve URLs from somewhere
val urls: List[String] = ...

// Download image (blocking operation)
val imagesFuts: List[Future[...]] = urls.map {
  url => future { blocking { download url } }
}

// Do something (display) when complete
val futImages: Future[List[...]] = Future.sequence(imagesFuts)
Await.result(futImages, 10 seconds).foreach(display)
Run Code Online (Sandbox Code Playgroud)

上面,我们首先导入一些东西:

  • future:用于创建未来的API.
  • blocking:用于托管阻止的API.
  • Future:未来的伴随对象,包含许多有用的期货收集方法.
  • Await:用于阻止未来的单例对象(将其结果传递给当前线程).
  • ExecutionContext.Implicits.global:默认的全局线程池,一个ForkJoin池.
  • duration._:用于管理超时持续时间的实用程序.

imagesFuts与你最初的做法大致相同 - 这里唯一的区别是我们使用托管阻止 - blocking.它通知线程池您传递给它的代码块包含长时间运行或阻塞操作.这允许池临时生成新工作程序,以确保永远不会发生所有工作程序被阻止的情况.这样做是为了防止阻塞应用程序中的饥饿(锁定线程池).请注意,线程池还知道托管阻塞块中的代码何时完成 - 因此它将在该点删除备用工作线程,这意味着池将缩减回其预期大小.

(如果您想绝对阻止创建其他线程,那么您应该使用AsyncIO库,例如Java的NIO库.)

然后,我们用未来的伴侣对象的集合方法,将imagesFuts来自 List[Future[...]]于一个Future[List[...]].

Await对象是我们如何能够确保display上thread--调用执行Await.result简单地强制当前线程等待,直到将来,它传递完成.(这在内部使用托管阻止.)

  • 为什么你在`url => future {blocking {download url}}`中使用阻塞,为什么不使用`url => future {download url}`? (4认同)
  • 实际上"阻塞"背后的东西是如何工作的?它是否有自己的线程池,或者只是在我们通过`blocking`提交任务时创建新线程? (2认同)

som*_*ytt 5

val all = Future.traverse(urls){ url =>
  val f = future(download url) /*(downloadContext)*/
  f.onComplete(display)(displayContext)
  f
}
Await.result(all, ...)
Run Code Online (Sandbox Code Playgroud)
  1. 在2.10中使用scala.concurrent.Future,现在是RC.
  2. 它使用隐式ExecutionContext
  3. 新的Future doc明确指出onComplete(和foreach)可以在值可用时立即进行评估.老演员Future做同样的事情.根据您的要求显示,您可以提供合适的ExecutionContext(例如,单个线程执行程序).如果您只是希望主线程等待加载完成,则遍历会为您提供等待的未来.