如何在Stream上短路reduce()操作?

bow*_*ore 29 java java-8 java-stream

这与如何在Stream上短路减少基本相同.但是,由于这个问题集中在一个布尔值流,并且其答案不能推广到其他类型并减少操作,我想问更一般的问题.

我们如何在流上进行减少,以便在遇到减少操作的吸收元件时发生短路?

乘法的典型数学例子是0.这个Stream:

int product = IntStream.of(2, 3, 4, 5, 0, 7, 8)
        .reduce(1, (a, b) -> a * b);
Run Code Online (Sandbox Code Playgroud)

将消耗最后两个元素(78),无论一旦0遇到产品已知的事实.

Tag*_*eev 11

遗憾的是,Stream API具有创建自己的短路操作的有限能力.不那么干净的解决方案就是扔掉它RuntimeException并抓住它.这是实现IntStream,但它也可以推广到其他流类型:

public static int reduceWithCancelEx(IntStream stream, int identity, 
                      IntBinaryOperator combiner, IntPredicate cancelCondition) {
    class CancelException extends RuntimeException {
        private final int val;

        CancelException(int val) {
            this.val = val;
        }
    }

    try {
        return stream.reduce(identity, (a, b) -> {
            int res = combiner.applyAsInt(a, b);
            if(cancelCondition.test(res))
                throw new CancelException(res);
            return res;
        });
    } catch (CancelException e) {
        return e.val;
    }
}
Run Code Online (Sandbox Code Playgroud)

用法示例:

int product = reduceWithCancelEx(
        IntStream.of(2, 3, 4, 5, 0, 7, 8).peek(System.out::println), 
        1, (a, b) -> a * b, val -> val == 0);
System.out.println("Result: "+product);
Run Code Online (Sandbox Code Playgroud)

输出:

2
3
4
5
0
Result: 0
Run Code Online (Sandbox Code Playgroud)

请注意,即使它适用于并行流,也不能保证只要其中一个引发异常,其他并行任务就会完成.已经启动的子任务可能会一直运行到完成,因此您可以处理比预期更多的元素.

更新:替代解决方案,更长,但更平行友好.它基于自定义分裂器,它最多返回一个元素,这是所有底层元素积累的结果).在顺序模式下使用它时,它会在单个tryAdvance调用中完成所有工作.拆分时,每个部件都会生成对应的单个部分结果,这些部分结果由Stream引擎使用组合器函数减少.这是通用版本,但原始专业化也是可能的.

final static class CancellableReduceSpliterator<T, A> implements Spliterator<A>,
        Consumer<T>, Cloneable {
    private Spliterator<T> source;
    private final BiFunction<A, ? super T, A> accumulator;
    private final Predicate<A> cancelPredicate;
    private final AtomicBoolean cancelled = new AtomicBoolean();
    private A acc;

    CancellableReduceSpliterator(Spliterator<T> source, A identity,
            BiFunction<A, ? super T, A> accumulator, Predicate<A> cancelPredicate) {
        this.source = source;
        this.acc = identity;
        this.accumulator = accumulator;
        this.cancelPredicate = cancelPredicate;
    }

    @Override
    public boolean tryAdvance(Consumer<? super A> action) {
        if (source == null || cancelled.get()) {
            source = null;
            return false;
        }
        while (!cancelled.get() && source.tryAdvance(this)) {
            if (cancelPredicate.test(acc)) {
                cancelled.set(true);
                break;
            }
        }
        source = null;
        action.accept(acc);
        return true;
    }

    @Override
    public void forEachRemaining(Consumer<? super A> action) {
        tryAdvance(action);
    }

    @Override
    public Spliterator<A> trySplit() {
        if(source == null || cancelled.get()) {
            source = null;
            return null;
        }
        Spliterator<T> prefix = source.trySplit();
        if (prefix == null)
            return null;
        try {
            @SuppressWarnings("unchecked")
            CancellableReduceSpliterator<T, A> result = 
                (CancellableReduceSpliterator<T, A>) this.clone();
            result.source = prefix;
            return result;
        } catch (CloneNotSupportedException e) {
            throw new InternalError();
        }
    }

    @Override
    public long estimateSize() {
        // let's pretend we have the same number of elements
        // as the source, so the pipeline engine parallelize it in the same way
        return source == null ? 0 : source.estimateSize();
    }

    @Override
    public int characteristics() {
        return source == null ? SIZED : source.characteristics() & ORDERED;
    }

    @Override
    public void accept(T t) {
        this.acc = accumulator.apply(this.acc, t);
    }
}
Run Code Online (Sandbox Code Playgroud)

其方法类似于Stream.reduce(identity, accumulator, combiner)Stream.reduce(identity, combiner),但是cancelPredicate:

public static <T, U> U reduceWithCancel(Stream<T> stream, U identity,
        BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner,
        Predicate<U> cancelPredicate) {
    return StreamSupport
            .stream(new CancellableReduceSpliterator<>(stream.spliterator(), identity,
                    accumulator, cancelPredicate), stream.isParallel()).reduce(combiner)
            .orElse(identity);
}

public static <T> T reduceWithCancel(Stream<T> stream, T identity,
        BinaryOperator<T> combiner, Predicate<T> cancelPredicate) {
    return reduceWithCancel(stream, identity, combiner, combiner, cancelPredicate);
}
Run Code Online (Sandbox Code Playgroud)

让我们测试两个版本并计算实际处理的元素数量.让我们0接近尾声.例外版本:

AtomicInteger count = new AtomicInteger();
int product = reduceWithCancelEx(
        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                .parallel().peek(i -> count.incrementAndGet()), 1,
        (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);
Run Code Online (Sandbox Code Playgroud)

典型输出:

product: 0/count: 281721
product: 0/count: 500001
Run Code Online (Sandbox Code Playgroud)

因此,当仅处理某些元素时返回结果时,任务继续在后台工作,并且计数器仍在增加.这是分裂器版本:

AtomicInteger count = new AtomicInteger();
int product = reduceWithCancel(
        IntStream.range(-1000000, 100).filter(x -> x == 0 || x % 2 != 0)
                .parallel().peek(i -> count.incrementAndGet()).boxed(), 
                1, (a, b) -> a * b, x -> x == 0);
System.out.println("product: " + product + "/count: " + count);
Thread.sleep(1000);
System.out.println("product: " + product + "/count: " + count);
Run Code Online (Sandbox Code Playgroud)

典型输出:

product: 0/count: 281353
product: 0/count: 281353
Run Code Online (Sandbox Code Playgroud)

返回结果时,所有任务实际上都已完成.


Lii*_*Lii 5

可以使用流的拆分器来实现通用的短路静态归约方法。竟然竟然不是很复杂!当人们想要以更灵活的方式使用蒸汽时,使用分离器似乎是很多时候的方法。

public static <T> T reduceWithCancel(Stream<T> s, T acc, BinaryOperator<T> op, Predicate<? super T> cancelPred) {
    BoxConsumer<T> box = new BoxConsumer<T>();
    Spliterator<T> splitr = s.spliterator();

    while (!cancelPred.test(acc) && splitr.tryAdvance(box)) {
        acc = op.apply(acc, box.value);
    }

    return acc;
}

public static class BoxConsumer<T> implements Consumer<T> {
    T value = null;
    public void accept(T t) {
        value = t;
    }
}
Run Code Online (Sandbox Code Playgroud)

用法:

    int product = reduceWithCancel(
        Stream.of(1, 2, 0, 3, 4).peek(System.out::println),
        1, (acc, i) -> acc * i, i -> i == 0);

    System.out.println("Result: " + product);
Run Code Online (Sandbox Code Playgroud)

输出:

1
2
0
Result: 0
Run Code Online (Sandbox Code Playgroud)

该方法可以推广到执行其他类型的终端操作。

这是基于这个关于 take-while 操作的答案

我对它的并行化潜力一无所知。

  • 请注意,在我的解决方案中,`cancelPredicate` 测试了归约的结果,而不是下一个元素。在这种情况下,它实际上更好(例如,Java 中的 `65536*65536 == 0`,尽管两个参数都不为零)。您的答案可以很容易地适应做同样的事情。我有一个基于拆分器的想法,它是并行友好的,但需要一些时间来正确编码...... (2认同)