并行流调用 Spliterator 的次数超过其限制

Pr0*_*ean 4 java java-8 java-stream spliterator

我最近发现了一个错误

StreamSupport.intStream(/* a Spliterator.ofInt */, true)
    .limit(20)
Run Code Online (Sandbox Code Playgroud)

被调用Spliterator.ofInt.tryAdvance超过20次。当我把它改成

StreamSupport.intStream(/* a Spliterator.ofInt */, true)
    .sequential()
    .limit(20)
Run Code Online (Sandbox Code Playgroud)

问题就消失了。为什么会出现这种情况?除了将并行流构建到 中之外,是否有任何方法可以在tryAdvance有副作用时实现对并行流的严格限制Spliterator?(这是为了测试一些返回无限流的方法,但测试需要达到最终结束,而不需要“循环 X 毫秒”构造的复杂性。)

Hol*_*ger 5

limit关于如何互动以及应该互动似乎存在根本性的误解trySplit。调用次数不应超过trySplit指定次数的假设limit是完全错误的。

\n\n

的目的trySplit是将源数据分为两部分,在最好的情况下分成两半,就像尝试trySplit平衡拆分一样。因此,如果您有一个包含 100 万个元素的源数据集,则成功的拆分会生成两个分别包含 50 万个元素的源数据集。这与limit(20)您可能应用于流的 a 完全无关,除了我们事先知道,如果 spliterator 具有特征,我们可以删除第二个数据集,SIZED|SUBSIZED因为请求的20 个元素只能在第一个数据集中找到五十万。

\n\n

\xe2\x80\x99 很容易计算,在最好的情况下,即平衡分割,我们已经需要十五次分割操作,每次都删除上半部分,然后才能在前二十个元素之间进行分割,从而允许我们并行处理前二十个元素。

\n\n

这可以很容易地证明:

\n\n
class DebugSpliterator extends Spliterators.AbstractIntSpliterator {\n    int current, fence;\n    DebugSpliterator() {\n        this(0, 1_000_000);\n    }\n    DebugSpliterator(int start, int end) {\n        super(end-start, ORDERED|SIZED|SUBSIZED);\n        current = start;\n        fence = end;\n    }\n    @Override public boolean tryAdvance(IntConsumer action) {\n        if(current<fence) {\n            action.accept(current++);\n            return true;\n        }\n        return false;\n    }\n    @Override public OfInt trySplit() {\n        int mid = (current+fence)>>>1;\n        System.out.println("trySplit() ["+current+", "+mid+", "+fence+"]");\n        return mid>current? new DebugSpliterator(current, current=mid): null;\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n\n\n
StreamSupport.stream(new DebugSpliterator(), true)\n    .limit(20)\n    .forEach(x -> {});\n
Run Code Online (Sandbox Code Playgroud)\n\n

在我的机器上,它打印:

\n\n
class DebugSpliterator extends Spliterators.AbstractIntSpliterator {\n    int current, fence;\n    DebugSpliterator() {\n        this(0, 1_000_000);\n    }\n    DebugSpliterator(int start, int end) {\n        super(end-start, ORDERED|SIZED|SUBSIZED);\n        current = start;\n        fence = end;\n    }\n    @Override public boolean tryAdvance(IntConsumer action) {\n        if(current<fence) {\n            action.accept(current++);\n            return true;\n        }\n        return false;\n    }\n    @Override public OfInt trySplit() {\n        int mid = (current+fence)>>>1;\n        System.out.println("trySplit() ["+current+", "+mid+", "+fence+"]");\n        return mid>current? new DebugSpliterator(current, current=mid): null;\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

当然,这远远超过了二十次分割尝试,但完全合理,因为必须对数据集进行分割,直到我们在所需目标范围内获得子范围,以便能够并行处理它。

\n\n

我们可以通过删除导致此执行策略的元信息来强制执行不同的行为:

\n\n
StreamSupport.stream(new DebugSpliterator(), true)\n    .filter(x -> true)\n    .limit(20)\n    .forEach(x -> {});\n
Run Code Online (Sandbox Code Playgroud)\n\n

由于 Stream API 不了解谓词\xe2\x80\x99s 的行为,因此管道失去了其SIZED特性,导致

\n\n
StreamSupport.stream(new DebugSpliterator(), true)\n    .limit(20)\n    .forEach(x -> {});\n
Run Code Online (Sandbox Code Playgroud)\n\n

这显示trySplit调用较少,但没有改进;查看数字表明,现在结果元素范围之外的范围(如果我们使用我们的知识,所有元素都会通过过滤器)都被处理,更糟糕的是,结果元素的范围完全被单个分割器覆盖,导致没有并行根本不处理我们的结果元素,所有其他线程都在处理随后被丢弃的元素。

\n\n

当然,我们可以通过改变来轻松地对我们的任务执行最佳分割

\n\n
int mid = (current+fence)>>>1;\n
Run Code Online (Sandbox Code Playgroud)\n\n

\n\n
int mid = fence>20? 20: (current+fence)>>>1;\n
Run Code Online (Sandbox Code Playgroud)\n\n

所以

\n\n
StreamSupport.stream(new DebugSpliterator(), true)\n    .limit(20)\n    .forEach(x -> {});\n
Run Code Online (Sandbox Code Playgroud)\n\n

结果是

\n\n
trySplit() [0, 500000, 1000000]\ntrySplit() [0, 250000, 500000]\ntrySplit() [0, 125000, 250000]\ntrySplit() [0, 62500, 125000]\ntrySplit() [0, 31250, 62500]\ntrySplit() [0, 15625, 31250]\ntrySplit() [0, 7812, 15625]\ntrySplit() [0, 3906, 7812]\ntrySplit() [0, 1953, 3906]\ntrySplit() [0, 976, 1953]\ntrySplit() [0, 488, 976]\ntrySplit() [0, 244, 488]\ntrySplit() [0, 122, 244]\ntrySplit() [0, 61, 122]\ntrySplit() [0, 30, 61]\ntrySplit() [0, 15, 30]\ntrySplit() [15, 22, 30]\ntrySplit() [15, 18, 22]\ntrySplit() [15, 16, 18]\ntrySplit() [16, 17, 18]\ntrySplit() [0, 7, 15]\ntrySplit() [18, 20, 22]\ntrySplit() [18, 19, 20]\ntrySplit() [7, 11, 15]\ntrySplit() [0, 3, 7]\ntrySplit() [3, 5, 7]\ntrySplit() [3, 4, 5]\ntrySplit() [7, 9, 11]\ntrySplit() [4, 4, 5]\ntrySplit() [9, 10, 11]\ntrySplit() [11, 13, 15]\ntrySplit() [0, 1, 3]\ntrySplit() [13, 14, 15]\ntrySplit() [7, 8, 9]\ntrySplit() [1, 2, 3]\ntrySplit() [8, 8, 9]\ntrySplit() [5, 6, 7]\ntrySplit() [14, 14, 15]\ntrySplit() [17, 17, 18]\ntrySplit() [11, 12, 13]\ntrySplit() [12, 12, 13]\ntrySplit() [2, 2, 3]\ntrySplit() [10, 10, 11]\ntrySplit() [6, 6, 7]\n
Run Code Online (Sandbox Code Playgroud)\n\n

但这\xe2\x80\x99不是一个通用的分裂器,而是一个如果限制不是20的话性能很差的分裂器。

\n\n

如果我们可以将限制合并到 spliterator 中,或者更一般地说,合并到流源中,那么我们就不会遇到这个问题。因此list.stream().limit(x),您可以调用list.subList(0, Math.min(x, list.size())).stream(),而不是random.ints().limit(x),使用random.ints(x),而不是Stream.generate(generator).limit(x)您可以使用或使用此答案LongStream.range(0, x).mapToObj( index -> generator.get())的工厂方法。

\n\n

对于任意流源/分离器,应用limit并行流可能非常昂贵,甚至有记录trySplit嗯,首先产生副作用就是一个坏主意。

\n