Scala Futures - 内置超时?

Lal*_*lin 53 concurrency scala

从官方教程参考中我还没有完全理解未来的一个方面.http://docs.scala-lang.org/overviews/core/futures.html

scala中的期货是否有某种内置的超时机制?假设下面的示例是一个5千兆字节的文本文件......"Implicits.global"的隐含范围是否会导致onFailure以非阻塞方式触发或者是否可以定义?没有某种默认的超时时间,这是否意味着它既不会成功也不会失败?

import scala.concurrent._
import ExecutionContext.Implicits.global

val firstOccurence: Future[Int] = future {
  val source = scala.io.Source.fromFile("myText.txt")
  source.toSeq.indexOfSlice("myKeyword")
}
firstOccurence onSuccess {
  case idx => println("The keyword first appears at position: " + idx)
}
firstOccurence onFailure {
  case t => println("Could not process file: " + t.getMessage)
}
Run Code Online (Sandbox Code Playgroud)

cmb*_*ter 67

当您使用阻止来获取结果时,您只会获得超时行为Future.如果要使用非阻塞回调onComplete,onSuccess或者onFailure,则必须滚动自己的超时处理.Akka内置了针对?actor之间的请求/响应()消息传递的超时处理,但不确定是否要开始使用Akka.FWIW,在Akka中,用于超时处理,它们组成两个Futures通道Future.firstCompletedOf,一个代表实际的异步任务,另一个代表超时.如果超时计时器(通过a HashedWheelTimer)首先弹出,则异步回调会出现故障.

滚动自己的一个非常简单的例子可能是这样的.首先,一个用于调度超时的对象:

import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException

object TimeoutScheduler{
  val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
  def scheduleTimeout(promise:Promise[_], after:Duration) = {
    timer.newTimeout(new TimerTask{
      def run(timeout:Timeout){              
        promise.failure(new TimeoutException("Operation timed out after " + after.toMillis + " millis"))        
      }
    }, after.toNanos, TimeUnit.NANOSECONDS)
  }
}
Run Code Online (Sandbox Code Playgroud)

然后是一个函数来获取Future并为其添加超时行为:

import scala.concurrent.{Future, ExecutionContext, Promise}
import scala.concurrent.duration.Duration

def withTimeout[T](fut:Future[T])(implicit ec:ExecutionContext, after:Duration) = {
  val prom = Promise[T]()
  val timeout = TimeoutScheduler.scheduleTimeout(prom, after)
  val combinedFut = Future.firstCompletedOf(List(fut, prom.future))
  fut onComplete{case result => timeout.cancel()}
  combinedFut
}
Run Code Online (Sandbox Code Playgroud)

请注意,HashedWheelTimer我在这里使用的是来自Netty.

  • 非常感谢你!你能提供一些关于如何处理期货的一般建议(事后).我正在阅读Akka,以及使用期货的Scala的各种HTTP包.似乎在某些时候为了使用Future,阻塞事件必须在那一刻发生或放弃这个过程..但许多教程似乎都专注于非阻塞调用,而不是在事后做任何实际操作? (3认同)
  • 如果使用Akka,它有一个方便的`akka.pattern.after`,可用于例如超时:http://doc.akka.io/docs/akka/2.4/scala/futures.html#After (2认同)

Pab*_*dez 23

我刚TimeoutFuture为同事创建了一个课程:

TimeoutFuture

package model

import scala.concurrent._
import scala.concurrent.duration._
import play.libs.Akka
import play.api.libs.concurrent.Execution.Implicits._

object TimeoutFuture {
  def apply[A](timeout: FiniteDuration)(block: => A): Future[A] = {

    val prom = promise[A]

    // timeout logic
    Akka.system.scheduler.scheduleOnce(timeout) {
      prom tryFailure new java.util.concurrent.TimeoutException
    }

    // business logic
    Future { 
      prom success block
    }

    prom.future
  } 
}
Run Code Online (Sandbox Code Playgroud)

用法

val future = TimeoutFuture(10 seconds) { 
  // do stuff here
}

future onComplete {
  case Success(stuff) => // use "stuff"
  case Failure(exception) => // catch exception (either TimeoutException or an exception inside the given block)
}
Run Code Online (Sandbox Code Playgroud)

笔记:

  • 假设玩!框架(但很容易适应)
  • 每一段代码都在相同的情况下运行,ExecutionContext这可能并不理想.


jus*_*nhj 19

所有这些答案都需要额外的依赖性.我决定使用java.util.Timer编写一个版本,这是一种在将来运行函数的有效方法,在这种情况下会触发超时.

博客文章在这里有更多细节

使用Scala的Promise,我们可以使用超时生成Future,如下所示:

package justinhj.concurrency

import java.util.concurrent.TimeoutException
import java.util.{Timer, TimerTask}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.language.postfixOps

object FutureUtil {

  // All Future's that use futureWithTimeout will use the same Timer object
  // it is thread safe and scales to thousands of active timers
  // The true parameter ensures that timeout timers are daemon threads and do not stop
  // the program from shutting down

  val timer: Timer = new Timer(true)

  /**
    * Returns the result of the provided future within the given time or a timeout exception, whichever is first
    * This uses Java Timer which runs a single thread to handle all futureWithTimeouts and does not block like a
    * Thread.sleep would
    * @param future Caller passes a future to execute
    * @param timeout Time before we return a Timeout exception instead of future's outcome
    * @return Future[T]
    */
  def futureWithTimeout[T](future : Future[T], timeout : FiniteDuration)(implicit ec: ExecutionContext): Future[T] = {

    // Promise will be fulfilled with either the callers Future or the timer task if it times out
    val p = Promise[T]

    // and a Timer task to handle timing out

    val timerTask = new TimerTask() {
      def run() : Unit = {
            p.tryFailure(new TimeoutException())
        }
      }

    // Set the timeout to check in the future
    timer.schedule(timerTask, timeout.toMillis)

    future.map {
      a =>
        if(p.trySuccess(a)) {
          timerTask.cancel()
        }
    }
    .recover {
      case e: Exception =>
        if(p.tryFailure(e)) {
          timerTask.cancel()
        }
    }

    p.future
  }

}
Run Code Online (Sandbox Code Playgroud)


小智 5

Play框架包含Promise.timeout,因此您可以编写如下代码

private def get(): Future[Option[Boolean]] = {
  val timeoutFuture = Promise.timeout(None, Duration("1s"))
  val mayBeHaveData = Future{
    // do something
    Some(true)
  }

  // if timeout occurred then None will be result of method
  Future.firstCompletedOf(List(mayBeHaveData, timeoutFuture))
}
Run Code Online (Sandbox Code Playgroud)


Rau*_*aul 5

我很惊讶这在 Scala 中不是标准的。我的版本很短,没有依赖项

import scala.concurrent.Future

sealed class TimeoutException extends RuntimeException

object FutureTimeout {

  import scala.concurrent.ExecutionContext.Implicits.global

  implicit class FutureTimeoutLike[T](f: Future[T]) {
    def withTimeout(ms: Long): Future[T] = Future.firstCompletedOf(List(f, Future {
      Thread.sleep(ms)
      throw new TimeoutException
    }))

    lazy val withTimeout: Future[T] = withTimeout(2000) // default 2s timeout
  }

}
Run Code Online (Sandbox Code Playgroud)

使用示例

import FutureTimeout._
Future { /* do smth */ } withTimeout
Run Code Online (Sandbox Code Playgroud)

  • 您提到的解决方案使用 Akka 库,而后者又依赖于 Java 的 `Thread.sleep`,这是在 JVM 上暂停线程执行的最低级别方法。参考:https://github.com/akka/akka/blob/ebc39ef9ab4440b98fe38b4994f8641d48128ee8/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L236 (4认同)
  • @Raul 不同之处在于 Akka 对所有计划的超时使用一个调度线程,而此解决方案为每个调用创建一个休眠线程。 (4认同)

gal*_*arm 5

如果您希望作者(承诺持有者)成为控制超时逻辑的人,请按以下方式使用akka.pattern.after

val timeout = akka.pattern.after(10 seconds, system.scheduler)(Future.failed(new TimeoutException(s"timed out during...")))
Future.firstCompletedOf(Seq(promiseRef.future, timeout))
Run Code Online (Sandbox Code Playgroud)

这样,如果您的承诺完成逻辑永远不会发生,您的调用者的未来仍然会在某个时刻以失败的方式完成。