如何实现Java 8 Stream流畅的API并且懒洋洋地计算

Gas*_*ium 1 collections lambda higher-order-functions java-8 java-stream

我想找出一个简单的实现等同于Java的8 ,让我去探索的查询算法计算懒洋洋的发展(如map(),filter(),reduce(),等).注意:实现比Stream更好的解决方案不是我的目标.另一方面,我唯一的目标是了解Stream内部.

然而,我发现的每个实现都基于Iterable<T>,例如以下答案中提供的解决方案:

然而,我对这些解决方案感到不舒服,因为:

  1. 他们太冗长了.
  2. 它们对于新的查询方法不灵活.包含新的查询方法需要进行结构修改.
  3. 尽管存在查询参数,但它们并没有利用新的Java 8功能,例如:第一类函数或默认方法.
  4. 他们都没有使用上面使用的Spliterator<T>方法Stream<T>.

我知道这Spliterator<T>是为了允许分区和并行处理,但我认为它的独特迭代器方法(boolean tryAdvance(Consumer<t>))可以被利用到新的替代品而不是上面列出的那些.此外,正如Brian Goetz所述:

SpliteratorIterator即使没有并行性,也是更好的.(他们通常也更容易写,更容易出错.)

那么,是否有可能开发出一种更易读,更简单,更简洁,更灵活的查询API实现,并且基于相同的原理Stream<T>(并行处理部分除外)?

如果是的话,你怎么能这样做?我希望看到比上面列出的更简单的实现,如果可能的话,利用新的Java 8功能.

要求:

  • 不要重用Java 8 API中的现有方法
  • 并行处理功能超出了本问题的范围.
  • 如果可能,更好,不要使用该Iterable<T>方法.

我的问题的原因?我认为学习查询API(如Stream)的最佳方法是尝试自己实现这些相同的方法.我在学习时已经成功完成了.net Linq.当然,我没有比Linq更好的实现,但这有助于我理解内部部分.所以,我试图按照相同的方法学习Stream.

这并不是那么不寻常.有以下这种方法对于其他技术,如许多作坊功能的JavaScript车间,其中大部分练习要求对现有方法这样实现:map(),filter(),reduce(),call(),bind(),等...

选择的答案:现在我认为米格尔·甘博亚的答案是我的选择,而不是Tagir Valeev的答案,因为后者不允许的implementaton findAny()findFirst()不通过完全遍历所有元素forEach()dataSrc.但是,我认为Tagir Valeev的答案在一些中间操作的简洁实现以及性能方面还有其他优点,因为forEach()基于这种方法,减少了调解访问数据结构内部的迭代代码的开销,如Brian Goetz所引用的那样.它的答案第2点

Tag*_*eev 5

在没有短路支持的情况下实现无状态操作的子集非常容易.你应该注意始终坚持内部迭代.基本构建块是forEach可以对每个输入元素执行给定动作的操作.forEach方法体是唯一在不同阶段发生变化的事物.所以我们可以用抽象forEach方法创建抽象类,也可以接受实际上是一个体的函数forEach.我会坚持第二种方法:

public final class MyStream<T> {
    private final Consumer<Consumer<T>> action;

    public MyStream(Consumer<Consumer<T>> action) {
        this.action = action;
    }

    public void forEach(Consumer<T> cons) {
        action.accept(cons);
    }
}
Run Code Online (Sandbox Code Playgroud)

现在让我们创建一些简单的资源:

public static <T> MyStream<T> of(Iterable<T> elements) {
    // just redirect to Iterable::forEach
    return new MyStream<>(elements::forEach);
}

@SafeVarargs
public static <T> MyStream<T> of(T... elements) {
    return of(Arrays.asList(elements));
}

public static MyStream<Integer> range(int from, int to) {
    return new MyStream<>(cons -> {
        for(int i=from; i<to; i++) cons.accept(i);
    });
}
Run Code Online (Sandbox Code Playgroud)

现在中间操作.他们只需要调整接收到的消费者action来执行其他操作:

public <U> MyStream<U> map(Function<T, U> mapper) {
    return new MyStream<>(cons -> forEach(e -> cons.accept(mapper.apply(e))));
}

public MyStream<T> filter(Predicate<T> pred) {
    return new MyStream<>(cons -> forEach(e -> {
        if(pred.test(e))
            cons.accept(e);
    }));
}

public <U> MyStream<U> flatMap(Function<T, MyStream<U>> mapper) {
    return new MyStream<>(cons -> forEach(e -> mapper.apply(e).forEach(cons)));
}

public MyStream<T> peek(Consumer<T> action) {
    return new MyStream<>(cons -> forEach(e -> {
        action.accept(e);
        cons.accept(e);
    }));
}

public MyStream<T> skip(long n) {
    return new MyStream<>(cons -> {
        long[] count = {0};
        forEach(e -> {
            if(++count[0] > n)
                cons.accept(e);
        });
    });
}
Run Code Online (Sandbox Code Playgroud)

现在让我们使用forEach以下方法创建一些终端操作:

public T reduce(T identity, BinaryOperator<T> op) {
    class Box {
        T val = identity;
    }
    Box b = new Box();
    forEach(e -> b.val = op.apply(b.val, e));
    return b.val;
}

public Optional<T> reduce(BinaryOperator<T> op) {
    class Box {
        boolean isPresent;
        T val;
    }
    Box b = new Box();
    forEach(e -> {
        if(b.isPresent) b.val = op.apply(b.val, e);
        else {
            b.val = e;
            b.isPresent = true;
        }
    });
    return b.isPresent ? Optional.empty() : Optional.of(b.val);
}

public long count() {
    return map(e -> 1L).reduce(0L, Long::sum);
}

public Optional<T> maxBy(Comparator<T> cmp) {
    return reduce(BinaryOperator.maxBy(cmp));
}

public Optional<T> minBy(Comparator<T> cmp) {
    return reduce(BinaryOperator.minBy(cmp));
}
Run Code Online (Sandbox Code Playgroud)

所以我们现在有了我们的流.我们来试试吧:

System.out.println(MyStream.of(1,2,3,4,5).map(x -> x*2)
                           .reduce(0, Integer::sum));
// 30

System.out.println(MyStream.of("a", "stream", "of", "some", "strings")
                           .flatMap(x -> MyStream.of(", ", x))
                           .skip(1).reduce("", String::concat));
// a, stream, of, some, strings

System.out.println(MyStream.range(0, 100)
                           .filter(x -> x % 3 == 0).count());
// 34
Run Code Online (Sandbox Code Playgroud)

等等.这样的实现非常简单,但与实际Stream API中的内容非常接近.当然,当你添加短路,并行流,原始特化和更多有状态操作时,事情会复杂得多.

请注意,与Stream API不同,这MyStream可以多次重复使用:

MyStream<Integer> range = range(0, 10);
range.forEach(System.out::println);
range.forEach(System.out::println); // works perfectly
Run Code Online (Sandbox Code Playgroud)

  • 是的,它很可能[见要点](https://gist.github.com/amaembo/39b9c354ba3b63e973f3ce2b3ee4a9bf)并且确实简化了示例.然而,这种方式更难以扩展,例如,支持特性或逐个处理(如`tryAdvance`).因此,我不会改变答案,但如果你愿意,你可以采取主旨版本. (3认同)
  • @MiguelGamboa,需要更多更改.最简单的方法是将`forEach`替换为`forEachWithCancel(Predicate)`,如果应该停止处理,它将返回`false`. (3认同)
  • @TagirValeev根据你的建议你如何实现`findFirst()`或`findAny()`?我没有意识到如何做到这一点,而不会导致最内部的`forEach()`完全遍历元素源. (2认同)