Akka 和 future 的同时请求太多

Nag*_*sha 5 parallel-processing scala future akka

我的场景是这样的。

  1. 我有一个由大量报告组成的向量,需要使用 REST API 调用发送。
  2. 我正在使用 Futures.traverse(1中提到的向量)
  3. 由于向量太大,因此会因超出最大打开请求数而失败。

我能想到的一个初步解决方案是增加 max-open-requests 设置。但这里的问题是我不知道需要提前发送多少报告。

有人可以建议一个替代解决方案,例如限制通过 Futures.traverse 发生的并行性

Rob*_*rMP 3

由于您用 标记这个问题akka,我假设您正在使用 akka-http 进行调用。您可以使用 akka-streams 批量发出请求,以避免连接溢出,例如:

Source(reportsVector)
  .grouped(safeValue)
  .mapAsync(1)(reps => Future.traverse(reps)(x => ...)) //do your stuff
  .mapConcat(identity)
  .runWith(Sink.seq)
Run Code Online (Sandbox Code Playgroud)

该示例将safeValue一次执行并发调用,并将所有结果收集到一个集合中,该集合将在整个流完成时返回。您还可以使用其他运算符,例如slidingsplitWhen,为了使其更好地适合您的用例,您还可以调整safeValue和并发值。mapAsync请注意,该流的源是已知向量 ( reportsVector),但它也可能是未知的有限报告流。