Spray.io:何时(不)使用非阻塞路由处理?

lol*_*ski 2 scala akka spray

如果我们考虑生产级REST API,我们应该尽可能多地使用非阻塞,例如

def insertDbAsync(rows: RowList): Future[Unit] = ...
...
val route =
path("database" / "insertRowList") {
  post {
    entity(as[RowList]) { rows =>
      log.info(s"${rows.length} rows received")
      val async = insertDbAsync(rows)
      onComplete(async) {
        case Success(response) =>
          complete("success")
        case Failure(t) =>
          complete("error")
      }
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

我认为答案很可能是'是',但在决定什么应该和不应该是阻止代码时有哪些指导原则,以及为什么?

dk1*_*k14 5

Spray使用Akka作为底层平台,因此建议与演员相同(阻止需要谨慎管理).阻止代码可能需要太多线程,这可能:

  • 杀死actor的轻量级:默认情况下,数百万个actor可能在一个线程上运行.假设一个非阻塞的actor需要0.001个线程.一个被阻止的演员(阻塞时间比平常多100倍)将占用1个线程平均值(并不总是相同的线程).首先,你拥有的线程越多 - 你释放的内存越多 - 每个被阻塞的线程都会在阻塞之前保存完整的callstack,包括来自堆栈的引用(因此GC不能删除它们).其次,如果你有多个number_of_processors线程 - 你将失去性能.第三,如果你使用一些动态池 - 添加新线程可能需要一些大量的时间.

  • 导致线程饥饿 - 你可能在池中填充了线程,这些线程无效 - 因此在阻塞操作完成之前无法处理新任务(0%CPU负载,但等待处理100500条消息).它甚至可能导致死锁.但是,Akka默认使用Fork-Join-Pool,所以如果您的阻止代码被管理(周围有scala.concurrent.blocking- 周围Await.result有这样的内部) - 它将通过创建新线程而不是阻塞的线程来防止饥饿,但它不会补偿其他问题.

  • 传统上会导致死锁,所以这对设计来说很糟糕

如果代码在外部阻止,您可以将其包围在未来:

 import scala.concurrent._
 val f = Future {
     someNonBlockingCode()
     blocking { //mark this thread as "blocked" so fork-join-pool may create another one to compensate
        someBlocking()
     }  
 }
Run Code Online (Sandbox Code Playgroud)

在单独的演员内:

 f pipeTo sender //will send the result to `sender` actor
Run Code Online (Sandbox Code Playgroud)

内部喷涂路线:

 onComplete(f) { .. }
Run Code Online (Sandbox Code Playgroud)

最好在单独的池/调度程序(基于fork-join-pool)中执行这样的期货.

PS作为期货的替代品(它们可能不太容易从设计上看)你可以考虑Akka I/O,Continuations/Coroutines,Actor (也在单独的调度员中),Disruptor等.