map和mapAsync之间的区别

Ana*_*and 19 scala akka-stream

任何人都可以解释我地图和mapAsync与AKKA流之间的区别吗?在文档中,据说

可以使用mapAsync或mapAsyncUnordered执行涉及外部非基于流的服务的流转换和副作用

为什么我们不能简单地在这里映射?我假设Flow,Source,Sink都是Monadic,因此map应该在这些性质的延迟中正常工作?

Ram*_*gil 44

签名

差异最好在签名中突出显示: Flow.map接受一个函数,该函数返回一个类型,T同时Flow.mapAsync接收一个返回一个类型的函数Future[T].

实际例子

例如,假设我们有一个函数,它根据用户ID在数据库中查询用户的全名:

type UserID   = String
type FullName = String

val databaseLookup : UserID => FullName = ???  //implementation unimportant
Run Code Online (Sandbox Code Playgroud)

给定一个akka 值流Source,UserID我们可以Flow.map在Stream中使用它来查询数据库并将全名打印到控制台:

val userIDSource : Source[UserID, _] = ???

val stream = 
  userIDSource.via(Flow[UserID].map(databaseLookup))
              .to(Sink.foreach[FullName](println))
              .run()
Run Code Online (Sandbox Code Playgroud)

此方法的一个限制是此流将一次只进行1 db查询.串行查询将是一个"瓶颈",可能会阻止我们的流中的最大吞吐量.

我们可以尝试使用以下方法通过并发查询来提高性能Future:

def concurrentDBLookup(userID : UserID) : Future[FullName] = 
  Future { databaseLookup(userID) }

val concurrentStream = 
  userIDSource.via(Flow[UserID].map(concurrentDBLookup))
              .to(Sink.foreach[Future[FullName]](_ foreach println))
              .run()
Run Code Online (Sandbox Code Playgroud)

这个简单的附录的问题是我们已经有效地消除了背压.

Sink只是拉动了Future并添加了一个foreach println,与数据库查询相比,它相对较快.该流将不断地将需求传播到源并在其中产生更多的期货Flow.map.因此,databaseLookup并发运行的数量没有限制.不受干扰的并行查询最终可能使数据库过载.

Flow.mapAsync救援; 我们可以同时进行数据库访问,同时限制同时查找的数量:

val maxLookupCount = 10

val maxLookupConcurrentStream = 
  userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
              .to(Sink.foreach[FullName](println))
              .run()
Run Code Online (Sandbox Code Playgroud)

还要注意,它Sink.foreach变得越来越简单,它不再需要一个Future[FullName]而是一个FullName而不是一个.

无序异步映射

如果不需要维护UserID到FullNames的顺序,那么您可以使用Flow.mapAsyncUnordered.例如:您只需要将所有名称打印到控制台,但不关心它们的打印顺序.

  • `mapAsync`是否类似于将异步边界应用于该特定阶段?根据文档,标记异步边界将运行actor中的每个阶段,只是想知道它是否相同. (3认同)