并行运行多个期货,在超时时返回默认值

Zot*_*tov 14 concurrency scala future

我必须并行运行多个期货,程序不应该崩溃或挂起.

现在我一个接一个地等待期货,如果有TimeoutException则使用后备值.

val future1 = // start future1
val future2 = // start future2
val future3 = // start future3

// <- at this point all 3 futures are running

// waits for maximum of timeout1 seconds
val res1 = toFallback(future1, timeout1, Map[String, Int]())
// .. timeout2 seconds 
val res2 = toFallback(future2, timeout2, List[Int]())
// ... timeout3 seconds
val res3 = toFallback(future3, timeout3, Map[String, BigInt]()) 

def toFallback[T](f: Future[T], to: Int, default: T) = {
  Try(Await.result(f, to seconds))
    .recover { case to: TimeoutException => default }
}
Run Code Online (Sandbox Code Playgroud)

我可以看到,这个片段的最长等待时间是 timeout1 + timeout2 + timeout3

我的问题是:如何立即等待所有这些期货,这样我可以减少等待时间max(timeout1, timeout2, timeout3)

编辑:最后我使用了@Jatin和@senia答案的修改:

private def composeWaitingFuture[T](fut: Future[T], 
                                    timeout: Int, default: T) =
  future { Await.result(fut, timeout seconds) } recover {
    case e: Exception => default
  }
Run Code Online (Sandbox Code Playgroud)

后来使用如下:

// starts futures immediately and waits for maximum of timeoutX seconds
val res1 = composeWaitingFuture(future1, timeout1, Map[String, Int]())
val res2 = composeWaitingFuture(future2, timeout2, List[Int]())
val res3 = composeWaitingFuture(future3, timeout3, Map[String, BigInt]()) 

// takes the maximum of max(timeout1, timeout2, timeout3) to complete
val combinedFuture =
  for {
    r1 <- res1
    r2 <- res2
    r3 <- res3
  } yield (r1, r2, r3)
Run Code Online (Sandbox Code Playgroud)

后来我用combinedFuture我认为合适的方式.

sen*_*nia 13

您可以创建future使用flatMap或for -reherehement返回所有3个期货的结果:

val combinedFuture =
  for {
    r1 <- future1
    r2 <- future2
    r3 <- future3
  } yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(combinedFuture , Seq(timeout1, timeout2, timeout3).max)
Run Code Online (Sandbox Code Playgroud)

如果您正在使用,akka您可以在超时后使用默认值完成未来:

implicit class FutureHelper[T](f: Future[T]) extends AnyVal{
  import akka.pattern.after
  def orDefault(t: Timeout, default: => T)(implicit system: ActorSystem): Future[T] = {
    val delayed = after(t.duration, system.scheduler)(Future.successful(default))
    Future firstCompletedOf Seq(f, delayed)
  }
}

val combinedFuture =
  for {
    r1 <- future1.orDefault(timeout1, Map())
    r2 <- future2.orDefault(timeout2, List())
    r3 <- future3.orDefault(timeout3, Map())
  } yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(combinedFuture , allowance + Seq(timeout1, timeout2, timeout3).max)
Run Code Online (Sandbox Code Playgroud)


Jat*_*tin 9

def toFallback[T](f: Future[T], to: Int, default: T) = {
  future{
  try{
        Await.result(f, to seconds)
   }catch{
        case e:TimeoutException => default
  }
 }
Run Code Online (Sandbox Code Playgroud)

您甚至可以使此块异步,并且每个请求都等待其最大时间.如果线程太多,可能只有一个线程继续使用Akka检查其他期货system scheduler.@Senia已在下面回答了这个问题.

  • `Await.result`阻塞线程,因此你不应该在这里使用默认的`ExecutionContext`.您可以为`toFallback`的调用创建一个特殊的`ExecutionContext`,甚至可以像[this answer](http://stackoverflow.com/a/17215663/406435)那样启动一个新的线程而不是`future`方法. (6认同)