如何将Akka Streams Merge的输出传输到另一个Flow?

Ric*_*nry 4 scala stream akka

我正在玩Akka Streams,并且已经找到了大部分的基础知识,但我不清楚如何获取a的结果Merge并在其上进行进一步的操作(map,filter,fold等).

我想修改下面的代码,以便我可以进一步操作数据,而不是将合并传递给接收器.

implicit val materializer = FlowMaterializer()

val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))
val sink = ForeachSink(println)

val materialized = FlowGraph { implicit builder =>
  import FlowGraphImplicits._
  val merge = Merge[Int]("m1")
  items_a ~> merge
  items_b ~> merge ~> sink
}.run()
Run Code Online (Sandbox Code Playgroud)

我想我的主要问题是我无法弄清楚如何制作一个没有源代码的流组件,而且我无法弄清楚如何在不使用特殊的Merge对象和~>语法的情况下进行合并.

编辑:这个问题和答案是和Akka Streams 0.11一起使用的

cmb*_*ter 6

如果你不关心的语义Merge哪里去要素下游随机,那么你可以只是尝试concatSource,而不是像这样:

items_a.concat(items_b).map(_ * 2).map(_.toString).foreach(println)
Run Code Online (Sandbox Code Playgroud)

这里的不同之处在于,所有项目a都将在任何元素之前首先向下游流动b.如果你真的需要这样的行为Merge,那么你可以考虑以下内容(请记住,你最终需要一个接收器,但你可以在合并后进行额外的转换):

val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))

val sink = ForeachSink[Double](println)
val transform = Flow[Int].map(_ * 2).map(_.toDouble).to(sink)


val materialized = FlowGraph { implicit builder =>
  import FlowGraphImplicits._
  val merge = Merge[Int]("m1")
  items_a ~> merge
  items_b ~> merge ~> transform
}.run
Run Code Online (Sandbox Code Playgroud)

在此示例中,您可以看到我使用Flow协同服务器中的帮助程序来创建Flow没有特定输入的帮助程序Source.然后我可以将它附加到合并点以获得我的额外处理.