rad*_*pet 10 java multithreading java-stream
请提供给我参考,为什么使用Java Stream API的以下两个析因实现之间的执行时间会有显着差异:
我的期望是接近执行时间,但是并行版本的速度提高了2倍。我没有运行任何专门的基准测试,但是即使在冷启动的jvm中,执行时间也不应有太大差异。在下面,我附上两个实现的源代码:
public class FastFactorialSupplier implements FactorialSupplier {
private final ExecutorService executorService;
public FastFactorialSupplier(ExecutorService executorService) {
this.executorService = executorService;
}
@Override
public BigInteger get(long k) {
try {
return executorService
.submit(
() -> LongStream.range(2, k + 1)
.parallel()
.mapToObj(BigInteger::valueOf)
.reduce(BigInteger.ONE, (current, factSoFar) -> factSoFar.multiply(current))
)
.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return BigInteger.ZERO;
}
}
Run Code Online (Sandbox Code Playgroud)
public class MathUtils {
public static BigInteger factorial(long k) {
return LongStream.range(2, k + 1)
.mapToObj(BigInteger::valueOf)
.reduce(BigInteger.ONE, (current, factSoFar) -> factSoFar.multiply(current));
}
}
Run Code Online (Sandbox Code Playgroud)
这是基于intellij junit运行器显示的带有附加的示例执行时间的测试用例作为注释。
@Test
public void testWithoutParallel() {
//2s 403
runTest(new DummyFactorialSupplier()); // uses MathUtils.factorial
}
@Test
public void testParallelismWorkStealing1() {
//1s 43
runTest(new FastFactorialSupplier(Executors.newWorkStealingPool(1)));
}
@Test
public void testParallelismForkJoin1() {
// 711ms
runTest(new FastFactorialSupplier(new ForkJoinPool(1)));
}
@Test
public void testExecutorForkJoin() {
//85ms
runTest(new FastFactorialSupplier(new ForkJoinPool()));
}
private void runTest(FactorialSupplier factorialSupplier) {
BigInteger result = factorialSupplier.get(100000);
assertNotNull(result);
// assertEquals(456574, result.toString().length());
}
Run Code Online (Sandbox Code Playgroud)
用java 11日以来出现了在Java 8定制一个问题叉连接池测试运行- https://bugs.openjdk.java.net/browse/JDK-8190974
是否可以进行与伪并行处理有关的优化以及执行的调度方式,而鉴于执行纯粹是顺序执行,则没有这样的优化吗?
编辑:
我也使用jmh运行microbenchmark
平行:
public class FastFactorialSupplierP1Test {
@Benchmark
@BenchmarkMode({Mode.AverageTime, Mode.SampleTime, Mode.SingleShotTime, Mode.Throughput, Mode.All})
@Fork(value = 1, warmups = 1)
public void measure() {
runTest(new FastFactorialSupplier(new ForkJoinPool(1)));
}
private void runTest(FactorialSupplier factorialSupplier) {
BigInteger result = factorialSupplier.get(100000);
assertNotNull(result);
}
public static void main(String[] args) throws Exception {
org.openjdk.jmh.Main.main(args);
}
}
Run Code Online (Sandbox Code Playgroud)
序列号:
public class SerialFactorialSupplierTest {
@Benchmark
@BenchmarkMode({Mode.AverageTime, Mode.SampleTime, Mode.SingleShotTime, Mode.Throughput, Mode.All})
@Fork(value = 1, warmups = 1)
public void measure() {
runTest(new DummyFactorialSupplier());
}
private void runTest(FactorialSupplier factorialSupplier) {
BigInteger result = factorialSupplier.get(100000);
assertNotNull(result);
}
public static void main(String[] args) throws Exception {
org.openjdk.jmh.Main.main(args);
}
}
Run Code Online (Sandbox Code Playgroud)
public class IterativeFactorialTest {
@Benchmark
@BenchmarkMode({Mode.AverageTime, Mode.SampleTime, Mode.SingleShotTime, Mode.Throughput, Mode.All})
@Fork(value = 1, warmups = 1)
public void measure() {
runTest(new IterativeFact());
}
private void runTest(FactorialSupplier factorialSupplier) {
BigInteger result = factorialSupplier.get(100000);
assertNotNull(result);
}
public static void main(String[] args) throws Exception {
org.openjdk.jmh.Main.main(args);
}
class IterativeFact implements FactorialSupplier {
@Override
public BigInteger get(long k) {
BigInteger result = BigInteger.ONE;
while (k-- != 0) {
result = result.multiply(BigInteger.valueOf(k));
}
return result;
}
}
}
Run Code Online (Sandbox Code Playgroud)
结果:
FastFactorialSupplierP1Test.measure avgt 5 0.437 ± 0.006 s/op
IterativeFactorialTest.measure avgt 5 2.643 ± 0.383 s/op
SerialFactorialSupplierTest.measure avgt 5 2.226 ± 0.044 s/op
Run Code Online (Sandbox Code Playgroud)
您选择了一个性能取决于评估顺序的操作。仅考虑性能BigInteger.multiply取决于两个因素的大小。然后,遍历一系列BigInteger实例,这些实例的累加值是下次乘法的一个因素,这会使操作变得越来越昂贵,而且距离您越远。
相反,将值范围分成较小的范围,对每个范围分别进行乘法,然后将范围的结果相乘,即使没有同时评估这些子范围,也可以获得性能优势。
因此,当并行流将工作拆分为多个块,以供其他工作线程潜在地使用,但最终在同一线程中对其进行评估时,由于更改了评估顺序,因此在此特定设置中,您仍然可以获得性能上的提高。
我们可以通过删除所有与Stream和线程池相关的工件来进行测试:
public static BigInteger multiplyAll(long from, long to, int split) {
if(split < 1 || to - from < 2) return serial(from, to);
split--;
long middle = (from + to) >>> 1;
return multiplyAll(from, middle, split).multiply(multiplyAll(middle, to, split));
}
private static BigInteger serial(long l1, long l2) {
BigInteger bi = BigInteger.valueOf(l1++);
for(; l1 < l2; l1++) {
bi = bi.multiply(BigInteger.valueOf(l1));
}
return bi;
}
Run Code Online (Sandbox Code Playgroud)
我没有手头的JMH设置来发布压力较大的结果,但是一个简单的运行显示,数量级与您的结果相符,仅一个分割就已经将评估时间大致减少了一半,而更高的数字仍然可以改善性能,尽管曲线变得讨人喜欢。
如中所述ForkJoinTask.html#getSurplusQueuedTaskCount(),这是一种合理的策略,将工作分割成每个工作人员还有一些其他任务,可能由其他线程承担,这可能补偿不平衡的工作负载,例如,如果某些元素的处理成本比其他元素便宜。显然,并行流没有用于处理没有其他工作线程的情况的特殊代码,因此,即使只有一个线程来处理它,您也可以看到拆分工作的效果。
| 归档时间: |
|
| 查看次数: |
201 次 |
| 最近记录: |