Stream.reduce()和Stream.collect()之间令人惊讶的性能差异

Udo*_*Udo 3 java performance lambda java-8 java-stream

我想比较两个Java8流终端操作reduce()以及collect()它们的并行性能.

我们来看看下面的Java8并行流示例:

import java.math.BigInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static java.math.BigInteger.ONE;

public class StartMe {

    static Function<Long, BigInteger> fac;

    static {
        fac = x -> x==0? ONE : BigInteger.valueOf(x).multiply(fac.apply(x - 1));
    }

    static long N = 2000;

    static Supplier<BigInteger[]> one() {
        BigInteger[] result = new BigInteger[1];
        result[0] = ONE;
        return () -> result;
    }

    static BiConsumer<BigInteger[], ? super BigInteger> accumulator() {
        return (BigInteger[] ba, BigInteger b) -> {
            synchronized (fac) {
                ba[0] = ba[0].multiply(b);
            }
        };
    }

    static BiConsumer<BigInteger[], BigInteger[]> combiner() {
        return (BigInteger[] b1, BigInteger[] b2) -> {};
    }

    public static void main(String[] args) throws Exception {
        long t0 = System.currentTimeMillis();

        BigInteger result1 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N).reduce(ONE, BigInteger::multiply);
        long t1 = System.currentTimeMillis();

        BigInteger[] result2 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N).collect(one(), accumulator(), combiner());
        long t2 = System.currentTimeMillis();

        BigInteger result3 = fac.apply(N);
        long t3 = System.currentTimeMillis();

        System.out.println("reduce():  deltaT = " + (t1-t0) + "ms, result 1 = " + result1);
        System.out.println("collect(): deltaT = " + (t2-t1) + "ms, result 2 = " + result2[0]);
        System.out.println("recursive: deltaT = " + (t3-t2) + "ms, result 3 = " + result3);

    }
}
Run Code Online (Sandbox Code Playgroud)

它计算n!使用一些 - 不可否认奇怪;-) - 算法.

然而,性能结果令人惊讶:

 reduce():  deltaT = 44ms, result 1 = 3316275...
 collect(): deltaT = 22ms, result 2 = 3316275...
 recursive: deltaT = 11ms, result 3 = 3316275...
Run Code Online (Sandbox Code Playgroud)

一些评论:

  • 我必须同步accumulator()它,因为它并行访问同一个数组.
  • 我期望reduce()并且collect()会产生相同的性能,但reduce()速度要慢约2倍collect(),即使collect()必须同步!
  • 最快的算法是顺序和递归算法(可能显示并行流管理的巨大开销)

我没想到reduce()表现会比collect()一个人差.为什么会这样?

Hol*_*ger 8

基本上,您正在测量第一次执行的代码的初始开销.不仅优化器还没有任何工作,您正在测量加载,验证和初始化类的开销.

所以毫无疑问,评估时间会减少,因为每个评估都可以重复使用已经为之前评估加载的类.在一个循环中运行所有三个评估,甚至只是更改顺序将给你一个完全不同的图片.

唯一可预测的结果是简单的递归评估将具有最小的初始开销,因为它不需要加载StreamAPI类.


如果您多次或更好地运行代码,请使用复杂的基准测试工具,我猜您会得到与我类似的结果,其中reduce明显优于collect单线程方法并且确实比单线程方法更快.

原因collect是因为你使用它完全错误.的Supplier将被查询的每个线程获得不同的容器中,因此,蓄压器函数并需要任何额外的同步.但重要的是组合器函数能够正确地将不同线程的结果容器连接到单个结果中.

一个正确的方法是:

BigInteger[] result2 = Stream.iterate(ONE, x -> x.add(ONE)).parallel().limit(N)
  .collect(()->new BigInteger[]{ONE},
           (a,v)->a[0]=a[0].multiply(v), (a,b)->a[0]=a[0].multiply(b[0]));
Run Code Online (Sandbox Code Playgroud)

在我的系统上,它的性能与reduce方法相当.由于使用数组作为可变容器不能改变不可变的性质BigInteger,因此在collect这里使用没有任何优势,使用reduce是直接的,并且正如所述,当正确使用两种方法时具有相同的性能.


顺便说一句,我不明白为什么这么多程序员试图创建自引用的lambda表达式.递归函数的直接方式仍然是一种方法:

static BigInteger fac(long x) {
    return x==0? ONE : BigInteger.valueOf(x).multiply(fac(x - 1));
}
static final Function<Long, BigInteger> fac=StartMe::fac;
Run Code Online (Sandbox Code Playgroud)

(虽然在你的代码中,你根本不需要它Function<Long, BigInteger>,只需fac(long)直接调用).


最后一点,无论是Stream.iterateStream.limit,是并行执行非常糟糕.使用具有可预测大小和独立操作的流将显着优于您的解决方案:

BigInteger result4 = LongStream.rangeClosed(1, N).parallel()
    .mapToObj(BigInteger::valueOf).reduce(BigInteger::multiply).orElse(ONE);
Run Code Online (Sandbox Code Playgroud)