rel*_*mes 7 java akka akka-http
我有一个服务(让我们称之为服务A),它使用Akka Server HTTP来处理传入的请求.我还有第三方应用程序(服务B),它提供了几个Web服务.服务A的目的是转换客户端请求,调用服务B的一个或多个Web服务,合并/转换结果并将其提供给客户端.
我在某些部分使用Actors,而在其他部分使用Future.要拨打服务B,我使用Akka HTTP客户端.
Http.get(actorSystem).singleRequest(HttpRequest.create()
.withUri("http://127.0.0.1:8082/test"), materializer)
.onComplete(...)
Run Code Online (Sandbox Code Playgroud)
问题是,每个Service A请求都会创建一个新流,如果有多个并发连接,则会产生 akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error
我已经问过这个问题,并建议使用单个Flow 如何正确调用Akka HTTP客户端以获取多个(10k-100k)请求?
虽然它适用于来自单个地方的一批请求,但我不知道如何使用来自所有并发请求处理程序的单个Flow.
这样做的正确"Akka-way"是什么?
小智 13
我想你可以Source.queue用来缓冲你的请求.下面的代码假设您需要从第三方服务获得答案,因此Future[HttpResponse]非常欢迎.这样,您还可以提供溢出策略以防止资源不足.
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem("main")
implicit val materializer = ActorMaterializer()
val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
.via(pool)
.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
.run
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = "/") -> promise
val response = queue.offer(request).flatMap(buffered => {
if (buffered) promise.future
else Future.failed(new RuntimeException())
})
Await.ready(response, 3 seconds)
Run Code Online (Sandbox Code Playgroud)
(从我的博文中复制的代码)
| 归档时间: |
|
| 查看次数: |
2628 次 |
| 最近记录: |