如何根据另一个流控制Akka Stream的流量

ton*_*ian 6 akka-stream

说我有两个来源:

val ticks = Source(1 to 10)
val values = Source[Int](Seq(3,4,4,7,8,8,8,8,9).to[collection.immutable.Iterable])
Run Code Online (Sandbox Code Playgroud)

我想Graph[...]在Akka Stream中创建一个处理步骤,该步骤基于ticks它在值流中尽可能多地消耗的流的当前值.因此,例如,当值匹配时,我想返回第二个源中匹配的所有元素,否则保持滴答声,从而产生如下输出:

(1, None)
(2, None)
(3, Some(Seq(3)))
(4, Some(Seq(4, 4)))
(5, None)
(6, None)
(7, Some(Seq(7)))
(8, Some(Seq(8,8,8,8)))
(9, Some(Seq(9)))
(10, None)
Run Code Online (Sandbox Code Playgroud)

你会如何实现这种行为?

GJZ*_*GJZ 1

我建议您查看有关此主题的 Akka Stream 文档:http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-graphs.html

根据该网站,您可以像这样实现 GraphStage:

final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] {

val in = Inlet[E]("AccumulateWhileUnchanged.in")
val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out")

override def shape = FlowShape(in, out)
}
Run Code Online (Sandbox Code Playgroud)

还有一篇关于此主题的博客文章:http://blog.kunicki.org/blog/2016/07/20/implementing-a-custom-akka-streams-graph-stage/

希望这可以帮助 :)