重用akka-stream流的优雅方式

Tom*_*zak 8 scala akka akka-stream

我正在寻找一种轻松重用akka-stream流的方法.

我把Flow打算重新用作函数,所以我想保留它的签名:

Flow[Input, Output, NotUsed]

现在,当我使用此流程时,我希望能够"调用"此流程并将结果保留在一边以便进一步处理.

所以我想从Flow发射开始[Input],应用我的流程,然后继续Flow发射[(Input, Output)].

例:

val s: Source[Int, NotUsed] = Source(1 to 10)

val stringIfEven = Flow[Int].filter(_ % 2 == 0).map(_.toString)

val via: Source[(Int, String), NotUsed] = ???
Run Code Online (Sandbox Code Playgroud)

现在这不可能以一种简单的方式进行,因为将流量与.via()流量相结合会使流量发光[Output]

val via: Source[String, NotUsed] = s.via(stringIfEven)
Run Code Online (Sandbox Code Playgroud)

另一种方法是使我的可重用流发出[(Input, Output)]但这需要每个流将其输入推送到所有阶段并使我的代码看起来很糟糕.

所以我想出了一个像这样的组合器:

def tupledFlow[In,Out](flow: Flow[In, Out, _]):Flow[In, (In,Out), NotUsed] = {
  Flow.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val broadcast = b.add(Broadcast[In](2))
  val zip = b.add(Zip[In, Out])

  broadcast.out(0) ~> zip.in0
  broadcast.out(1) ~> flow ~> zip.in1

  FlowShape(broadcast.in, zip.out)
})
Run Code Online (Sandbox Code Playgroud)

}

这是将输入广播到流中,也是直接在平行线上广播 - >都在'Zip'阶段,我将值连接到一个元组.然后它可以优雅地应用:

val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEven))
Run Code Online (Sandbox Code Playgroud)

一切都很棒但是当给定流量时正在进行"过滤"操作 - 这个组合器被卡住并停止处理更多事件.

我想这是由于'Zip'行为需要所有子流做同样的事情 - 在我的情况下,一个分支直接传递给定对象,所以另一个子流不能忽略这个元素.filter(),因为它确实 - 流停止,因为Zip正在等待推送.

有没有更好的方法来实现流量组成?当'flow'忽略带'filter'的元素时,我可以在tupledFlow中做些什么来获得所需的行为吗?

Ste*_*tti 3

两种可能的方法(虽然优雅但有争议)是:

1) 避免使用过滤阶段,将过滤器转变为Flow[Int, Option[Int], NotUsed]. 这样您就可以将拉链包装纸应用到整个图表上,就像您最初的计划一样。然而,代码看起来更受污染,并且传递Nones 会增加开销。

val stringIfEvenOrNone = Flow[Int].map{
  case x if x % 2 == 0 => Some(x.toString)
  case _ => None
}

val tupled: Source[(Int, String), NotUsed] = s.via(tupledFlow(stringIfEvenOrNone)).collect{
  case (num, Some(str)) => (num,str)
}
Run Code Online (Sandbox Code Playgroud)

2)将过滤和转换阶段分开,并在压缩包装之前应用过滤阶段。可能是更轻量级和更好的折衷方案。

val filterEven = Flow[Int].filter(_ % 2 == 0)

val toString = Flow[Int].map(_.toString)

val tupled: Source[(Int, String), NotUsed] = s.via(filterEven).via(tupledFlow(toString))
Run Code Online (Sandbox Code Playgroud)

编辑

3)为了清楚起见,根据评论中的讨论,在这里发布另一个解决方案。

该流包装器允许发出给定流中的每个元素,并与生成它的原始输入元素配对。它适用于任何类型的内部流(为每个输入发出 0、1 或更多元素)。

  def tupledFlow[In,Out](flow: Flow[In, Out, _]): Flow[In, (In,Out), NotUsed] =
    Flow[In].flatMapConcat(in => Source.single(in).via(flow).map( out => in -> out))
Run Code Online (Sandbox Code Playgroud)