对期货做出反应

par*_*tic 7 scala future actor

我试图使用分而治之(aka fork/join)方法来解决数字运算问题.这是代码:

import scala.actors.Futures.future

private def compute( input: Input ):Result = {
  if( pairs.size < SIZE_LIMIT ) {
    computeSequential()
  } else {
    val (input1,input2) = input.split
    val f1 = future( compute(input1) )
    val f2 = future( compute(input2) )
    val result1 = f1()
    val result2 = f2()
    merge(result1,result2)
  }
}
Run Code Online (Sandbox Code Playgroud)

它运行(具有良好的加速)但未来的apply方法似乎阻塞了一个线程,并且线程池大大增加.并且当创建太多线程时,计算被卡住.

是否存在一种释放线程的期货反应方法?或任何其他方式来实现这种行为?

编辑:我使用scala 2.8.0.final

Tom*_*ett 8

不要求(申请)你的Futures,因为这迫使他们阻止并等待答案; 正如你所见,这可能会导致死锁.相反,单独使用它们告诉他们完成后该怎么做.代替:

val result1 = f1()
val result2 = f2()
merge(result1,result2)
Run Code Online (Sandbox Code Playgroud)

试试这个:

for {
  result1 <- f1
  result2 <- f2
} yield merge(result1, result2)
Run Code Online (Sandbox Code Playgroud)

结果将是Responder[Result](基本上是Future[Result])包含合并结果; 你可以使用respond()或者用这个最终值做一些有效的事情foreach(),或者你可以map()或者flatMap()用另一个来做Responder[T].不需要阻止,只需保持计划未来的计算!

编辑1:

好的,compute函数的签名必须改为Responder[Result]现在,那么这对递归调用有何影响?我们试试这个:

private def compute( input: Input ):Responder[Result] = {
  if( pairs.size < SIZE_LIMIT ) {
    future(computeSequential())
  } else {
    val (input1,input2) = input.split
    for {
      result1 <- compute(input1)
      result2 <- compute(input2)
    } yield merge(result1, result2)
  }
}
Run Code Online (Sandbox Code Playgroud)

现在你不再需要将调用包装为computewith,future(...)因为它们已经返回Responder(超类Future).

编辑2:

使用这种延续传递方式的一个结果是,您的顶级代码 - 无论是compute最初的调用- 都不再阻止.如果它被调用main(),并且这是所有程序所做的,这将是一个问题,因为现在它只会产生一堆未来,然后立即关闭,完成它被告知要做的一切.你需要做的就是block所有这些未来,但只有一次,在顶级,而且只在所有计算的结果,而不是任何中间的计算结果.

不幸的是,这个Responder被返回的东西compute()不再有apply()像阻止方法一样的阻塞方法Future.我不确定为什么flatMapping Future会生成泛型Responder而不是a Future; 这似乎是一个API错误.但无论如何,你应该能够自己创造:

def claim[A](r:Responder[A]):A = {
  import java.util.concurrent.ArrayBlockingQueue
  import scala.actors.Actor.actor

  val q = new ArrayBlockingQueue[A](1)
  // uses of 'respond' need to be wrapped in an actor or future block
  actor { r.respond(a => q.put(a)) } 
  return q.take
}
Run Code Online (Sandbox Code Playgroud)

所以现在你可以在你的main方法中创建一个阻塞调用来计算:

val finalResult = claim(compute(input))
Run Code Online (Sandbox Code Playgroud)