clo*_*g14 5 java parallel-processing scala java.util.concurrent scala-repl
我在学习Paul Chiusano和Runar Bjanarson的著作“ Scala中的函数编程”(第7章-纯函数并行性)时遇到了以下情况。
package fpinscala.parallelism
import java.util.concurrent._
import language.implicitConversions
object Par {
type Par[A] = ExecutorService => Future[A]
def run[A](s: ExecutorService)(a: Par[A]): Future[A] = a(s)
def unit[A](a: A): Par[A] = (es: ExecutorService) => UnitFuture(a) // `unit` is represented as a function that returns a `UnitFuture`, which is a simple implementation of `Future` that just wraps a constant value. It doesn't use the `ExecutorService` at all. It's always done and can't be cancelled. Its `get` method simply returns the value that we gave it.
private case class UnitFuture[A](get: A) extends Future[A] {
def isDone = true
def get(timeout: Long, units: TimeUnit) = get
def isCancelled = false
def cancel(evenIfRunning: Boolean): Boolean = false
}
def map2[A,B,C](a: Par[A], b: Par[B])(f: (A,B) => C): Par[C] = // `map2` doesn't evaluate the call to `f` in a separate logical thread, in accord with our design choice of having `fork` be the sole function in the API for controlling parallelism. We can always do `fork(map2(a,b)(f))` if we want the evaluation of `f` to occur in a separate thread.
(es: ExecutorService) => {
val af = a(es)
val bf = b(es)
UnitFuture(f(af.get, bf.get)) // This implementation of `map2` does _not_ respect timeouts. It simply passes the `ExecutorService` on to both `Par` values, waits for the results of the Futures `af` and `bf`, applies `f` to them, and wraps them in a `UnitFuture`. In order to respect timeouts, we'd need a new `Future` implementation that records the amount of time spent evaluating `af`, then subtracts that time from the available time allocated for evaluating `bf`.
}
def fork[A](a: => Par[A]): Par[A] = // This is the simplest and most natural implementation of `fork`, but there are some problems with it--for one, the outer `Callable` will block waiting for the "inner" task to complete. Since this blocking occupies a thread in our thread pool, or whatever resource backs the `ExecutorService`, this implies that we're losing out on some potential parallelism. Essentially, we're using two threads when one should suffice. This is a symptom of a more serious problem with the implementation, and we will discuss this later in the chapter.
es => es.submit(new Callable[A] {
def call = a(es).get
})
def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
def equal[A](e: ExecutorService)(p: Par[A], p2: Par[A]): Boolean =
p(e).get == p2(e).get
}
Run Code Online (Sandbox Code Playgroud)
您可以在Github上找到原来的代码在这里。请参阅此处以获取java.util.concurrent文档。
我关注的实施fork。特别地,fork当ThreadPool太小时,据称可能导致死锁。
我考虑以下示例:
val a = Par.lazyUnit(42 + 1)
val es: ExecutorService = Executors.newFixedThreadPool(2)
println(Par.fork(a)(es).get)
Run Code Online (Sandbox Code Playgroud)
我不希望这个示例最终陷入僵局,因为有两个线程。但是,当我在Scala REPL中运行它时,它将在我的计算机上运行。为什么会这样呢?
初始化时的输出ExecutorService是es:java.util.concurrent.ExecutorService =
java.util.concurrent.ThreadPoolE
xecutor@73a86d72[Running, pool size = 0, active threads = 0, queued tasks =
0, completed tasks = 0]
Run Code Online (Sandbox Code Playgroud)
是pool size = 0正确的吗?换句话说,这是不理解java.util.concurrent._的问题还是不了解Scala部件的问题?
好吧,经过长时间的调查,我相信我有了答案。整个故事很长,但我会尝试通过简化和避免许多细节来缩短它。
注意:Scala 可能可以编译到各种不同的目标平台,但这个特定问题发生在作为目标的 Java/JVM 上,所以这就是这个答案的内容。
您看到的死锁与线程池的大小无关。实际上是外部fork调用挂起。它与 REPL 实现细节和多线程的组合相关,但需要学习一些知识才能理解它是如何发生的:
objects 编译为 Java/JVM一个简短的版本(另请参见最后的摘要)是该代码挂在 REPL 下,因为当它由 REPL 执行时,它在逻辑上类似于以下代码:
object DeadLock {
import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
val foo: Int = Await.result(Future(calc()), Duration.Inf)
def printFoo(): Unit = {
println(s"Foo = $foo")
}
private def calc(): Int = {
println("Before calc")
42
}
}
def test(): Unit = {
println("Before printFoo")
DeadLock.printFoo()
println("After printFoo")
}
Run Code Online (Sandbox Code Playgroud)
或者在 Java 世界中非常相似:
class Deadlock {
static CompletableFuture<Integer> cf;
static int foo;
public static void printFoo() {
System.out.println("Print foo " + foo);
}
static {
cf = new CompletableFuture<Integer>();
new Thread(new Runnable() {
@Override
public void run() {
calcF();
}
}).start();
try {
foo = cf.get();
System.out.println("Future result = " + cf.get());
} catch (InterruptedException e) {
e.printStackTrace();f
} catch (ExecutionException e) {
e.printStackTrace();
}
}
private static void calcF() {
cf.complete(42);
}
}
public static void main(String[] args) {
System.out.println("Before foo");
Deadlock.printFoo();
System.out.println("After foo");
}
Run Code Online (Sandbox Code Playgroud)
如果您清楚为什么这段代码会死锁,那么您已经了解了大部分内容,并且可能可以自己推断出其余部分。您可能只需浏览一下最后的“摘要”部分即可。
Java 静态初始化器如何导致死锁?
让我们从这个故事的结尾开始:为什么Java代码会挂起?发生这种情况是因为 Java/JVM 对静态初始化程序有两个保证(更多详细信息,请参见12.4.2. JLS 的详细初始化过程部分):
静态初始化程序将在该类的任何其他“外部”使用之前运行
静态初始化程序将只运行一次,并且是通过全局锁定完成的
用于静态初始化程序的锁是隐式的,由 JVM 管理,但它确实存在。这意味着代码在逻辑上类似于这样:
class Deadlock {
static boolean staticInitFinished = false;
// unique value for each thread!
static ThreadLocal<Boolean> currentThreadRunsStaticInit = ThreadLocal.withInitial(() -> Boolean.FALSE);
static CompletableFuture<Integer> cf;
static int foo;
static void enforceStaticInit() {
synchronized (Deadlock.class) {
// is init finished?
if (staticInitFinished)
return;
// are we the thread already running the init?
if(currentThreadRunsStaticInit.get())
return;
currentThreadRunsStaticInit.set(true);
cf = new CompletableFuture<Integer>();
new Thread(new Runnable() {
@Override
public void run() {
calcF();
}
}).start();
try {
foo = cf.get();
System.out.println("Future result = " + cf.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
currentThreadRunsStaticInit.set(false);
staticInitFinished = true;
}
}
private static void calcF() {
enforceStaticInit();
cf.complete(42);
}
public static void printFoo() {
enforceStaticInit();
System.out.println("Print foo " + foo);
}
}
Run Code Online (Sandbox Code Playgroud)
现在很清楚为什么这段代码会死锁:我们的静态初始化程序启动一个新线程并阻止等待它的结果。但是这个新线程尝试访问同一个类(calcF方法),并且作为另一个线程,它必须等待已经运行的静态初始化程序完成。请注意,如果该calcF方法位于另一个类中,则一切都会正常工作。
Scala REPL 的工作原理
现在让我们回到故事的开头,了解 Scala REPL 的工作原理。这个答案是对实际情况的极大简化,但它抓住了这种情况的重要细节。对于 REPL 实现者来说幸运的是,Scala 编译器是用 Scala 编写的。这意味着 REPL 不必以某种方式解释代码,它可以通过标准编译器运行代码,然后通过 Java Reflection API 运行编译后的代码。这仍然需要对代码进行一些修饰才能让编译器满意并返回结果。
当你输入类似的内容时,稍微简化一下(或者说,很多)
class Deadlock {
static CompletableFuture<Integer> cf;
static int foo;
public static void printFoo() {
System.out.println("Print foo " + foo);
}
static {
cf = new CompletableFuture<Integer>();
new Thread(new Runnable() {
@Override
public void run() {
calcF();
}
}).start();
try {
foo = cf.get();
System.out.println("Future result = " + cf.get());
} catch (InterruptedException e) {
e.printStackTrace();f
} catch (ExecutionException e) {
e.printStackTrace();
}
}
private static void calcF() {
cf.complete(42);
}
}
public static void main(String[] args) {
System.out.println("Before foo");
Deadlock.printFoo();
System.out.println("After foo");
}
Run Code Online (Sandbox Code Playgroud)
在 REPL 中,代码被分析并转换为如下所示:
class Deadlock {
static boolean staticInitFinished = false;
// unique value for each thread!
static ThreadLocal<Boolean> currentThreadRunsStaticInit = ThreadLocal.withInitial(() -> Boolean.FALSE);
static CompletableFuture<Integer> cf;
static int foo;
static void enforceStaticInit() {
synchronized (Deadlock.class) {
// is init finished?
if (staticInitFinished)
return;
// are we the thread already running the init?
if(currentThreadRunsStaticInit.get())
return;
currentThreadRunsStaticInit.set(true);
cf = new CompletableFuture<Integer>();
new Thread(new Runnable() {
@Override
public void run() {
calcF();
}
}).start();
try {
foo = cf.get();
System.out.println("Future result = " + cf.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
currentThreadRunsStaticInit.set(false);
staticInitFinished = true;
}
}
private static void calcF() {
enforceStaticInit();
cf.complete(42);
}
public static void printFoo() {
enforceStaticInit();
System.out.println("Print foo " + foo);
}
}
Run Code Online (Sandbox Code Playgroud)
然后line3.eval.print()通过反射调用。
类似的故事发生在:
val a = Par.lazyUnit(42 + 1)
Run Code Online (Sandbox Code Playgroud)
最后当你这样做时
Par.fork(a)(es).get
Run Code Online (Sandbox Code Playgroud)
事情变得更有趣了,因为您依赖于使用imports 巧妙实现的前几行:
package line3
object read {
val a = Par.lazyUnit(42 + 1)
val res3 = a
}
object eval {
def print() = {
println("a: Par.Par[Int] = " + read.res3)
}
}
Run Code Online (Sandbox Code Playgroud)
这里重要的是,您编写到 REPL 中的所有内容都被包装成全新的object,然后像平常的代码一样编译和运行。
Scala 如何在 Java/JVM 上模拟按名称参数
该方法的定义fork使用按名称参数:
def fork[A](a: => Par[A]): Par[A] =
Run Code Online (Sandbox Code Playgroud)
这里它用于a延迟评估,这对于 的整个逻辑至关重要fork。Java/JVM 没有对惰性求值的标准支持,但它可以被模拟,这就是 Scala 编译器所做的。在内部,签名更改为使用Function0:
val es: ExecutorService = Executors.newFixedThreadPool(2)
Run Code Online (Sandbox Code Playgroud)
并且对 的每次访问都a被替换为对 的调用aWrapper.apply()。魔法的另一部分发生在带有按名称参数的方法的调用方:那里的参数也应该被包装到一个Function0这样的代码中:
Par.fork(a)(es).get
Run Code Online (Sandbox Code Playgroud)
但实际上有点不同。天真地,仅仅为了这个小函数就需要另一个类,对于如此简单的逻辑来说这感觉很浪费。在 Scala 2.12 的实践中,使用了 Java 8 LambdaMetafactory的魔力,因此代码实际上变得像这样
package line5
object read {
import line2.read.Par
import line3.read.a
import line4.read.es
val res5 = Par.fork(a)(es).get
}
object eval {
def print() = {
println("res5: Int = " + read.res5)
}
}
Run Code Online (Sandbox Code Playgroud)
其中aWrapper _表示将方法转换为Funciton0使用LambdaMetafactory. 正如您可能从有关 Java 静态初始化器死锁的章节中所怀疑的那样,引入def aWrapper是一个至关重要的区别。您已经可以看到此代码与挂起的答案中的第一个 Scala 代码段非常相似。
objectScala 如何在 Java/JVM 上编译
最后一个难题是 Scala 如何object在 Java/JVM 中编译。嗯,它实际上被编译为类似于“静态类”的东西,但由于您可以用作object对象参数,所以它必须更复杂一些。实际上,所有初始化逻辑都转移到类的构造函数中object,并且有一个简单的静态初始化程序来调用它。所以我们read在 Java 中的最后一个对象(忽略imports)看起来像这样:
class read$ {
static read$ MODULE$
static {
new read$()
}
private Par[Int] res5;
private read$() {
MODULE$ = this;
res5 = Par.fork(read$::aWrapper)(es).get
}
private static int aWrapper(){
return line3.read$.MODULE$.a;
}
}
Run Code Online (Sandbox Code Playgroud)
这里再次read$::aWrapper表示使用 的方法构建Function0表单。换句话说,Scala 的初始化被转换为作为Java 静态初始化程序的一部分运行的代码。aWrapperLambdaMetafactoryobject
概括
总结一下事情是如何搞砸的:
REPL 将您的代码转换为每一行的新代码object并编译它
object初始化逻辑被翻译成Java静态初始化逻辑
在简单情况下,使用按名称参数调用方法会被转换为包装“返回值”逻辑的方法,并且该方法会添加到相同class或object
Par.fork作为初始化的一部分object(即 Java 静态初始化程序的一部分)执行,尝试在不同的线程上评估按名称参数(即调用同一类上的方法),并阻塞等待该线程的结果
Java 静态初始化程序在逻辑上是在全局锁下执行的,因此它会阻止调用该方法的不同线程。但它本身被阻止等待该方法调用完成。
| 归档时间: |
|
| 查看次数: |
148 次 |
| 最近记录: |