等到所有Future.onComplete回调都执行完毕

vpt*_*ron 8 scala future

我正在使用FutureScala 2.10.X中的API.

这是我的用例:

object Class1 {

  def apply(f: (Int) => Future[String])(i: Int): Future[String] = {

    val start = DateTime.now

    val result = f(i)

    result.onComplete{
      case _ => println("Started at " + start + ", ended at " + DateTime.now)
    }

    result
  }
}
Run Code Online (Sandbox Code Playgroud)

我觉得很简单:我正在为我的未来添加一个onComplete回调.现在,我想知道是否有办法在onComplete完成执行时添加回调 - 在这个例子中知道记录何时完成.

假设我的result实例有3个onComplete已注册,我可以知道所有这些实例都被执行了吗?我不认为这是可能的,但谁知道:)

也许另一种方法是调用map而不是onComplete返回一个新的实例Future:

def apply(f: (Int) => Future[String])(i: Int): Future[String] = {

  val start = DateTime.now

  f(i) map {
    case r => 
      println("Started at " + start + ", ended at " + DateTime.now)
      r
  }
}
Run Code Online (Sandbox Code Playgroud)

但我不确定它会保持相同的行为.

编辑:只是为了澄清 - 只有一个实例Future,我onComplete同一个实例上调用了3次(好吧,在我的例子中只有一次,但是让我说我​​称之为N次)我想知道什么时候3由于完成相同的Future实例,回调已完成执行.

som*_*ytt 12

如果您不想使用其他方法(如CountDownLatch),那么您希望andThen用来了解您的操作何时完成(成功与否,以及Future是否成功).

scala> val f = Future(3)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@4b49ca35

scala> val g = f andThen { case Success(i) => println(i) } andThen { case _ => println("All done") }
3
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@1939e13
All done
Run Code Online (Sandbox Code Playgroud)

如果将来失败,则不会调用映射的对比函数:

scala> val f = Future[Int](???)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@7001619b

scala> val g = f andThen { case t => println(s"stage 1 $t") } andThen { case _ => println("All done") }
stage 1 Failure(java.util.concurrent.ExecutionException: Boxed Error)
All done
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@24e1e7e8

scala> val g = f map { case i => println(i) } andThen { case _ => println("All done") }
All done
g: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@5d0f75d6

scala> val g = f map { case i => println(i) } map { case _ => println("All done") }
g: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@5aabe81f

scala> g.value
res1: Option[scala.util.Try[Unit]] = Some(Failure(java.util.concurrent.ExecutionException: Boxed Error))
Run Code Online (Sandbox Code Playgroud)

类似地,在链式处理程序中炸毁不会破坏后续操作:

scala> val g = f andThen { case t => null.hashCode } andThen { case _ => Thread.sleep(1000L); println("All done") }
java.lang.NullPointerException
    at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.applyOrElse(<console>:51)
    at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.applyOrElse(<console>:51)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:431)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:430)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@3fb7bec8

scala> All done


scala> g.value
res1: Option[scala.util.Try[Int]] = Some(Success(3))
Run Code Online (Sandbox Code Playgroud)

对于需要等待它的不幸情况:

scala> val f = Future[Int](???)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@859a977

scala> import java.util.concurrent.{ CountDownLatch => CDL }
import java.util.concurrent.{CountDownLatch=>CDL}

scala> val latch = new CDL(3)
latch: java.util.concurrent.CountDownLatch = java.util.concurrent.CountDownLatch@11683e9f[Count = 3]

scala> f onComplete { _ => println(1); latch.countDown() }
1

scala> f onComplete { _ => println(2); latch.countDown() }
2

scala> f onComplete { _ => println(3); latch.countDown() }
3

scala> f onComplete { _ => latch.await(); println("All done") }
All done
Run Code Online (Sandbox Code Playgroud)


whe*_*ies 5

1个Future和3个onComplete

我认为你将不得不将你的功能组合成一个onComplete电话,否则你必须按照你所说的去做,使用map:

 val fut1 = myFut map func1 // yes, a Future[Unit]
 val fut2 = myFut map func2
 val fut3 = myFut map func3
Run Code Online (Sandbox Code Playgroud)

按照下一节的说明,了解它们何时完成.

有3种不同的期货

很有可能知道三个人什么时候Future完成.事实上,在Scala Future作曲!

 def threeFutures(one: Future[Int], two: Future[Int], three: Future[Int]) {
   val fourth = for {
     _ <- one
     _ <- two
     _ <- three
   } yield 0

   fourth onComplete {
     case _ => println("all done")
   }
 }
Run Code Online (Sandbox Code Playgroud)

现在这是什么意思?这意味着fourth是一个Future不关心三个参数的输入,但是,当他们都完成后,将完成本身.这是预包装,专为您准备.

(旁注:在示例中,我还假设您在范围内有所有隐含.)