我想Source在它上面创建一个和后来的推送元素,如:
val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)
Run Code Online (Sandbox Code Playgroud)
建议的方法是什么?
谢谢!
我有一个服务(让我们称之为服务A),它使用Akka Server HTTP来处理传入的请求.我还有第三方应用程序(服务B),它提供了几个Web服务.服务A的目的是转换客户端请求,调用服务B的一个或多个Web服务,合并/转换结果并将其提供给客户端.
我在某些部分使用Actors,而在其他部分使用Future.要拨打服务B,我使用Akka HTTP客户端.
Http.get(actorSystem).singleRequest(HttpRequest.create()
.withUri("http://127.0.0.1:8082/test"), materializer)
.onComplete(...)
Run Code Online (Sandbox Code Playgroud)
问题是,每个Service A请求都会创建一个新流,如果有多个并发连接,则会产生 akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error
我已经问过这个问题,并建议使用单个Flow 如何正确调用Akka HTTP客户端以获取多个(10k-100k)请求?
虽然它适用于来自单个地方的一批请求,但我不知道如何使用来自所有并发请求处理程序的单个Flow.
这样做的正确"Akka-way"是什么?
我正在使用Akka 2.4.4并尝试从Apache HttpAsyncClient迁移(失败).
下面是我在项目中使用的代码的简化版本.
问题是,如果我向流发送超过1-3个请求,它就会挂起.经过6个小时的调试后,我甚至找不到问题.我没有看到异常,错误日志,事件Decider.没有 :)
我尝试将connection-timeout设置减少到1s,以为它可能正在等待来自服务器的响应,但它没有帮助.
我究竟做错了什么 ?
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory
import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try
object Main {
implicit val system = ActorSystem("root")
implicit val executor = system.dispatcher
val config = ConfigFactory.load()
private val baseDomain = "www.google.com"
private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))
private val decider: Decider = { …Run Code Online (Sandbox Code Playgroud)