是否有"慢"的Future.traverse版本?

Ale*_*eev 11 scala future

我发现为一个用户请求构建大量期货通常是一种不好的做法.这些期货可以填补执行上下文,这将影响其他请求.这不太可能是你真正想要的.保持期货数量很小很简单 - 仅在for-comprehensions中创建新期货,使用flatMap等.但有时可能需要为每个Seq项目创建Future.使用Future.sequence或Future.traverse导致上述问题.所以我最终得到了这个解决方案,它不会同时为每个收集项创建Futures:

  def ftraverse[A, B](xs: Seq[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
    if(xs.isEmpty) Future successful Seq.empty[B]
    else f(xs.head) flatMap { fh => ftraverse(xs.tail)(f) map (r => fh +: r) }
  }
Run Code Online (Sandbox Code Playgroud)

我想知道,也许我正在发明一个轮子,实际上这样的功能已经存在于Scala的标准库中?另外我想知道,你遇到过描述的问题,你是如何解决的?也许,如果这是Futures的一个众所周知的问题,我应该在Future.scala中创建一个pull请求,这样这个函数(或者它的更通用版本)会包含在标准库中吗?

UPD:更一般的版本,有限的并行性:

  def ftraverse[A, B](xs: Seq[A], chunkSize: Int, maxChunks: Int)(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
    val xss = xs.grouped(chunkSize).toList
    val chunks = xss.take(maxChunks-1) :+ xss.drop(maxChunks-1).flatten
    Future.sequence{ chunks.map(chunk => ftraverse(chunk)(f) ) } map { _.flatten }
  } 
Run Code Online (Sandbox Code Playgroud)

Mic*_*jac 12

不,标准库中没有这样的东西.是否应该,我不能说.我认为想要以Future严格的顺序执行s 并不常见.但是当你想要的时候,你可以很容易地实现自己的方法.我个人只是为了这个目的在我自己的库中保留一个方法.但是,使用标准库有一种方法可以很方便.如果,它应该是更通用.

它实际上是非常简单的修改当前traverse工艺Future小号顺序,而不是平行.这是当前版本,它使用foldLeft而不是递归:

def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
    in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
      val fb = fn(a)
      for (r <- fr; b <- fb) yield (r += b)
    }.map(_.result())
Run Code Online (Sandbox Code Playgroud)

Futures的前创建的flatMap通过分配val fb = fn(a)(和前因而执行).所有人需要做的就是fn(a)在内部移动flatMap以延迟Future在集合中创建后续的s.

def traverseSeq[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
    in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
      for (r <- fr; b <- fn(a)) yield (r += b)
    }.map(_.result())
Run Code Online (Sandbox Code Playgroud)

另一种限制执行大量Futures 的影响的方法是使用不同ExecutionContext的方法.例如,在Web应用程序中,我可能会保留一个ExecutionContext用于数据库调用,一个用于调用Amazon S3,另一个用于缓慢的数据库调用.

一个非常简单的实现可以使用固定的线程池:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
val executionContext = ExecutionContext.fromExecutorService(executorService)
Run Code Online (Sandbox Code Playgroud)

Future这里执行的大量s会填充ExecutionContext,但这会阻止它们填充其他上下文.

如果您正在使用Akka,则可以ExecutionContext使用Dispatchers在配置中轻松创建s ActorSystem:

my-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 2.0
    parallelism-max = 10
  }
  throughput = 100
}
Run Code Online (Sandbox Code Playgroud)

如果你有一个ActorSystem被叫,system你可以通过以下方式访问它:

implicit val executionContext = system.dispatchers.lookup("my-dispatcher")
Run Code Online (Sandbox Code Playgroud)

所有这些都取决于您的使用案例.虽然我将异步计算分离到不同的上下文中,但有时我仍然希望traverse按顺序平滑这些上下文的使用.