Rem*_*coW 7 scala akka akka-stream
我已经创建了一个阿卡流具有简单Source,Flow和Sink.有了这个,我可以轻松地通过它发送元素.现在我想改变这个流,以便Flow返回一个Option.取决于Option我想要改变输出的结果Flow.
是否有可能创建这样的结构?
这两个答案都涉及到Broadcast.请注意,它可能适用于此特定示例,但在更复杂的图形中Broadcast可能不是一个明智的选择.原因是Broadcast如果至少有一个下游背压,则始终是背压.最好的背压感知解决方案是Partition,能够从分区器功能选择的分支中选择性地传播背压.
以下示例(详细说明T-Fowl的答案之一)
def splittingSink[T, M1, M2, Mat](f: T ? Option[T], someSink: Sink[T, M1], noneSink: Sink[None.type, M2], combineMat: (M1, M2) ? Mat): Sink[T, Mat] = {
val graph = GraphDSL.create(someSink, noneSink)(combineMat) { implicit builder ?
(sink1, sink2) ? {
import GraphDSL.Implicits._
def partitioner(o: Option[T]) = o.map(_ => 0).getOrElse(1)
val partition = builder.add(Partition[Option[T]](2, partitioner))
partition.out(0) ~> Flow[Option[T]].collect { case Some(t) ? t } ~> sink1.in
partition.out(1) ~> Flow[Option[T]].collect { case None ? None } ~> sink2.in
val mapper = builder.add(Flow.fromFunction(f))
mapper.out ~> partition.in
SinkShape(mapper.in)
}
}
Sink.fromGraph(graph)
}
Run Code Online (Sandbox Code Playgroud)