尽管失败,如何继续执行Future序列?

mis*_*tor 22 concurrency scala future

traverse来自Future对象的方法在第一次失败时停止.我想要这种方法的宽容/宽容版本,在出现错误时继续执行序列的其余部分.

目前我们已经为我们的utils添加了以下方法:

def traverseFilteringErrors[A, B <: AnyRef]
                           (seq: Seq[A])
                           (f: A => Future[B]): Future[Seq[B]] = {
  val sentinelValue = null.asInstanceOf[B]
  val allResults = Future.traverse(seq) { x =>
    f(x) recover { case _ => sentinelValue }
  }
  val successfulResults = allResults map { result =>
    result.filterNot(_ == sentinelValue)
  }
  successfulResults
}
Run Code Online (Sandbox Code Playgroud)

有一个更好的方法吗?

Rég*_*les 23

一般来说,一个真正有用的东西是能够将未来的错误推向一个适当的价值.或者换句话说,将a Future[T]转换为a Future[Try[T]](成功的返回值变为a Success[T],故障情况变为a Failure[T]).以下是我们如何实现它:

// Can also be done more concisely (but less efficiently) as:
// f.map(Success(_)).recover{ case t: Throwable => Failure( t ) }
// NOTE: you might also want to move this into an enrichment class
def mapValue[T]( f: Future[T] ): Future[Try[T]] = {
  val prom = Promise[Try[T]]()
  f onComplete prom.success
  prom.future
}
Run Code Online (Sandbox Code Playgroud)

现在,如果您执行以下操作:

Future.traverse(seq)( f andThen mapValue )
Run Code Online (Sandbox Code Playgroud)

您将获得一个成功的Future[Seq[Try[A]]],其最终值包含Success每个成功未来的Failure实例,以及每个失败的未来的实例.如果需要,您可以collect在此seq上使用以删除Failure实例并仅保留成功的值.

换句话说,您可以按如下方式重写助手方法:

def traverseFilteringErrors[A, B](seq: Seq[A])(f: A => Future[B]): Future[Seq[B]] = {
  Future.traverse( seq )( f andThen mapValue ) map ( _ collect{ case Success( x ) => x } )
}
Run Code Online (Sandbox Code Playgroud)

  • 因为`map`以及`recover`创建了'Promise`的实例并返回它们相关的`Future`.这是两个"承诺"实例,而不仅仅是我的替代解决方案.此外,"简洁"版本打开`Try`实例(存储在`Promise`实例中)只是为了将它重新包装为`Success`或`Failure`实例,这有点浪费.当然,您需要进行分析以确定它是否确实产生了任何有意义的差异. (6认同)
  • 凉.你能否评论为什么替代方案(`f.map(Success ...`)效率较低? (2认同)