rog*_*one 5 multithreading scala promise
我希望并行启动两个或更多的Future/Promise,即使其中一个已启动的Future/Promise失败并且不想等待其余的完成,也会失败.在Scala中组合此管道的最惯用方法是什么.
编辑:更多上下文信息.
我必须启动两个外部进程,一个写入fifo文件,另一个从中读取.假设作者进程失败; 读者线程可能永远挂起等待文件的任何输入.因此,我希望并行启动这两个进程并快速失败,即使其中一个Future/Promise失败而没有等待另一个进程完成.
下面是更准确的示例代码.命令并不完全cat和tail.我使用它们是为了简洁.
val future1 = Future { executeShellCommand("cat file.txt > fifo.pipe") }
val future2 = Future { executeShellCommand("tail fifo.pipe") }
Run Code Online (Sandbox Code Playgroud)
如果我正确地理解了这个问题,我们正在寻找的是一个快速失败的序列实现,它类似于一个失败偏向的版本 firstCompletedOf
在这里,我们急切地注册了一个失败回调,以防一个期货早期失败,确保我们一旦期货失败就失败.
import scala.concurrent.{Future, Promise}
import scala.util.{Success, Failure}
import scala.concurrent.ExecutionContext.Implicits.global
def failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = {
val promise = Promise[Seq[T]]
futures.foreach{f => f.onFailure{case ex => promise.failure(ex)}}
val res = Future.sequence(futures)
promise.completeWith(res).future
}
Run Code Online (Sandbox Code Playgroud)
相比之下,无论订单如何,Future.sequence只要任何期货失败,此实施将失败.让我们用一个例子来说明:
import scala.util.Try
// help method to measure time
def resilientTime[T](t: =>T):(Try[T], Long) = {
val t0 = System.currentTimeMillis
val res = Try(t)
(res, System.currentTimeMillis-t0)
}
import scala.concurrent.duration._
import scala.concurrent.Await
Run Code Online (Sandbox Code Playgroud)
第一个未来将失败(2秒内失败)
val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val res = failFast(Seq(f1,f2,f3))
resilientTime(Await.result(res, 10.seconds))
// res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)
Run Code Online (Sandbox Code Playgroud)
最后的未来将失败.失败也在2秒内.(注意序列构造中的顺序)
val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val res = failFast(Seq(f3,f2,f1))
resilientTime(Await.result(res, 10.seconds))
// res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),1998)
Run Code Online (Sandbox Code Playgroud)
与Future.sequence失败取决于排序(10秒内失败)的比较:
val f1 = Future[Int]{Thread.sleep(2000); throw new Exception("boom")}
val f2 = Future[Int]{Thread.sleep(5000); 42}
val f3 = Future[Int]{Thread.sleep(10000); 101}
val seq = Seq(f3,f2,f1)
resilientTime(Await.result(Future.sequence(seq), 10.seconds))
//res: (scala.util.Try[Seq[Int]], Long) = (Failure(java.lang.Exception: boom),10000)
Run Code Online (Sandbox Code Playgroud)