如何在Scala中取消未来?

Mic*_*ael 53 multithreading scala future

Java Future有一个cancel方法,它可以中断运行Future任务的线程.例如,如果我将一个可中断的阻塞调用包装在一个Java Future我可以稍后中断它.

Scala Future没有提供任何cancel方法.假设我在一个中包含一个可中断的阻塞调用Scala Future.我怎么能打断它?

axe*_*l22 30

这还不是FutureAPI 的一部分,但可能会在未来作为扩展添加.

作为一种解决方法,您可以使用firstCompletedOf包装2期货 - 您要取消的未来以及来自自定义的未来Promise.然后你可以通过失败承诺取消这样创建的未来:

def cancellable[T](f: Future[T])(customCode: => Unit): (() => Unit, Future[T]) = {
  val p = Promise[T]
  val first = Future firstCompletedOf Seq(p.future, f)
  val cancellation: () => Unit = {
    () =>
      first onFailure { case e => customCode}
      p failure new Exception
  }
  (cancellation, first)
}
Run Code Online (Sandbox Code Playgroud)

现在你可以在任何将来调用它来获得"可取消的包装器".用例示例:

val f = callReturningAFuture()
val (cancel, f1) = cancellable(f) {
  cancelTheCallReturningAFuture()
}

// somewhere else in code
if (condition) cancel() else println(Await.result(f1))
Run Code Online (Sandbox Code Playgroud)

编辑:

有关取消的详细讨论,请参阅Scala中的学习并发编程一书中的第4章.

  • 由于讨论的原因,提供"Future.cancel"不是一个好主意; 另一个问题是Future是一个共享的只读句柄,因此它不应该提供干扰其他读者的方法.你可以做的是将Future传递给你想要运行的代码并让代码定期检查Future,然后你有一个通过完成相应的Promise来中断计算的原则方法. (9认同)
  • 不幸的是,这不是100%可靠.实际上,在你调用`cancel`和`customCode`实际停止未来主体的时间之间(比如说`customCode`设置一个布尔标志,由未来的身体检查以知道是否中止),什么都可能发生.特别是未来的身体可能会开始执行.最终的结果是:虽然"可取消"所返回的未来说它被取消了,期货的实体却被执行了.一旦未来的身体发挥任何副作用,那就是一个真正的问题.这使得使用您的代码实际上非常危险. (7认同)
  • Future.cancel会非常有用.我最近遇到了一个问题,我创建了连接到不同数据库的期货来收集统计数据.有些dbs存在挂断连接的问题,这使得未来永远运行.由于stat集合定期运行,所以猜猜看,我的应用程序最终吃掉了所有可用的cpu.我意识到了这个问题,并试图在数据库修复之前找到一种超时的方法,但到目前为止还没有找到好的答案:( (4认同)
  • 是的,Promises是scala中期货的生产终点,因此您可以控制结果.一些链接:http://www.scala-lang.org/api/current/index.html#scala.concurrent.Promise,http://docs.scala-lang.org/overviews/core/futures.html#promises (3认同)

nig*_*ale 9

我没有对此进行测试,但这扩展了PabloFranciscoPérezHidalgo的答案.而不是阻止等待java Future,我们使用中间Promise代替.

import java.util.concurrent.{Callable, FutureTask}
import scala.concurrent.{ExecutionContext, Promise}
import scala.util.Try

class Cancellable[T](executionContext: ExecutionContext, todo: => T) {
  private val promise = Promise[T]()

  def future = promise.future

  private val jf: FutureTask[T] = new FutureTask[T](
    new Callable[T] {
      override def call(): T = todo
    }
  ) {
    override def done() = promise.complete(Try(get()))
  }

  def cancel(): Unit = jf.cancel(true)

  executionContext.execute(jf)
}

object Cancellable {
  def apply[T](todo: => T)(implicit executionContext: ExecutionContext): Cancellable[T] =
    new Cancellable[T](executionContext, todo)
}
Run Code Online (Sandbox Code Playgroud)


Geo*_*los 8

通过取消我猜你想暴力中断future.

找到了这段代码:https://gist.github.com/viktorklang/5409467

做了一些测试,似乎工作正常!

请享用 :)