如何并排组合两个流程?

mis*_*tor 6 functional-programming scala akka akka-stream

是否有一个Akka溪流组合器用于执行以下操作(或具有相应效果的东西)?(我们and现在就叫它.)

(flow1: Flow[I, O, Mat]).and[O2](flow2: Flow[I, O2, Mat]): Flow[I, (O, O2), Mat]
Run Code Online (Sandbox Code Playgroud)

语义是无论什么来源,它的元素都将被传递给两个Flows,它们的输出将被组合成一个新Flow的元组.(对于熟悉类别理论风格函数式编程的箭头的人,我正在寻找类似的东西&&&.)

库中有两个看起来相关的组合器,即zipalsoTo.但前者接受a SourceShape,后者接受a SinkShape.两者都不会承认GraphShape.为什么会这样?

我的用例如下:

someSource
  .via(someFlowThatReturnsUnit.and(Flow.apply))
  .runWith(someSink)
Run Code Online (Sandbox Code Playgroud)

没找到类似的东西.and,我修改了我原来的Flow样子:

someSource
  .via(someFlowThatDoesWhateverItWasDoingEarlierButNowAlsoEmitsInputsAsIs)
  .runWith(someSink)
Run Code Online (Sandbox Code Playgroud)

这有效,但我正在寻找更清洁,更具成分性的解决方案.

Ram*_*gil 6

注意

正如Viktor Klang在评论中所指出的那样:Tuple2[O,O2]只有当知道两个流,flow1&flow2,相对于输入元素数和输出元素数是1:1 时,压缩到a 才可行.

图形解决方案

可以在Graph内部创建元组构造.事实上,您的问题几乎完全符合介绍性示例:

在此输入图像描述

扩展链接中的示例代码,您可以使用BroadcastZip

val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
  import GraphDSL.Implicits._
  val in = Source(1 to 10)
  val out = Sink.ignore

  val bcast = builder.add(Broadcast[Int](2))

  val merge = builder.add(Zip[Int, Int]()) //different than link

  val f1, f2, f4 = Flow[Int].map(_ + 10)

  val f3 = Flow[(Int, Int)].map(t => t._1 + t._2) //different than link

  in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
              bcast ~> f4 ~> merge
  ClosedShape
})//end RunnableGraph.fromGraph
Run Code Online (Sandbox Code Playgroud)

有点Hacky流解决方案

如果您正在寻找纯流解决方案,则可以使用中间流,但Mat不会维护,并且它涉及为每个输入元素实现2个流的实现:

def andFlows[I, O, O2] (maxConcurrentSreams : Int)
                       (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
                       (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (O, O2), _] = 
  Flow[I].mapAsync(maxConcurrentStreams){ i =>

    val o  : Future[O]  = Source
                           .single(i)
                           .via(flow1)
                           .to(Sink.head[O])
                           .run()

    val o2 : Future[O2] = Source
                           .single(i)
                           .via(flow2)
                           .to(Sink.head[O2])
                           .run()

    o zip o2
  }//end Flow[I].mapAsync
Run Code Online (Sandbox Code Playgroud)

通用拉链

如果你想使这个压缩通用,对于大多数流,那么输出类型必须是(Seq[O], Seq[O2]).可以通过使用Sink.seq而不是Sink.head在上面的andFlows函数中生成此类型:

def genericAndFlows[I, O, O2] (maxConcurrentSreams : Int)
                              (flow1: Flow[I, O, NotUsed], flow2: Flow[I, O2, NotUsed])
                              (implicit mat : Materializer, ec : ExecutionContext) : Flow[I, (Seq[O], Seq[O2]), _] = 
  Flow[I].mapAsync(maxConcurrentStreams){ i =>

    val o  : Future[Seq[O]]  = Source
                                .single(i)
                                .via(flow1)
                                .to(Sink.seq[O])
                                .run()

    val o2 : Future[Seq[O2]] = Source
                                .single(i)
                                .via(flow2)
                                .to(Sink.seq[O2])
                                .run()

    o zip o2
  }//end Flow[I].mapAsync
Run Code Online (Sandbox Code Playgroud)