如何在Scala中实现Future作为应用?

Mic*_*ael 6 concurrency functional-programming scala future applicative

假设我需要运行两个并发计算,等待它们,然后组合它们的结果.更具体地说,我需要运行f1: X1 => Y1f2: X2 => Y2同时然后调用f: (Y1, Y2) => Y最终得到一个值Y.

我可以创建未来的计算 fut1: X1 => Future[Y1],fut2: X2 => Future[Y2]然后组合它们以fut: (X1, X2) => Future[Y]使用monadic组合.

问题是monadic组合意味着顺序等待.在我们的例子中,它意味着我们先等待一个未来,然后我们将等待另一个未来.例如.如果它需要2秒.到第一个未来完成,只需1秒.到第二个未来失败,我们浪费1秒.

因此,它看起来像我们所需要的应用性期货的组成等到要么两者完全或至少一个未来的失败.是否有意义 ?你会如何实施<*>期货?

Kol*_*mar 5

其他答案中的所有方法都没有在未来快速失败的情况下做正确的事情,以及在很长一段时间后成功的未来.

但是这种方法可以手动实现:

def smartSequence[A](futures: Seq[Future[A]]): Future[Seq[A]] = {
  val counter = new AtomicInteger(futures.size)
  val result = Promise[Seq[A]]()

  def attemptComplete(t: Try[A]): Unit = {
    val remaining = counter.decrementAndGet
    t match {
      // If one future fails, fail the result immediately
      case Failure(cause) => result tryFailure cause
      // If all futures have succeeded, complete successful result
      case Success(_) if remaining == 0 => 
        result tryCompleteWith Future.sequence(futures)
      case _ =>
    }
  }

  futures.foreach(_ onComplete attemptComplete)
  result.future
}
Run Code Online (Sandbox Code Playgroud)

ScalaZ做了类似的事情在内部,所以无论f1 |@| f2List(f1, f2).sequence任何期货失败后立即失效.

以下是对这些方法失败时间的快速测试:

import java.util.Date
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scalaz._, Scalaz._

object ReflectionTest extends App {
  def f1: Future[Unit] = Future {
    Thread.sleep(2000)
  }

  def f2: Future[Unit] = Future {
    Thread.sleep(1000)
    throw new RuntimeException("Failure")
  }

  def test(name: String)(
    f: (Future[Unit], Future[Unit]) => Future[Unit]
  ): Unit = {
    val start = new Date().getTime
    f(f1, f2).andThen {
      case _ => 
        println(s"Test $name completed in ${new Date().getTime - start}")
    }
    Thread.sleep(2200)
  }

  test("monadic") { (f1, f2) => for (v1 <- f1; v2 <- f2) yield () }

  test("zip") { (f1, f2) => (f1 zip f2).map(_ => ()) }

  test("Future.sequence") { 
    (f1, f2) => Future.sequence(Seq(f1, f2)).map(_ => ()) 
  }

  test("smartSequence") { (f1, f2) => smartSequence(Seq(f1, f2)).map(_ => ())}

  test("scalaz |@|") { (f1, f2) => (f1 |@| f2) { case _ => ()}}

  test("scalaz sequence") { (f1, f2) => List(f1, f2).sequence.map(_ => ())}

  Thread.sleep(30000)
}
Run Code Online (Sandbox Code Playgroud)

我的机器上的结果是:

Test monadic completed in 2281
Test zip completed in 2008
Test Future.sequence completed in 2007
Test smartSequence completed in 1005
Test scalaz |@| completed in 1003
Test scalaz sequence completed in 1005
Run Code Online (Sandbox Code Playgroud)