use*_*992 8 asynchronous scala future polling akka
我想轮询一个API端点,直到达到某种条件。我希望它能在几秒钟到一分钟内达到此条件。我有一个方法来调用返回一个的端点Future
。我是否可以Future
通过某种方法将s链接在一起以每n
毫秒轮询一次此端点并在t
尝试后放弃?
假设我有一个带有以下签名的函数:
def isComplete(): Future[Boolean] = ???
Run Code Online (Sandbox Code Playgroud)
我认为最简单的方法是使所有内容都阻塞:
def untilComplete(): Unit = {
for { _ <- 0 to 10 } {
val status = Await.result(isComplete(), 1.seconds)
if (status) return Unit
Thread.sleep(100)
}
throw new Error("Max attempts")
}
Run Code Online (Sandbox Code Playgroud)
但这可能会占用所有线程,并且不是异步的。我还考虑了递归执行:
def untilComplete(
f: Future[Boolean] = Future.successful(false),
attempts: Int = 10
): Future[Unit] = f flatMap { status =>
if (status) Future.successful(Unit)
else if (attempts == 0) throw new Error("Max attempts")
else {
Thread.sleep(100)
untilComplete(isComplete(), attempts - 1)
}
}
Run Code Online (Sandbox Code Playgroud)
但是,我担心最大化调用堆栈,因为这不是尾递归。
有更好的方法吗?
编辑:我正在使用akka
您可以使用Akka Streams。例如,isComplete
每500毫秒调用一次,直到的结果Future
为真为止,最多调用五次:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future
import scala.concurrent.duration._
def isComplete(): Future[Boolean] = ???
implicit val system = ActorSystem("MyExample")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val stream: Future[Option[Boolean]] =
Source(1 to 5)
.throttle(1, 500 millis)
.mapAsync(parallelism = 1)(_ => isComplete())
.takeWhile(_ == false, true)
.runWith(Sink.lastOption)
stream onComplete { result =>
println(s"Stream completed with result: $result")
system.terminate()
}
Run Code Online (Sandbox Code Playgroud)