使用返回未来的函数遍历列表和流

Tra*_*own 37 concurrency scala future scalaz applicative

介绍

Scala Future(在2.10现在的2.9.3中新的)是一个应用程序的函子,这意味着如果我们有一个可遍历的类型 F,我们可以使用F[A]一个函数A => Future[B]并将它们变成一个函数Future[F[B]].

此操作在标准库中可用Future.traverse.如果我们从库中导入applicative functor实例,Scalaz 7还提供了更通用的功能.traverseFuturescalaz-contrib

这两种traverse方法在流的情况下表现不同.标准库遍历在返回之前使用流,而Scalaz会立即返回未来:

import scala.concurrent._
import ExecutionContext.Implicits.global

// Hangs.
val standardRes = Future.traverse(Stream.from(1))(future(_))

// Returns immediately.
val scalazRes = Stream.from(1).traverse(future(_))
Run Code Online (Sandbox Code Playgroud)

还有另一个不同之处,正如Leif Warner 在此观察到的那样.标准库traverse立即启动所有异步操作,而Scalaz启动第一个,等待它完成,启动第二个,等待它,依此类推.

流的不同行为

通过编写一个将为流中的第一个值休眠几秒钟的函数来显示第二个差异非常容易:

def howLong(i: Int) = if (i == 1) 10000 else 0

import scalaz._, Scalaz._
import scalaz.contrib.std._

def toFuture(i: Int)(implicit ec: ExecutionContext) = future {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}
Run Code Online (Sandbox Code Playgroud)

现在Future.traverse(Stream(1, 2))(toFuture)将打印以下内容:

Starting 1!
Starting 2!
Done 2!
Done 1!
Run Code Online (Sandbox Code Playgroud)

而Scalaz版本(Stream(1, 2).traverse(toFuture)):

Starting 1!
Done 1!
Starting 2!
Done 2!
Run Code Online (Sandbox Code Playgroud)

这可能不是我们想要的.

对于名单?

奇怪的是,这两个遍历在这方面的表现相同 - Scalaz不会等待一个未来在开始下一个之前完成.

另一个未来

Scalaz还包括自己的concurrent包,其中包含自己的期货实现.我们可以使用与上面相同的设置:

import scalaz.concurrent.{ Future => FutureZ, _ }

def toFutureZ(i: Int) = FutureZ {
  printf("Starting %d!\n", i)
  Thread.sleep(howLong(i))
  printf("Done %d!\n", i)
  i
}
Run Code Online (Sandbox Code Playgroud)

然后我们在列表和流的流上获得Scalaz的行为:

Starting 1!
Done 1!
Starting 2!
Done 2!
Run Code Online (Sandbox Code Playgroud)

也许不那么令人惊讶的是,遍历无限流仍然会立即返回.

此时我们确实需要一个表来总结,但列表必须要做:

  • 具有标准库遍历的流:在返回之前消耗; 不要等待每一个未来.
  • 使用Scalaz遍历的流:立即返回; 等待每个未来完成.
  • Scalaz期货与流:立即返回; 等待每个未来完成.

和:

  • 标准库遍历列表:不要等待.
  • 使用Scalaz遍历列表:不要等待.
  • 带有列表的Scalaz期货:等待每个未来完成.

这有意义吗?列表和流上的此操作是否存在"正确"行为?是否存在某种原因导致"最异步"行为 - 即,在返回之前不消耗集合,并且不等待每个未来在继续下一个之前完成 - 这里没有表示?

ste*_*hke 1

我无法全部回答,但我尝试了一些部分:

\n\n
\n

是否有某种原因导致“最异步”行为\xe2\x80\x94,即,在返回之前不\n消耗集合,并且在继续之前不等待每个\n未来完成下一个\xe2\x80\x94 不在这里表示\n 吗?

\n
\n\n

如果您有相关计算且线程数量有限,则可能会遇到死锁。例如,您有两个 future 依赖于第三个 future(future 列表中的所有三个),并且只有两个线程,您可能会遇到前两个 future 阻塞所有两个线程而第三个线程永远不会执行的情况。(当然,如果你的池大小是1,即执行一个计算后另一个计算,你会得到类似的情况)

\n\n

为了解决这个问题,每个 future 都需要一个线程,没有任何限制。这适用于小型期货清单,但不适用于大型期货清单。因此,如果您并行运行所有程序,您将遇到这样一种情况:小示例在所有情况下都会运行,而较大的示例将陷入僵局。(示例:开发人员测试运行良好,生产死锁)。

\n\n
\n

在列表和流上的此操作是否有“正确”的行为?

\n
\n\n

我认为对于期货来说这是不可能的。如果您了解更多的依赖关系,或者当您确定计算不会阻塞时,可能会出现更并发的解决方案。但执行期货清单对我来说是“被设计破坏的”。最好的解决方案似乎是一个,对于死锁的小例子来说,它已经失败了(即一个接一个地执行 Future)。

\n\n
\n

带有列表的 Scalaz future:请等待每个 future 完成。

\n
\n\n

我认为 scalaz 在内部使用理解来进行遍历。对于推导式,不能保证计算是独立的。所以我猜想 Scalaz 在理解方面做了正确的事情:一个接一个地进行计算。对于 future 来说,只要操作系统中有无限的线程,这将始终有效。

\n\n

换句话说:您看到的只是理解(必须)如何工作的产物。

\n\n

我希望这有一定道理。

\n