Nag*_*sha 5 parallel-processing scala future akka
我的场景是这样的。
我能想到的一个初步解决方案是增加 max-open-requests 设置。但这里的问题是我不知道需要提前发送多少报告。
有人可以建议一个替代解决方案,例如限制通过 Futures.traverse 发生的并行性
由于您用 标记这个问题akka,我假设您正在使用 akka-http 进行调用。您可以使用 akka-streams 批量发出请求,以避免连接溢出,例如:
Source(reportsVector)
.grouped(safeValue)
.mapAsync(1)(reps => Future.traverse(reps)(x => ...)) //do your stuff
.mapConcat(identity)
.runWith(Sink.seq)
Run Code Online (Sandbox Code Playgroud)
该示例将safeValue一次执行并发调用,并将所有结果收集到一个集合中,该集合将在整个流完成时返回。您还可以使用其他运算符,例如sliding和splitWhen,为了使其更好地适合您的用例,您还可以调整safeValue和并发值。mapAsync请注意,该流的源是已知向量 ( reportsVector),但它也可能是未知的有限报告流。
| 归档时间: |
|
| 查看次数: |
306 次 |
| 最近记录: |