你们能解释一下如何groupBy在 akka-streams 中使用 new吗?文档似乎非常无用。groupBy曾经回来(T, Source)但不再回来。这是我的示例(我从文档中模仿了一个):
Source(List(
1 -> "1a", 1 -> "1b", 1 -> "1c",
2 -> "2a", 2 -> "2b",
3 -> "3a", 3 -> "3b", 3 -> "3c",
4 -> "4a",
5 -> "5a", 5 -> "5b", 5 -> "5c",
6 -> "6a", 6 -> "6b",
7 -> "7a",
8 -> "8a", 8 -> "8b",
9 -> "9a", 9 -> "9b",
))
.groupBy(3, _._1)
.map { case (aid, raw) =>
aid -> List(raw)
}
.reduce[(Int, List[String])] { case (l: (Int, List[String]), r: (Int, List[String])) =>
(l._1, l._2 ::: r._2)
}
.mergeSubstreams
.runForeach { case (aid: Int, items: List[String]) =>
println(s"$aid - ${items.length}")
}
Run Code Online (Sandbox Code Playgroud)
这只是挂起。也许它挂起是因为子流的数量低于唯一键的数量。但是如果我有无限流怎么办?我想分组直到关键变化。
在我的真实流中,数据总是按我分组的值排序。也许我根本不需要groupBy?
一年后,Akka Stream Contrib有一个AccumulateWhileUnchanged类可以做到这一点:
libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.9"
Run Code Online (Sandbox Code Playgroud)
和:
import akka.stream.contrib.AccumulateWhileUnchanged
source.via(new AccumulateWhileUnchanged(_._1))
Run Code Online (Sandbox Code Playgroud)