我正在寻找一种轻松重用akka-stream流的方法.
我把Flow打算重新用作函数,所以我想保留它的签名:
Flow[Input, Output, NotUsed]
现在,当我使用此流程时,我希望能够"调用"此流程并将结果保留在一边以便进一步处理.
所以我想从Flow发射开始[Input],应用我的流程,然后继续Flow发射[(Input, Output)].
例:
val s: Source[Int, NotUsed] = Source(1 to 10)
val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)
val via: Source[(Int, String), NotUsed] = ???
Run Code Online (Sandbox Code Playgroud)
现在这不可能以一种简单的方式进行,因为将流量与.via()流量相结合会使流量发光[Output]
val via: Source[String, NotUsed] = s.via(stringIfEven)
Run Code Online (Sandbox Code Playgroud)
另一种方法是使我的可重用流发出[(Input, Output)]但这需要每个流将其输入推送到所有阶段并使我的代码看起来很糟糕.
所以我想出了一个像这样的组合器:
def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = {
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[In](2))
val zip = b.add(Zip[In, Out])
broadcast.out(0) ~> zip.in0
broadcast.out(1) …Run Code Online (Sandbox Code Playgroud)