取消与Scala的未来和承诺

Mic*_*ael 12 scala future promise akka cancellation

这是我上一个问题的后续内容.

假设我有一个执行可中断阻塞调用的任务.我想将其作为a运行Future并使用方法取消它.failurePromise

我希望取消工作如下:

  • 如果一个人在完成任务之前取消了该任务,我希望任务"立即"完成,如果已经启动了阻止调用,我将Future要调用阻塞调用onFailure.

  • 如果在任务完成取消任务,我希望获得一个状态,说明由于任务已经完成,取消失败.

是否有意义?是否可以在Scala中实现?有这样的实现的例子吗?

Vik*_*ang 12

scala.concurrent.Future是只读的,因此一个读者不能为其他读者搞砸.

您似乎应该能够实现您想要的内容,如下所示:

def cancellableFuture[T](fun: Future[T] => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
  val p = Promise[T]()
  val f = p.future
  p tryCompleteWith Future(fun(f))
  (f, () => p.tryFailure(new CancellationException))
}

val (f, cancel) = cancellableFuture( future => {
  while(!future.isCompleted) continueCalculation // isCompleted acts as our interrupted-flag

  result  // when we're done, return some result
})

val wasCancelled = cancel() // cancels the Future (sets its result to be a CancellationException conditionally)
Run Code Online (Sandbox Code Playgroud)

  • 您必须添加一个synchronized var,它在计算开始时将当前线程设置为锁定,然后在结束时获取锁定并清除var.取消将在设置的线程上采取锁定和调用中断(如果有的话),或者如果为空则挽救. (2认同)

sou*_*ica 10

根据他的评论,这是Victor的代码的可中断版本(Victor,如果我误解,请纠正我).

object CancellableFuture extends App {

  def interruptableFuture[T](fun: () => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    val aref = new AtomicReference[Thread](null)
    p tryCompleteWith Future {
      val thread = Thread.currentThread
      aref.synchronized { aref.set(thread) }
      try fun() finally {
        val wasInterrupted = (aref.synchronized { aref getAndSet null }) ne thread
        //Deal with interrupted flag of this thread in desired
      }
    }

    (f, () => {
      aref.synchronized { Option(aref getAndSet null) foreach { _.interrupt() } }
      p.tryFailure(new CancellationException)
    })
  }

  val (f, cancel) = interruptableFuture[Int] { () =>
    val latch = new CountDownLatch(1)

    latch.await(5, TimeUnit.SECONDS)    // Blocks for 5 sec, is interruptable
    println("latch timed out")

    42  // Completed
  }

  f.onFailure { case ex => println(ex.getClass) }
  f.onSuccess { case i => println(i) }

  Thread.sleep(6000)   // Set to less than 5000 to cancel

  val wasCancelled = cancel()

  println("wasCancelled: " + wasCancelled)
}
Run Code Online (Sandbox Code Playgroud)

随着Thread.sleep(6000)输出为:

latch timed out
42
wasCancelled: false
Run Code Online (Sandbox Code Playgroud)

随着Thread.sleep(1000)输出为:

wasCancelled: true
class java.util.concurrent.CancellationException
Run Code Online (Sandbox Code Playgroud)


gzm*_*zm0 5

Twitter的期货实施取消.看看这里:

https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala

第563行显示了对此负责的抽象方法.斯卡拉的期货目前不支持取消.