Scala Future/Promise快速失败管道

rog*_*one 5 multithreading scala promise

我希望并行启动两个或更多的Future/Promise,即使其中一个已启动的Future/Promise失败并且不想等待其余的完成,也会失败.在Scala中组合此管道的最惯用方法是什么.

编辑:更多上下文信息.

我必须启动两个外部进程,一个写入fifo文件,另一个从中读取.假设作者进程失败; 读者线程可能永远挂起等待文件的任何输入.因此,我希望并行启动这两个进程并快速失败,即使其中一个Future/Promise失败而没有等待另一个进程完成.

下面是更准确的示例代码.命令并不完全cattail.我使用它们是为了简洁.

val future1 = Future { executeShellCommand("cat file.txt > fifo.pipe") }
val future2 = Future { executeShellCommand("tail fifo.pipe") }
Run Code Online (Sandbox Code Playgroud)

maa*_*asg 6

如果我正确地理解了这个问题,我们正在寻找的是一个快速失败的序列实现,它类似于一个失败偏向的版本 firstCompletedOf

在这里,我们急切地注册了一个失败回调,以防一个期货早期失败,确保我们一旦期货失败就失败.

import scala.concurrent.{Future, Promise}
import scala.util.{Success, Failure}
import scala.concurrent.ExecutionContext.Implicits.global
def failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = {
  val promise = Promise[Seq[T]]
  futures.foreach{f => f.onFailure{case ex => promise.failure(ex)}}
  val res = Future.sequence(futures)
  promise.completeWith(res).future
}
Run Code Online (Sandbox Code Playgroud)

相比之下,无论订单如何,Future.sequence只要任何期货失败,此实施将失败.让我们用一个例子来说明:

import scala.util.Try
// help method to measure time
def resilientTime[T](t: =>T):(Try[T], Long) = {
  val t0 = System.currentTimeMillis
  val res = Try(t)
  (res, System.currentTimeMillis-t0)
}

import scala.concurrent.duration._
import scala.concurrent.Await
Run Code Online (Sandbox Code Playgroud)

第一个未来将失败(2秒内失败)

val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val res = failFast(Seq(f1,f2,f3))

resilientTime(Await.result(res, 10.seconds))
// res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)
Run Code Online (Sandbox Code Playgroud)

最后的未来将失败.失败也在2秒内.(注意序列构造中的顺序)

val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val res = failFast(Seq(f3,f2,f1))

resilientTime(Await.result(res, 10.seconds))
// res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)
Run Code Online (Sandbox Code Playgroud)

Future.sequence失败取决于排序(10秒内失败)的比较:

val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val seq = Seq(f3,f2,f1)

resilientTime(Await.result(Future.sequence(seq), 10.seconds))
//res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),10000)
Run Code Online (Sandbox Code Playgroud)