相关疑难解决方法(0)

如何创建一个可以通过方法调用稍后接收元素的Source?

我想Source在它上面创建一个和后来的推送元素,如:

val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)
Run Code Online (Sandbox Code Playgroud)

建议的方法是什么?

谢谢!

scala akka akka-stream akka-http

50
推荐指数
2
解决办法
1万
查看次数

如何使用Akka HTTP从多个actor/web处理程序中正确调用单个服务器?

我有一个服务(让我们称之为服务A),它使用Akka Server HTTP来处理传入的请求.我还有第三方应用程序(服务B),它提供了几个Web服务.服务A的目的是转换客户端请求,调用服务B的一个或多个Web服务,合并/转换结果并将其提供给客户端.

我在某些部分使用Actors,而在其他部分使用Future.要拨打服务B,我使用Akka HTTP客户端.

Http.get(actorSystem).singleRequest(HttpRequest.create()
        .withUri("http://127.0.0.1:8082/test"), materializer)
        .onComplete(...)
Run Code Online (Sandbox Code Playgroud)

问题是,每个Service A请求都会创建一个新流,如果有多个并发连接,则会产生 akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error

我已经问过这个问题,并建议使用单个Flow 如何正确调用Akka HTTP客户端以获取多个(10k-100k)请求?

虽然它适用于来自单个地方的一批请求,但我不知道如何使用来自所有并发请求处理程序的单个Flow.

这样做的正确"Akka-way"是什么?

java akka akka-http

7
推荐指数
1
解决办法
2628
查看次数

通过连接池发出http请求时,Akka Flow会挂起

我正在使用Akka 2.4.4并尝试从Apache HttpAsyncClient迁移(失败).

下面是我在项目中使用的代码的简化版本.

问题是,如果我向流发送超过1-3个请求,它就会挂起.经过6个小时的调试后,我甚至找不到问题.我没有看到异常,错误日志,事件Decider.没有 :)

我尝试将connection-timeout设置减少到1s,以为它可能正在等待来自服务器的响应,但它没有帮助.

我究竟做错了什么 ?

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory

import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

object Main {

  implicit val system = ActorSystem("root")
  implicit val executor = system.dispatcher
  val config = ConfigFactory.load()

  private val baseDomain = "www.google.com"
  private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))

  private val decider: Decider = { …
Run Code Online (Sandbox Code Playgroud)

scala akka akka-stream akka-http

7
推荐指数
1
解决办法
1140
查看次数

标签 统计

akka ×3

akka-http ×3

akka-stream ×2

scala ×2

java ×1