Akka stream - 列出了对各个元素的异步

ani*_*u99 3 scala akka akka-stream

我的流有一个Flow,其输出是List [Any]对象.我希望有一个mapAsync,后跟一些其他阶段,每个阶段处理一个单独的元素而不是列表.我怎样才能做到这一点?

实际上我想连接输出

Flow[Any].map { msg =>
  someListDerivedFrom(msg)
}
Run Code Online (Sandbox Code Playgroud)

被消费 -

Flow[Any].mapAsyncUnordered(4) { listElement =>
  actorRef ? listElement
}.someOtherStuff
Run Code Online (Sandbox Code Playgroud)

我该怎么做呢?

cmb*_*ter 9

我想你正在寻找的组合器是mapConcat.这个组合器将接受一个输入参数并返回一个Iterable.一个简单的例子如下:

implicit val system = ActorSystem()
implicit val mater = ActorMaterializer()

val source = Source(List(List(1,2,3), List(4,5,6)))
val sink = Sink.foreach[Int](println)

val graph =
  source.
    mapConcat(identity).
    to(sink)
graph.run
Run Code Online (Sandbox Code Playgroud)

在这里,我Source正在吐出List元素,而我Sink接受了那些内容的基本类型List.由于类型不同,我无法将它们直接连接在一起.但是如果我mapConcat在它们之间应用,它们可以连接,因为组合器会将这些List元素展平,将它们各自的元素(Int)发送到下游.因为输入元素mapConcat已经是一个Iterable,所以你只需要使用identify体内的函数mapConcat来使事情有效.