Java 8泛型:将消费者流量减少到单个消费者

use*_*860 31 java generics java-8 java-stream

我怎么能写组合的方法StreamConsumers到一个单一的Consumer使用Consumer.andThen(Consumer)

我的第一个版本是:

<T> Consumer<T> combine(Stream<Consumer<T>> consumers) {
    return consumers
            .filter(Objects::nonNull)
            .reduce(Consumer::andThen)
            .orElse(noOpConsumer());
}

<T> Consumer<T> noOpConsumer() {
    return value -> { /* do nothing */ };
}
Run Code Online (Sandbox Code Playgroud)

此版本使用JavaC和Eclipse编译.但它太具体了:Stream不能是a Stream<SpecialConsumer>,如果Consumers不是类型T而是超类型,则不能使用:

Stream<? extends Consumer<? super Foo>> consumers = ... ;
combine(consumers);
Run Code Online (Sandbox Code Playgroud)

这不会合理地编译.改进版将是:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers
            .filter(Objects::nonNull)
            .reduce(Consumer::andThen)
            .orElse(noOpConsumer());
}
Run Code Online (Sandbox Code Playgroud)

但Eclipse和JavaC都没有编译:
Eclipse(4.7.3a):

该类型Consumer未定义andThen(capture#7-of ? extends Consumer<? super T>, capture#7-of ? extends Consumer<? super T>)适用于此处

JavaC(1.8.0172):

错误:不兼容的类型:无效的方法引用
.reduce(Consumer::andThen)
不兼容的类型:Consumer<CAP#1>无法转换为Consumer<? super CAP#2>
where T类型变量:
T extends Object在方法<T>combine(Stream<? extends Consumer<? super T>>)
中声明CAP#1,CAP#2是新的类型变量:
CAP#1 extends Object super: T from capture of ? super T
CAP#2 extends Object super: T from capture of ? super T

但它应该有效:消费者的每个子类也可以用作消费者.每个X型超级消费者也可以消费X. 我尝试将类型参数添加到流版本的每一行,但这无济于事.但如果我用传统的循环写下来,它会编译:

<T> Consumer<T> combine(Collection<? extends Consumer<? super T>> consumers) {
    Consumer<T> result = noOpConsumer()
    for (Consumer<? super T> consumer : consumers) {
        result = result.andThen(consumer);
    }
    return result;
}
Run Code Online (Sandbox Code Playgroud)

(为简明起见,省略了过滤掉空值.)

因此,我的问题是:我怎样才能说服JavaC和Eclipse我的代码是正确的?或者,如果不正确:为什么循环版本正确但不是Stream版本?

Ole*_*hov 27

您使用Stream.reduce(accumulator)具有以下签名的单参数版本:

Optional<T> reduce(BinaryOperator<T> accumulator);
Run Code Online (Sandbox Code Playgroud)

BinaryOperator<T> accumulator只能接受类型的元素T,但你必须:

<? extends Consumer<? super T>>
Run Code Online (Sandbox Code Playgroud)

我建议您使用该Stream.reduce(...)方法的三参数版本:

<U> U reduce(U identity,
             BiFunction<U, ? super T, U> accumulator
             BinaryOperator<U> combiner);
Run Code Online (Sandbox Code Playgroud)

BiFunction<U, ? super T, U> accumulator可以接受两种不同类型的参数,有较少限制的束缚,更适合你的情况.可能的解决方案可能是:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {
    return consumers.filter(Objects::nonNull)
                    .reduce(t -> {}, Consumer::andThen, Consumer::andThen);
}
Run Code Online (Sandbox Code Playgroud)

第三个参数BinaryOperator<U> combiner仅在并行流中调用,但无论如何,提供正确的实现是明智的.

此外,为了更好地理解,可以将上述代码表示如下:

<T> Consumer<T> combine(Stream<? extends Consumer<? super T>> consumers) {

    Consumer<T> identity = t -> {};
    BiFunction<Consumer<T>, Consumer<? super T>, Consumer<T>> acc = Consumer::andThen;
    BinaryOperator<Consumer<T>> combiner = Consumer::andThen;

    return consumers.filter(Objects::nonNull)
                    .reduce(identity, acc, combiner);
}
Run Code Online (Sandbox Code Playgroud)

现在你可以写:

Stream<? extends Consumer<? super Foo>> consumers = Stream.of();
combine(consumers);
Run Code Online (Sandbox Code Playgroud)