如何使用 SubFlows 对已排序流的项目进行分组?

exp*_*ert 4 akka akka-stream

你们能解释一下如何groupBy在 akka-streams 中使用 new吗?文档似乎非常无用。groupBy曾经回来(T, Source)但不再回来。这是我的示例(我从文档中模仿了一个):

Source(List(
  1 -> "1a", 1 -> "1b", 1 -> "1c",
  2 -> "2a", 2 -> "2b",
  3 -> "3a", 3 -> "3b", 3 -> "3c",
  4 -> "4a",
  5 -> "5a", 5 -> "5b", 5 -> "5c",
  6 -> "6a", 6 -> "6b",
  7 -> "7a",
  8 -> "8a", 8 -> "8b",
  9 -> "9a", 9 -> "9b",
))
  .groupBy(3, _._1)
  .map { case (aid, raw) =>
    aid -> List(raw)
  }
  .reduce[(Int, List[String])] { case (l: (Int, List[String]), r: (Int, List[String])) =>
  (l._1, l._2 ::: r._2)
}
  .mergeSubstreams
  .runForeach { case (aid: Int, items: List[String]) =>
    println(s"$aid - ${items.length}")
  }
Run Code Online (Sandbox Code Playgroud)

这只是挂起。也许它挂起是因为子流的数量低于唯一键的数量。但是如果我有无限流怎么办?我想分组直到关键变化。

在我的真实流中,数据总是按我分组的值排序。也许我根本不需要groupBy

Yos*_*ssi 5

一年后,Akka Stream Contrib有一个AccumulateWhileUnchanged类可以做到这一点:

libraryDependencies += "com.typesafe.akka" %% "akka-stream-contrib" % "0.9"
Run Code Online (Sandbox Code Playgroud)

和:

import akka.stream.contrib.AccumulateWhileUnchanged
source.via(new AccumulateWhileUnchanged(_._1))
Run Code Online (Sandbox Code Playgroud)