par*_*tic 7 scala future actor
我试图使用分而治之(aka fork/join)方法来解决数字运算问题.这是代码:
import scala.actors.Futures.future
private def compute( input: Input ):Result = {
if( pairs.size < SIZE_LIMIT ) {
computeSequential()
} else {
val (input1,input2) = input.split
val f1 = future( compute(input1) )
val f2 = future( compute(input2) )
val result1 = f1()
val result2 = f2()
merge(result1,result2)
}
}
Run Code Online (Sandbox Code Playgroud)
它运行(具有良好的加速)但未来的apply方法似乎阻塞了一个线程,并且线程池大大增加.并且当创建太多线程时,计算被卡住.
是否存在一种释放线程的期货反应方法?或任何其他方式来实现这种行为?
编辑:我使用scala 2.8.0.final
不要求(申请)你的Futures,因为这迫使他们阻止并等待答案; 正如你所见,这可能会导致死锁.相反,单独使用它们告诉他们完成后该怎么做.代替:
val result1 = f1()
val result2 = f2()
merge(result1,result2)
Run Code Online (Sandbox Code Playgroud)
试试这个:
for {
result1 <- f1
result2 <- f2
} yield merge(result1, result2)
Run Code Online (Sandbox Code Playgroud)
结果将是Responder[Result](基本上是Future[Result])包含合并结果; 你可以使用respond()或者用这个最终值做一些有效的事情foreach(),或者你可以map()或者flatMap()用另一个来做Responder[T].不需要阻止,只需保持计划未来的计算!
好的,compute函数的签名必须改为Responder[Result]现在,那么这对递归调用有何影响?我们试试这个:
private def compute( input: Input ):Responder[Result] = {
if( pairs.size < SIZE_LIMIT ) {
future(computeSequential())
} else {
val (input1,input2) = input.split
for {
result1 <- compute(input1)
result2 <- compute(input2)
} yield merge(result1, result2)
}
}
Run Code Online (Sandbox Code Playgroud)
现在你不再需要将调用包装为computewith,future(...)因为它们已经返回Responder(超类Future).
使用这种延续传递方式的一个结果是,您的顶级代码 - 无论是compute最初的调用- 都不再阻止.如果它被调用main(),并且这是所有程序所做的,这将是一个问题,因为现在它只会产生一堆未来,然后立即关闭,完成它被告知要做的一切.你需要做的就是block所有这些未来,但只有一次,在顶级,而且只在所有计算的结果,而不是任何中间的计算结果.
不幸的是,这个Responder被返回的东西compute()不再有apply()像阻止方法一样的阻塞方法Future.我不确定为什么flatMapping Future会生成泛型Responder而不是a Future; 这似乎是一个API错误.但无论如何,你应该能够自己创造:
def claim[A](r:Responder[A]):A = {
import java.util.concurrent.ArrayBlockingQueue
import scala.actors.Actor.actor
val q = new ArrayBlockingQueue[A](1)
// uses of 'respond' need to be wrapped in an actor or future block
actor { r.respond(a => q.put(a)) }
return q.take
}
Run Code Online (Sandbox Code Playgroud)
所以现在你可以在你的main方法中创建一个阻塞调用来计算:
val finalResult = claim(compute(input))
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
701 次 |
| 最近记录: |