Mic*_*ael 22 concurrency scala future
假设我有一个函数,它调用阻塞可中断操作.我想以超时的方式异步运行它.也就是说,我想在超时到期时中断该功能.
所以我想尝试这样的事情:
import scala.util.Try import scala.concurrent.Future def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = { val aref = new java.util.concurrent.atomic.AtomicReference[Thread]() import ExecutionContext.Implicits.global Future {Thread.sleep(timeout); aref.get().interrupt} // 1 Future {aref.set(Thread.currentThread); Try(f())} // 2 }
问题是aref
在(1)中可以为null,因为(2)尚未将其设置为当前线程.在这种情况下,我想等到aref
设定.最好的方法是什么?
fla*_*ian 16
您可以使用Await稍微简单一些.该Await.result
方法将超时持续时间作为第二个参数,并抛出一个TimeoutException
on timeout.
try {
import scala.concurrent.duration._
Await.result(aref, 10 seconds);
} catch {
case e: TimeoutException => // whatever you want to do.
}
Run Code Online (Sandbox Code Playgroud)
我也需要相同的行为,所以这就是我解决它的方式.我基本上创建了一个创建计时器的对象,如果未来在指定的持续时间内未完成,则会使用TimeoutException失败.
package mypackage
import scala.concurrent.{Promise, Future}
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import scala.concurrent.ExecutionContext.Implicits.global
object TimeoutFuture {
val actorSystem = ActorSystem("myActorSystem")
def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {
val promise = Promise[A]()
actorSystem.scheduler.scheduleOnce(timeout) {
promise tryFailure new java.util.concurrent.TimeoutException
}
Future {
try {
promise success block
}
catch {
case e:Throwable => promise failure e
}
}
promise.future
}
}
Run Code Online (Sandbox Code Playgroud)
如果添加一个CountDownLatch
,就可以实现所需的行为.(请注意,await
在很多很多Future
s 中阻塞(即卡住)可能会导致线程池的饥饿.)
import scala.util.Try
import scala.concurrent.Future
def launch(f: () => Unit, timeout: Int): Future[Try[Unit]] = {
val aref = new java.util.concurrent.atomic.AtomicReference[Thread]()
val cdl = new java.util.concurrent.CountDownLatch(1)
import ExecutionContext.Implicits.global
Future {Thread.sleep(timeout); cdl.await(); aref.get().interrupt} // 1
Future {aref.set(Thread.currentThread); cdl.countDown(); Try(f())} // 2
}
Run Code Online (Sandbox Code Playgroud)