为什么此Scala代码在一个线程中执行两个Future?

Wei*_*Lin 9 concurrency multithreading scala

我已经使用多个线程很长时间了,但无法解释这种简单的情况。

import java.util.concurrent.Executors
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

def addOne(x: Int) = Future(x + 1)
def addTwo(x: Int) = Future {addOne(x + 1)}

addTwo(1)
// res5: Future[Future[Int]] = Future(Success(Future(Success(3))))
Run Code Online (Sandbox Code Playgroud)

令我惊讶的是,它有效。而且我不知道为什么。

问题:
为什么给定一个线程可以同时执行两个Future?

我的期望
第一个FutureaddTwo)占用一个唯一的线程(newFixedThreadPool(1)),然后调用另一个FutureaddOne),这又需要另一个线程。
因此,程序最终将因线程不足而陷入困境。

Har*_*ebe 9

您的代码起作用的原因是,两个期货都将由同一线程执行。在ExecutionContext您创建将不使用Thread直接针对每个Future而是将计划任务(Runnable实例)执行。如果池中没有更多线程可用,这些任务将被BlockingQueue等待执行。(有关详细信息,请参见ThreadPoolExecutor API

如果看一下实现,Executors.newFixedThreadPool(1)您会看到创建了一个具有无限队列的执行器:

new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])
Run Code Online (Sandbox Code Playgroud)

为了获得所需的线程匮乏的效果,您可以自己创建一个具有有限队列的执行程序:

 implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L, 
                     TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))
Run Code Online (Sandbox Code Playgroud)

由于最小容量ArrayBlockingQueue为1,因此您需要三个期货才能达到上限,并且还需要添加一些代码以根据期货的结果执行,以防止它们完成(在下面的示例中,我通过添加.map(identity)

下面的例子

import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L, 
                      TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))

def addOne(x: Int) = Future {
  x + 1
}
def addTwo(x: Int) = Future {
  addOne(x + 1) .map(identity)
}
def addThree(x: Int) = Future {
  addTwo(x + 1).map(identity)
}

println(addThree(1))
Run Code Online (Sandbox Code Playgroud)

失败于

java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@65a264b6 rejected from java.util.concurrent.ThreadPoolExecutor@10d078f4[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 1]
Run Code Online (Sandbox Code Playgroud)