Ram*_*Ram 4 concurrency scala twitter-util
来自node.js背景,我是Scala的新手,我尝试使用Twitter的Future.collect来执行一些简单的并发操作.但我的代码显示顺序行为而不是并发行为.我究竟做错了什么?
这是我的代码,
import com.twitter.util.Future
def waitForSeconds(seconds: Int, container:String): Future[String] = Future[String] {
Thread.sleep(seconds*1000)
println(container + ": done waiting for " + seconds + " seconds")
container + " :done waiting for " + seconds + " seconds"
}
def mainFunction:String = {
val allTasks = Future.collect(Seq(waitForSeconds(1, "All"), waitForSeconds(3, "All"), waitForSeconds(2, "All")))
val singleTask = waitForSeconds(1, "Single")
allTasks onSuccess { res =>
println("All tasks succeeded with result " + res)
}
singleTask onSuccess { res =>
println("Single task succeeded with result " + res)
}
"Function Complete"
}
println(mainFunction)
Run Code Online (Sandbox Code Playgroud)
这是我得到的输出,
All: done waiting for 1 seconds
All: done waiting for 3 seconds
All: done waiting for 2 seconds
Single: done waiting for 1 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete
Run Code Online (Sandbox Code Playgroud)
我期望的输出是,
All: done waiting for 1 seconds
Single: done waiting for 1 seconds
All: done waiting for 2 seconds
All: done waiting for 3 seconds
All tasks succeeded with result ArraySeq(All :done waiting for 1 seconds, All :done waiting for 3 seconds, All :done waiting for 2 seconds)
Single task succeeded with result Single :done waiting for 1 seconds
Function Complete
Run Code Online (Sandbox Code Playgroud)
Twitter的期货比Scala标准库期货更明确地执行计算.特别是,Future.apply将安全地捕获异常(例如s.c.Future),但它没有说明计算将在哪个线程中运行.在您的情况下,计算在主线程中运行,这就是您看到结果的原因'看得见.
与标准库的未来API相比,这种方法有几个优点.首先,它使方法签名更简单,因为没有隐含的东西ExecutionContext必须在任何地方传递.更重要的是,它可以更容易地避免上下文切换(这是 Brian Degenhardt 的经典解释).在这方面,Twitter Future更像是Scalaz Task,并且具有基本相同的性能优势(例如在本博文中描述).
更明确地计算运行位置的缺点是你必须更明确地计算运行的位置.在你的情况下你可以写这样的东西:
import com.twitter.util.{ Future, FuturePool }
val pool = FuturePool.unboundedPool
def waitForSeconds(seconds: Int, container:String): Future[String] = pool {
Thread.sleep(seconds*1000)
println(container + ": done waiting for " + seconds + " seconds")
container + " :done waiting for " + seconds + " seconds"
}
Run Code Online (Sandbox Code Playgroud)
这不会产生正是你要求的输出("功能齐全"将首先打印,allTasks并且singleTask相对于彼此不排序),但它会在单独的线程并行运行的任务.
(作为一个注脚:在FuturePool.unboundedPool上面我举的例子是创建一个演示未来池的简单方法,而且往往只是罚款,但对于CPU密集型的计算,看看它是不是合适的FuturePoolAPI文档其他方式来创造未来的池,将使用ExecutorService您提供的并可以自行管理.)