Xia*_*ong 10 scala stream-processing akka akka-stream
我想明白之间的差别asyncBoundary和mapAsync.从一目了然,我猜他们应该是一样的.但是,当我运行代码时,它看起来asyncBoundary比性能更快mapAsync
这是代码
implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()
Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
该输出:异步边界总是比mayAsync完成更快.
从描述asyncBoundary(https://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-flows-and-basics.html)的文档中,我看到它正在运行在不同的CPU上,但mapAsync是使用Future的多线程.未来也是异步的.
请问有关这两个API的更多说明吗?
Ste*_*tti 10
异步
正确地指出这会强制在两个阶段之间插入异步边界.在你的例子中
Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
这实际上意味着+ 1操作和* 2操作将由分离的参与者进行.这使得流水线操作成为可能,同时元素移动到* 2舞台上,同时可以为+ 1舞台引入另一个元素.如果您不强制执行异步边界,则在从上游请求新的操作之前,同一个actor将对操作进行顺序操作并对一个元素执行操作.
顺便说一句,您的示例可以使用async组合器以较短的格式重写:
Source(1 to 100).map(_ + 1).async.map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
mapAsync
这是并行执行异步操作的阶段.并行因子允许您指定要旋转以提供传入元素的最大并行actor数.并行计算的结果由阶段按顺序跟踪和发出mapAsync.
在你的例子中
Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
可能最多100个+ 1操作(即所有操作)可以并行运行,并按顺序收集结果.随后,* 2可以并行运行多达100个操作,并且结果再次按顺序收集并向下游发射.
在您的示例中,您正在运行CPU限制的快速操作,这些操作无法正常使用mapAsync,因为此阶段所需的基础架构很可能比并行运行其中100个操作的优势要昂贵得多.mapAsync在处理IO绑定的慢操作时特别有用,其中并行化非常方便.
另请注意,mapAsync根据定义,还会引入异步边界,因此您可能会将其视为"扩展" async,您可以在其中指定大于1的并行度.
| 归档时间: | 
 | 
| 查看次数: | 1120 次 | 
| 最近记录: |