ani*_*u99 3 scala akka akka-stream
我的流有一个Flow,其输出是List [Any]对象.我希望有一个mapAsync,后跟一些其他阶段,每个阶段处理一个单独的元素而不是列表.我怎样才能做到这一点?
实际上我想连接输出
Flow[Any].map { msg =>
someListDerivedFrom(msg)
}
Run Code Online (Sandbox Code Playgroud)
被消费 -
Flow[Any].mapAsyncUnordered(4) { listElement =>
actorRef ? listElement
}.someOtherStuff
Run Code Online (Sandbox Code Playgroud)
我该怎么做呢?
我想你正在寻找的组合器是mapConcat.这个组合器将接受一个输入参数并返回一个Iterable.一个简单的例子如下:
implicit val system = ActorSystem()
implicit val mater = ActorMaterializer()
val source = Source(List(List(1,2,3), List(4,5,6)))
val sink = Sink.foreach[Int](println)
val graph =
source.
mapConcat(identity).
to(sink)
graph.run
Run Code Online (Sandbox Code Playgroud)
在这里,我Source正在吐出List元素,而我Sink接受了那些内容的基本类型List.由于类型不同,我无法将它们直接连接在一起.但是如果我mapConcat在它们之间应用,它们可以连接,因为组合器会将这些List元素展平,将它们各自的元素(Int)发送到下游.因为输入元素mapConcat已经是一个Iterable,所以你只需要使用identify体内的函数mapConcat来使事情有效.
| 归档时间: |
|
| 查看次数: |
3315 次 |
| 最近记录: |