我们希望并行化流的一部分,像 mapAsync 那样保持排序,但没有 Future。
目前我们有以下解决方案,但这需要一个物化器,而普通的 flatMapConcat 则不需要。
def flatMapConcatParallel[In, Out](parallelism: Int)(f: In => Source[Out, _])(implicit mat: Materializer): Flow[In, Out, NotUsed] = {
// TODO there should be a better way to add parallelism that avoids a run (and the need for a materializer)
Flow[In].mapAsync(parallelism){i =>
f(i).runWith(Sink.head)
}
}
Run Code Online (Sandbox Code Playgroud)
和
if (parallel){
val parallelism = 4
Flow[Batch].via(flatMapConcatParallel(parallelism)(singleRun))
} else{
Flow[Batch].flatMapConcat(singleRun)
}
Run Code Online (Sandbox Code Playgroud)
关于如何使用现有构造实现这一点而不必进入低级(GraphStageLogic)有任何提示吗?
---> f() --->
--d-c-b-a--> OrderedBalance ---> f() ---> OrderedMerge --d'-c'-b'-a'--->
---> f() --->
Run Code Online (Sandbox Code Playgroud)
关于什么:
Flow[In].mapAsync(parallelism)(i => Future.successful(f(i))).flatMapConcat(identity)
Run Code Online (Sandbox Code Playgroud)