如何使用akka流中的mapAsync来使用分组子流

Sam*_*lez 5 scala stream akka-stream

我需要做一些与此非常类似的事情https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala

我的问题是我有一个未知数量的组,如果mapAsync的并行数少于我得到的组数和最后一个数组中的错误

由于上游错误(akka.stream.impl.StreamSubscriptionTimeoutSupport $$ anon $ 2)拆除SynchronousFileSink(/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt)

我尝试在akka流的模式指南中建议使用缓冲区http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html

groupBy {
  case LoglevelPattern(level) => level
  case other                  => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
  // write lines of each group to a separate file
  mapAsync(parallelism = 2) {....
Run Code Online (Sandbox Code Playgroud)

但结果相同