如何在Scala中对Future进行投票?

use*_*992 8 asynchronous scala future polling akka

我想轮询一个API端点,直到达到某种条件。我希望它能在几秒钟到一分钟内达到此条件。我有一个方法来调用返回一个的端点Future。我是否可以Future通过某种方法将s链接在一起以每n毫秒轮询一次此端点并在t尝试后放弃?

假设我有一个带有以下签名的函数:

def isComplete(): Future[Boolean] = ???
Run Code Online (Sandbox Code Playgroud)

我认为最简单的方法是使所有内容都阻塞:

def untilComplete(): Unit = {
  for { _ <- 0 to 10 } {
    val status = Await.result(isComplete(), 1.seconds)
    if (status) return Unit
    Thread.sleep(100)
  }
  throw new Error("Max attempts")
}
Run Code Online (Sandbox Code Playgroud)

但这可能会占用所有线程,并且不是异步的。我还考虑了递归执行:

def untilComplete(
    f: Future[Boolean] = Future.successful(false),
    attempts: Int = 10
  ): Future[Unit] = f flatMap { status =>
    if (status) Future.successful(Unit)
    else if (attempts == 0) throw new Error("Max attempts")
    else {
      Thread.sleep(100)
      untilComplete(isComplete(), attempts - 1)
    }
}
Run Code Online (Sandbox Code Playgroud)

但是,我担心最大化调用堆栈,因为这不是尾递归。

有更好的方法吗?

编辑:我正在使用akka

Jef*_*ung 5

您可以使用Akka Streams。例如,isComplete每500毫秒调用一次,直到的结果Future为真为止,最多调用五次:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future
import scala.concurrent.duration._

def isComplete(): Future[Boolean] = ???

implicit val system = ActorSystem("MyExample")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

val stream: Future[Option[Boolean]] =
  Source(1 to 5)
    .throttle(1, 500 millis)
    .mapAsync(parallelism = 1)(_ => isComplete())
    .takeWhile(_ == false, true)
    .runWith(Sink.lastOption)

stream onComplete { result =>
  println(s"Stream completed with result: $result")
  system.terminate()
}
Run Code Online (Sandbox Code Playgroud)