akka stream asyncBoundary vs mapAsync

Xia*_*ong 10 scala stream-processing akka akka-stream

我想明白之间的差别asyncBoundarymapAsync.从一目了然,我猜他们应该是一样的.但是,当我运行代码时,它看起来asyncBoundary比性能更快mapAsync

这是代码

implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()


Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
Run Code Online (Sandbox Code Playgroud)

输出:异步边界总是比mayAsync完成更快.

从描述asyncBoundary(https://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-flows-and-basics.html)的文档中,我看到它正在运行在不同的CPU上,但mapAsync是使用Future的多线程.未来也是异步的.

请问有关这两个API的更多说明吗?

Ste*_*tti 10

异步

正确地指出这会强制在两个阶段之间插入异步边界.在你的例子中

Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
Run Code Online (Sandbox Code Playgroud)

这实际上意味着+ 1操作和* 2操作将由分离的参与者进行.这使得流水线操作成为可能,同时元素移动到* 2舞台上,同时可以为+ 1舞台引入另一个元素.如果您强制执行异步边界,则在从上游请求新的操作之前,同一个actor将对操作进行顺序操作并对一个元素执行操作.

顺便说一句,您的示例可以使用async组合器以较短的格式重写:

Source(1 to 100).map(_ + 1).async.map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
Run Code Online (Sandbox Code Playgroud)

mapAsync

这是并行执行异步操作的阶段.并行因子允许您指定要旋转以提供传入元素的最大并行actor数.并行计算的结果由阶段按顺序跟踪和发出mapAsync.

在你的例子中

Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
Run Code Online (Sandbox Code Playgroud)

可能最多100个+ 1操作(即所有操作)可以并行运行,并按顺序收集结果.随后,* 2可以并行运行多达100个操作,并且结果再次按顺序收集并向下游发射.

在您的示例中,您正在运行CPU限制的快速操作,这些操作无法正常使用mapAsync,因为此阶段所需的基础架构很可能比并行运行其中100个操作的优势要昂贵得多.mapAsync在处理IO绑定的慢操作时特别有用,其中并行化非常方便.

另请注意,mapAsync根据定义,还会引入异步边界,因此您可能会将其视为"扩展" async,您可以在其中指定大于1的并行度.

  • 我还要指出,当我们不关心订单时使用`.mapAsyncUnordered`可以提高性能. (2认同)