Ale*_*rev 6 scala akka reactive-streams akka-stream
我有以下简单的案例类层次结构:
sealed trait Message
case class Foo(bar: Int) extends Message
case class Baz(qux: String) extends Message
Run Code Online (Sandbox Code Playgroud)
我有一个Flow[Message, Message, NotUsed](来自基于Websocket的协议,已经有编解码器).
我想将它解复用Flow[Message]为Foo和Baz类型的单独流程,因为它们由完全不同的路径处理.
这样做最简单的方法是什么?应该是显而易见的,但我遗漏了一些东西......
小智 6
一种方法是使用create a RunnableGraph,其中包含每种消息类型的Flow.
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
val in = Source(...) // Some message source
val out = Sink.ignore
val foo = builder.add(Flow[Message].map (x => x match { case f@Foo(_) => f }))
val baz = builder.add(Flow[Message].map (x => x match { case b@Baz(_) => b }))
val partition = builder.add(Partition[Message](2, {
case Foo(_) => 0
case Baz(_) => 1
}))
partition ~> foo ~> // other Flow[Foo] here ~> out
partition ~> baz ~> // other Flow[Baz] here ~> out
ClosedShape
}
g.run()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1287 次 |
| 最近记录: |