Akka Stream Option输出

Rem*_*coW 7 scala akka akka-stream

我已经创建了一个阿卡流具有简单Source,FlowSink.有了这个,我可以轻松地通过它发送元素.现在我想改变这个流,以便Flow返回一个Option.取决于Option我想要改变输出的结果Flow.

在此输入图像描述

是否有可能创建这样的结构?

Ste*_*tti 8

这两个答案都涉及到Broadcast.请注意,它可能适用于此特定示例,但在更复杂的图形中Broadcast可能不是一个明智的选择.原因是Broadcast如果至少有一个下游背压,则始终是背压.最好的背压感知解决方案是Partition,能够从分区器功能选择的分支中选择性地传播背压.

以下示例(详细说明T-Fowl的答案之一)

  def splittingSink[T, M1, M2, Mat](f: T ? Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) ? Mat): Sink[T, Mat] = {
    val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder ?
      (sink1, sink2) ? {
        import GraphDSL.Implicits._

        def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1)
        val partition = builder.add(Partition[Option[T]](2, partitioner))
        partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ? t } ~> sink1.in
        partition.out(1) ~> Flow[Option[T]].collect { case None ? None } ~> sink2.in

        val mapper = builder.add(Flow.fromFunction(f))
        mapper.out ~> partition.in

        SinkShape(mapper.in)
      }
    }
    Sink.fromGraph(graph)
  }
Run Code Online (Sandbox Code Playgroud)