Sam*_*lez 5 scala stream akka-stream
我需要做一些与此非常类似的事情https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala
我的问题是我有一个未知数量的组,如果mapAsync的并行数少于我得到的组数和最后一个数组中的错误
由于上游错误(akka.stream.impl.StreamSubscriptionTimeoutSupport $$ anon $ 2)拆除SynchronousFileSink(/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt)
我尝试在akka流的模式指南中建议使用缓冲区http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html
groupBy {
case LoglevelPattern(level) => level
case other => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
// write lines of each group to a separate file
mapAsync(parallelism = 2) {....
Run Code Online (Sandbox Code Playgroud)
但结果相同
| 归档时间: |
|
| 查看次数: |
3656 次 |
| 最近记录: |