未来递归模式/未来任意长度的链接

Nig*_*olf 4 recursion scala future akka

我很好奇递归建立一个将按顺序运行的Akka期货链的最佳方式,如果doWork未来的调用失败,未来应该重试3次,如果重试尝试用完,链条应该会失败.假设所有doWork呼叫都通过了返回的未来futChain应该只完成.

object Main extends App {
  val futChain = recurse(2)

  def recurse(param: Int, retries: Int = 3): Future[String] {
    Future {
      doWorkThatMayFailReturningString(param...)
    } recoverWith {
      case e => 
        if (retries > 0) recurse(param, retries -1)
        else  Future.failed(e)
    } flatMap {
      strRes => recurse(nextParam) //how should the res from the previous fut be passed?
    }
  }

  futChain onComplete {
    case res => println(res) //should print all the strings
  }
}
Run Code Online (Sandbox Code Playgroud)
  1. 如何将结果作为集合获得?即在这个例子中每次StringdoWork函数返回(我需要以某种方式修改recursefunc以返回一个Futrue[List[String]]
  2. 我应该使用recoverrecoverWith
  3. 是否可以调用flatMap链接这些调用
  4. 我应该考虑尾递归和堆栈溢出吗?
  5. 我会更好地递归建立一个期货清单并减少它们吗?

Mic*_*jac 12

您可以Future像这样实现可重试:

def retry[T](f: => Future[T])(n: Int)(implicit e: ExecutionContext): Future[T] = {
    n match {
        case i if (i > 1) => f.recoverWith{ case t: Throwable => retry(f)(n - 1)}
        case _ => f
    }       
}
Run Code Online (Sandbox Code Playgroud)

这不是针对尾递归进行优化的,但是如果你只打算重试几次,你就不会得到堆栈溢出(我想如果它在前几个失败了,它会继续失败,无论如何).

然后我会分开做链接.如果您将有限数量的函数链接在一起,每个函数都取决于之前的(并且由于某种原因,您希望聚合结果),您可以使用for理解(语法糖flatMap):

for {
    firstResult <- retry(Future(doWork(param)))(3)
    secondResult <- retry(Future(doWork(firstResult)))(3)
    thirdResult <- retry(Future(doWork(secondResult)))(3)
} yield List(firstResult, secondResult, thirdResult)
Run Code Online (Sandbox Code Playgroud)

对于任意长链,您可以使用Future.sequence(Futures在Akka库中)并行执行:

def doWork(param: String): String = ...

val parameters: List[String] = List(...)

val results: Future[List[String]] = Future.sequence(parameters.map(doWork(_)))
Run Code Online (Sandbox Code Playgroud)

这将解开,否则这将是List[Future[String]]Future[List[String]].

这是按顺序执行类似操作的一种方法:

def sequential[A, B](seq: List[A])(f: A => Future[B])(implicit e: ExecutionContext): Future[List[B]] = {
    seq.foldLeft(Future.successful(List[B]())) { case (left, next) =>
        left.flatMap(list => f(next).map(_ :: list))
    }
}

def doWork(param: String): String = ...

val results: Future[List[String]] = sequential(parameters)(param => Future(doWork(param))) 
Run Code Online (Sandbox Code Playgroud)

这些函数的实现对您的用例非常敏感.如果链中的任何期货失败,上述两个函数将返回失败的期货.有时候你会想要这个,有时则没有.如果您只想收集成功的期货,并在不使整个结果失败的情况下丢弃失败的期货,您可以添加额外的步骤来恢复失败.

另外,之间的差recover并且recoverWith是类型的PartialFunction它接受.recover用默认值替换失败的期货,而recoverWith使用另一个替换失败的期货Future.在我的情况下retry,recoverWith更合适,因为我试图恢复失败Future的自己.