如何在scala中连续执行Futures

use*_*635 29 scala

我有这个场景,我需要使用一个迭代器,为每个项调用一个函数f(item)并返回一个Future[Unit].

但是,我需要让每个f(item)调用按顺序执行,它们不能并行运行.

for(item <- it)
  f(item)
Run Code Online (Sandbox Code Playgroud)

不会起作用,因为这会并行启动所有调用.

我该怎么做才能按顺序进行?

Gle*_*est 42

如果你不介意非常本地化var,你可以f(item)按如下方式序列化异步处理(每个)(flatMap序列化):

val fSerialized = {
  var fAccum = Future{()}
  for(item <- it) {
    println(s"Processing ${item}")
    fAccum = fAccum flatMap { _ => f(item) }
  }
  fAccum
}

fSerialized.onComplete{case resTry => println("All Done.")}
Run Code Online (Sandbox Code Playgroud)

一般来说,避免Await操作 - 它们阻止(有点失去异步点,消耗资源和草率设计,可能会死锁)


酷技巧1:

您可以Futures通过通常的嫌疑人链接在一起flatmap- 它序列化异步操作.它有什么不能做的吗?;-)

def f1 = Future { // some background running logic here...}
def f2 = Future { // other background running logic here...}

val fSerialized: Future[Unit] = f1 flatMap(res1 => f2)  

fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}
Run Code Online (Sandbox Code Playgroud)

以上都没有 - 主线程在几十纳秒内直接运行.在所有情况下都使用Futures来执行并行线程并跟踪异步状态/结果和​​链逻辑.

fSerialized表示链接在一起的两个不同异步操作的组合.一旦评估了val,它就会立即启动f1(异步运行). f1像任何一样运行Future- 当它最终完成时,它会调用它的onComplete回调块.这是酷位 - flatMap将它的参数安装为f1onComplete回调块 - 因此f2一旦f1完成就会启动,没有阻塞,轮询或浪费资源使用.当f2完成后,再fSerialized完成了-所以它运行的fSerialized.onComplete回调块-打印"都做了".

不仅如此,您还可以使用整洁的非意大利面条代码尽可能多地链接平面地图

 f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...
Run Code Online (Sandbox Code Playgroud)

如果您通过Future.onComplete执行此操作,则必须将连续操作嵌套为嵌套在onComplete图层上:

f1.onComplete{case res1Try => 
  f2
  f2.onComplete{case res2Try =>
    f3
    f3.onComplete{case res3Try =>
      f4
      f4.onComplete{ ...
      }
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

不太好.

测试证明:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8))

fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
Run Code Online (Sandbox Code Playgroud)

酷技巧2:

对于这样的理解:

for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr
Run Code Online (Sandbox Code Playgroud)

对于这个来说,只不过是语法糖:

aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } }
Run Code Online (Sandbox Code Playgroud)

这是一个flatMaps链,然后是最终的地图.

这意味着

f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")
Run Code Online (Sandbox Code Playgroud)

是完全相同的

for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"
Run Code Online (Sandbox Code Playgroud)

测试证明(继上一次测试之后):

val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
Run Code Online (Sandbox Code Playgroud)

不那么酷的技巧3:

不幸的是,你不能在同样的理解中混合迭代器和期货.编译错误:

val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last
Run Code Online (Sandbox Code Playgroud)

嵌套fors会带来挑战.以下不是序列化,而是并行运行异步块(嵌套的理解不会将后续的Futures与flatMap/Map链接,而是链为Iterable.flatMap {item => f(item)} - 不一样!)

val fSerial = {for {nextItem <- itemIterable} yield
                 for {nextRes <- f(nextItem)} yield "Did It"}.last
Run Code Online (Sandbox Code Playgroud)

另外使用foldLeft/foldRight加flatMap也不会像你期望的那样工作 - 似乎是一个bug /限制; 所有异步块都是并行处理的(因此Iterator.foldLeft/Right不适合Future.flatMap):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
val empty = Future[Unit]{()}
def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2)

//val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}

fSerialized.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
Run Code Online (Sandbox Code Playgroud)

但这有效(涉及var):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._

def f(item: Int): Future[Unit] = Future{
  print("Waiting " + item + " seconds ...")
  Console.flush
  blocking{Thread.sleep((item seconds).toMillis)}
  println("Done")
}

val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)

var fSerial = Future{()}
for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem)) 
Run Code Online (Sandbox Code Playgroud)

  • 使用foldLeft/foldRight加上flatMap*可以*按预期工作.通过例子`items.foldLeft(Future()){(x,y)=> x.flatMap {_ => f(y)}}`确切地说你为什么会这样做(连续执行`f(y)` `items`中的每个`y`.它在你的情况下不起作用的原因是你使用辅助方法`serialize`,其中右手未来(`f2`)按值传递,从而迫使其早期评估按名称传递`f2`应该修复它:`def serialize(f1:Future [Unit],f2:=> Future [Unit])`.当你在它的时候,你也可以通过名字传递`f1`太. (9认同)

win*_*ner 38

def seqFutures[T, U](items: TraversableOnce[T])(yourfunction: T => Future[U]): Future[List[U]] = {
  items.foldLeft(Future.successful[List[U]](Nil)) {
    (f, item) => f.flatMap {
      x => yourfunction(item).map(_ :: x)
    }
  } map (_.reverse)
}
Run Code Online (Sandbox Code Playgroud)

如果您按顺序运行,因为资源限制阻止一次运行多个Future,则创建和使用ExecutionContext仅包含单个线程的自定义可能更容易.

  • 为什么这不在标准 Scala API 中? (2认同)

Som*_*tik 8

另一种选择是使用Akka Streams:

val doneFuture = Source
  .fromIterator(() => it)
  .mapAsync(parallelism = 1)(f)
  .runForeach{identity}
Run Code Online (Sandbox Code Playgroud)