Scala中超时的未来

Mic*_*ael 22 concurrency scala future

假设我有一个函数,它调用阻塞可中断操作.我想以超时的方式异步运行它.也就是说,我想在超时到期时中断该功能.

所以我想尝试这样的事情:

import scala.util.Try
import scala.concurrent.Future

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() 

  import ExecutionContext.Implicits.global
  Future {Thread.sleep(timeout); aref.get().interrupt} // 1
  Future {aref.set(Thread.currentThread); Try(f())}    // 2
}

问题是aref在(1)中可以为null,因为(2)尚未将其设置为当前线程.在这种情况下,我想等到aref设定.最好的方法是什么?

fla*_*ian 16

您可以使用Await稍微简单一些.该Await.result方法将超时持续时间作为第二个参数,并抛出一个TimeoutExceptionon timeout.

try {
  import scala.concurrent.duration._
  Await.result(aref, 10 seconds);
} catch {
    case e: TimeoutException => // whatever you want to do.
}
Run Code Online (Sandbox Code Playgroud)

  • 我猜,它不会中断异步运行的函数. (6认同)
  • 您不应该在生产代码中使用Await.result.这是一个阻止功能 (4认同)

ans*_*ans 6

我也需要相同的行为,所以这就是我解决它的方式.我基本上创建了一个创建计时器的对象,如果未来在指定的持续时间内未完成,则会使用TimeoutException失败.

package mypackage

import scala.concurrent.{Promise, Future}
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import scala.concurrent.ExecutionContext.Implicits.global

object TimeoutFuture {

  val actorSystem = ActorSystem("myActorSystem")
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
    val promise = Promise[A]()
    actorSystem.scheduler.scheduleOnce(timeout) {
      promise tryFailure new java.util.concurrent.TimeoutException
    }

    Future {
      try {
        promise success block
      }
      catch {
        case e:Throwable => promise failure e
      } 
    }

    promise.future
  }
}
Run Code Online (Sandbox Code Playgroud)


Rex*_*err 5

如果添加一个CountDownLatch,就可以实现所需的行为.(请注意,await在很多很多Futures 中阻塞(即卡住)可能会导致线程池的饥饿.)

import scala.util.Try
import scala.concurrent.Future

def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {

  val aref = new java.util.concurrent.atomic.AtomicReference[Thread]()
  val cdl = new java.util.concurrent.CountDownLatch(1)

  import ExecutionContext.Implicits.global
  Future {Thread.sleep(timeout); cdl.await(); aref.get().interrupt}   // 1
  Future {aref.set(Thread.currentThread); cdl.countDown(); Try(f())}  // 2
}
Run Code Online (Sandbox Code Playgroud)