How do I lazily concatenate streams?

Jef*_*oom 7 java java-8 java-stream

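I'm trying to implement a stream that uses another instance of itself in its implementation. The stream has a few constant elements prepended (using IntStream.concat), so this should work as long as the concatenated stream creates the non-constant part lazily. I thought that using the StreamSupport.intStream overload that takes a Supplier, together with IntStream.concat (which "creates a lazily concatenated stream"), would be lazy enough to create the second spliterator only when its elements are demanded. But even creating the stream (without evaluating it) overflows the stack. How can I lazily concatenate streams?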


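I'm trying to port the streaming prime sieve from this answer to Java. This sieve uses another instance of itself (ps = postponed_sieve() in the Python code). If I break the initial four constant elements (yield 2; yield 3; yield 5; yield 7;) out into their own stream, it's easy to implement the generator as a spliterator: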

/**
 * based on https://stackoverflow.com/a/10733621/3614835
 */
static class PrimeSpliterator extends Spliterators.AbstractIntSpliterator {
    private static final int CHARACTERISTICS = Spliterator.DISTINCT | Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED | Spliterator.SORTED;
    private final Map<Integer, Supplier<IntStream>> sieve = new HashMap<>();
    private final PrimitiveIterator.OfInt postponedSieve = primes().iterator();
    private int p, q, c = 9;
    private Supplier<IntStream> s;
    PrimeSpliterator() {
        super(105097564 /* according to Wolfram Alpha */ - 4 /* in prefix */,
                CHARACTERISTICS);
        //p = next(ps) and next(ps) (that's Pythonic?)
        postponedSieve.nextInt();
        this.p = postponedSieve.nextInt();
        this.q = p*p;
    }

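    @Override
    public boolean tryAdvance(IntConsumer action) {
        for (; c > 0 /* overflow */; c += 2) {
            Supplier<IntStream> maybeS = sieve.remove(c);
            if (maybeS != null)
                s = maybeS;
            else if (c < q) {
                action.accept(c);
                c += 2; // the early return skips the loop increment
                return true; //continue
            } else {
                // copy p and q now; the fields are reassigned just below
                final int start = q + 2*p, step = 2*p;
                s = () -> IntStream.iterate(start, x -> x + step);
                p = postponedSieve.nextInt();
                q = p*p;
            }
            // s restarts from its first multiple each time, so skip values <= c
            int m = s.get().filter(x -> x > c && !sieve.containsKey(x)).findFirst().getAsInt();
            sieve.put(m, s);
        }
        return false;
    }

    @Override
    public Comparator<? super Integer> getComparator() {
        return null; // SORTED in natural order; the default implementation throws
    }
}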

My first attempt at a primes() method returns an IntStream that concatenates a constant stream with a new PrimeSpliterator:

public static IntStream primes() {
    return IntStream.concat(IntStream.of(2, 3, 5, 7),
            StreamSupport.intStream(new PrimeSpliterator(), false));
}

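Calling primes() results in a StackOverflowError, because primes() always instantiates a PrimeSpliterator, but PrimeSpliterator's field initializer always calls primes(). However, StreamSupport.intStream has an overload that takes a Supplier, which should allow the PrimeSpliterator to be created lazily: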

public static IntStream primes() {
    return IntStream.concat(IntStream.of(2, 3, 5, 7),
            StreamSupport.intStream(PrimeSpliterator::new, PrimeSpliterator.CHARACTERISTICS, false));
}

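However, I get a StackOverflowError with a different backtrace (trimmed, as it repeats). Note that the recursion happens entirely within the call to primes(); the terminal operation iterator() is never invoked on a returned stream.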

Exception in thread "main" java.lang.StackOverflowError
    at java.util.stream.StreamSpliterators$DelegatingSpliterator$OfInt.<init>(StreamSpliterators.java:582)
    at java.util.stream.IntPipeline.lazySpliterator(IntPipeline.java:155)
    at java.util.stream.IntPipeline$Head.lazySpliterator(IntPipeline.java:514)
    at java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:352)
    at java.util.stream.IntPipeline.spliterator(IntPipeline.java:181)
    at java.util.stream.IntStream.concat(IntStream.java:851)
    at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
    at com.jeffreybosboom.projecteuler.util.Primes$PrimeSpliterator.<init>(Primes.java:32)
    at com.jeffreybosboom.projecteuler.util.Primes$$Lambda$1/834600351.get(Unknown Source)
    at java.util.stream.StreamSpliterators$DelegatingSpliterator.get(StreamSpliterators.java:513)
    at java.util.stream.StreamSpliterators$DelegatingSpliterator.estimateSize(StreamSpliterators.java:536)
    at java.util.stream.Streams$ConcatSpliterator.<init>(Streams.java:713)
    at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:789)
    at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:785)
    at java.util.stream.Streams$ConcatSpliterator$OfInt.<init>(Streams.java:819)
    at java.util.stream.IntStream.concat(IntStream.java:851)
    at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
    at com.jeffreybosboom.projecteuler.util.Primes$PrimeSpliterator.<init>(Primes.java:32)
    at com.jeffreybosboom.projecteuler.util.Primes$$Lambda$1/834600351.get(Unknown Source)
    at java.util.stream.StreamSpliterators$DelegatingSpliterator.get(StreamSpliterators.java:513)
    at java.util.stream.StreamSpliterators$DelegatingSpliterator.estimateSize(StreamSpliterators.java:536)
    at java.util.stream.Streams$ConcatSpliterator.<init>(Streams.java:713)
    at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:789)
    at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:785)
    at java.util.stream.Streams$ConcatSpliterator$OfInt.<init>(Streams.java:819)
    at java.util.stream.IntStream.concat(IntStream.java:851)
    at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
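The frames DelegatingSpliterator.estimateSize and Streams$ConcatSpliterator.<init> in the trace suggest why the Supplier overload doesn't help: IntStream.concat asks the supplier-backed spliterator for its size while constructing the concatenation, and answering that forces the supplier. A minimal sketch to observe this in isolation (my own; SupplierEagernessDemo and tail() are hypothetical names, and a fixed three-element spliterator stands in for PrimeSpliterator) counts supplier invocations while only building the stream:

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class SupplierEagernessDemo {
    // Counts how often the spliterator supplier has been invoked.
    static final AtomicInteger SUPPLIER_CALLS = new AtomicInteger();

    // Characteristics of the array spliterator returned by tail()
    // (Spliterators.spliterator adds SIZED and SUBSIZED for arrays).
    static final int TAIL_CHARACTERISTICS = Spliterator.ORDERED | Spliterator.IMMUTABLE
            | Spliterator.SIZED | Spliterator.SUBSIZED;

    // Stand-in for PrimeSpliterator::new: a fixed spliterator over 11, 13, 17.
    static Spliterator.OfInt tail() {
        SUPPLIER_CALLS.incrementAndGet();
        return Spliterators.spliterator(new int[] {11, 13, 17},
                Spliterator.ORDERED | Spliterator.IMMUTABLE);
    }

    // Builds the concatenated stream; no terminal operation is run here.
    static IntStream buildOnly() {
        return IntStream.concat(IntStream.of(2, 3, 5, 7),
                StreamSupport.intStream(SupplierEagernessDemo::tail, TAIL_CHARACTERISTICS, false));
    }

    public static void main(String[] args) {
        SUPPLIER_CALLS.set(0);
        buildOnly();
        System.out.println("supplier calls before any terminal op: " + SUPPLIER_CALLS.get());
    }
}
```

If the counter is already non-zero at that point, as the trace implies, the Supplier overload only postpones spliterator creation until concat itself runs, which in primes() is immediately.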

How can I lazily concatenate streams so that a stream can use another copy of itself in its implementation?

Mar*_*nik 9

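You apparently assume that the Streams API extends its laziness guarantees to the instantiation of spliterators; that is not the case. It expects to be able to instantiate the stream's spliterator at any time before actual consumption begins, for example just to find out the stream's characteristics and reported size. Consumption only begins with a call to trySplit, tryAdvance, or forEachRemaining.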
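To see which Spliterator methods the framework calls before consumption starts, here is a small instrumented spliterator (a sketch; SpliteratorCallLog and LoggingSpliterator are hypothetical names). Building a concatenation shaped like the first primes() only queries it, for example through characteristics() and estimateSize(); tryAdvance is only reached once a terminal operation such as toArray() runs:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class SpliteratorCallLog {
    // Names of the Spliterator methods the stream framework has invoked so far.
    static final List<String> LOG = new ArrayList<>();

    // Emits 11, 13, 17 and records each call it receives.
    static final class LoggingSpliterator extends Spliterators.AbstractIntSpliterator {
        private final int[] values = {11, 13, 17};
        private int index = 0;

        LoggingSpliterator() {
            super(3, Spliterator.ORDERED | Spliterator.IMMUTABLE | Spliterator.NONNULL);
        }

        @Override
        public boolean tryAdvance(IntConsumer action) {
            LOG.add("tryAdvance");
            if (index >= values.length) return false;
            action.accept(values[index++]);
            return true;
        }

        @Override
        public long estimateSize() {
            LOG.add("estimateSize");
            return values.length - index;
        }

        @Override
        public int characteristics() {
            LOG.add("characteristics");
            return super.characteristics();
        }
    }

    // Same shape as the question's first primes(): constants, then a spliterator.
    static IntStream build() {
        return IntStream.concat(IntStream.of(2, 3, 5, 7),
                StreamSupport.intStream(new LoggingSpliterator(), false));
    }

    public static void main(String[] args) {
        LOG.clear();
        IntStream stream = build();
        System.out.println("while building:  " + LOG);
        LOG.clear();
        System.out.println(Arrays.toString(stream.toArray()));
        System.out.println("while consuming: " + LOG);
    }
}
```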

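With that in mind, you are initializing the postponed sieve earlier than necessary. You can't use any of its results until you enter the else if part of tryAdvance, so move that code to the last possible moment that still gives correct results: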

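// postponedSieve is now non-final with no initializer; the constructor no longer sets it
@Override
public boolean tryAdvance(IntConsumer action) {
    for (; c > 0 /* overflow */; c += 2) {
        Supplier<IntStream> maybeS = sieve.remove(c);
        if (maybeS != null)
            s = maybeS;
        else {
            if (postponedSieve == null) {
              postponedSieve = primes().iterator();
              postponedSieve.nextInt();
              this.p = postponedSieve.nextInt();
              this.q = p*p;
            }
            if (c < q) {
              action.accept(c);
              c += 2; // the early return skips the loop increment
              return true; //continue
            } else {
              // copy p and q now; the fields are reassigned just below
              final int start = q + 2*p, step = 2*p;
              s = () -> IntStream.iterate(start, x -> x + step);
              p = postponedSieve.nextInt();
              q = p*p;
            }
        }
        // s restarts from its first multiple each time, so skip values <= c
        int m = s.get().filter(x -> x > c && !sieve.containsKey(x)).findFirst().getAsInt();
        sieve.put(m, s);
    }
    return false;
}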

I think that with this change even your first attempt at primes() should work.
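Putting the pieces together, here is a self-contained sketch of the deferred-initialization idea combined with the eager first-attempt primes(). It is not the answer's exact code, and these are my substitutions: the map stores the step 2p instead of a Supplier<IntStream> (so a composite's next multiple is simply c + step, as in the classic dictionary-based sieve), q is kept as a long to avoid int overflow near the top of the range, and getComparator() is overridden because the stream framework may query it for a SORTED spliterator, while the default implementation throws IllegalStateException. The class name LazyPrimeSieve is hypothetical.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.IntConsumer;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class LazyPrimeSieve {
    // The question's first attempt: an eager concat is fine once the
    // PrimeSpliterator constructor no longer calls primes().
    public static IntStream primes() {
        return IntStream.concat(IntStream.of(2, 3, 5, 7),
                StreamSupport.intStream(new PrimeSpliterator(), false));
    }

    static final class PrimeSpliterator extends Spliterators.AbstractIntSpliterator {
        private static final int CHARACTERISTICS = Spliterator.DISTINCT | Spliterator.IMMUTABLE
                | Spliterator.NONNULL | Spliterator.ORDERED | Spliterator.SORTED;
        // Upcoming odd composite -> step (2p) of the base prime p that produced it.
        private final Map<Integer, Integer> sieve = new HashMap<>();
        private PrimitiveIterator.OfInt postponedSieve; // created on first tryAdvance
        private int p;
        private long q;      // p * p, kept as long to avoid int overflow
        private int c = 9;   // next odd candidate

        PrimeSpliterator() {
            super(105097564 - 4, CHARACTERISTICS); // does no work, so no recursion here
        }

        @Override
        public boolean tryAdvance(IntConsumer action) {
            for (; c > 0 /* stop at int overflow */; c += 2) {
                Integer step = sieve.remove(c);
                if (step == null) {
                    if (postponedSieve == null) {
                        postponedSieve = primes().iterator();
                        postponedSieve.nextInt();     // skip 2
                        p = postponedSieve.nextInt(); // p = 3
                        q = (long) p * p;             // q = 9
                    }
                    if (c < q) {                      // c is prime
                        action.accept(c);
                        c += 2; // the early return skips the loop increment
                        return true;
                    }
                    // c == p * p: start crossing off odd multiples of p
                    step = 2 * p;
                    p = postponedSieve.nextInt();
                    q = (long) p * p;
                }
                // Move this step to its next multiple that is not already claimed.
                int m = c + step;
                while (sieve.containsKey(m)) m += step;
                sieve.put(m, step);
            }
            return false;
        }

        @Override
        public Comparator<? super Integer> getComparator() {
            return null; // SORTED in natural order; the default implementation throws
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(primes().limit(10).toArray()));
        System.out.println(primes().skip(999).findFirst().getAsInt()); // 1000th prime
    }
}
```

Each nested sieve only does any work (including creating its own nested sieve) once its parent asks it for a prime beyond 7, so the recursion depth grows very slowly with the number of primes consumed.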

If you want to keep your current approach, you could use the following idiom:

Stream.<Supplier<IntStream>>of(
  ()->IntStream.of(2, 3, 5, 7),
  ()->StreamSupport.intStream(new PrimeSpliterator(), false))
.flatMapToInt(Supplier::get);

You may find that this gives you the laziness you need.
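One caveat, as I understand it: in Java 8, flatMap is not fully lazy when its result is pulled element by element (for example through iterator(), which is how postponedSieve consumes primes()), because it pushes an entire inner stream downstream at once; for an effectively unbounded inner stream like the sieve that may never finish. Later JDKs relaxed this. If it bites, a hand-rolled lazy concatenation is another option. The sketch below is my own substitution, not a JDK API: the hypothetical concatLazily only calls the tail Supplier once the head runs dry, and reports an unknown size so that estimateSize() never forces it. naturals() is a toy self-referential stream for demonstration.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.function.IntConsumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class LazyConcat {
    // Concatenates head with the stream produced by tail, calling tail.get()
    // only after head is exhausted. Close handlers of head and of the tail
    // stream are not propagated in this sketch.
    public static IntStream concatLazily(IntStream head, Supplier<IntStream> tail) {
        Spliterator.OfInt headSpliterator = head.spliterator();
        Spliterator.OfInt lazy = new Spliterator.OfInt() {
            private Spliterator.OfInt current = headSpliterator;
            private boolean inTail = false;

            @Override
            public boolean tryAdvance(IntConsumer action) {
                if (current.tryAdvance(action)) return true;
                if (inTail) return false;
                inTail = true;
                current = tail.get().spliterator(); // first and only use of the supplier
                return current.tryAdvance(action);
            }

            @Override
            public Spliterator.OfInt trySplit() {
                return null; // never splits, so the result is effectively sequential
            }

            @Override
            public long estimateSize() {
                return Long.MAX_VALUE; // "unknown"; answering this must not force tail
            }

            @Override
            public int characteristics() {
                return Spliterator.ORDERED;
            }
        };
        return StreamSupport.intStream(lazy, false);
    }

    // Toy self-referential stream: 0 followed by (this stream + 1).
    static IntStream naturals() {
        return concatLazily(IntStream.of(0), () -> naturals().map(x -> x + 1));
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(naturals().limit(5).toArray()));
    }
}
```

With such a helper, primes() could be written as concatLazily(IntStream.of(2, 3, 5, 7), () -> StreamSupport.intStream(new PrimeSpliterator(), false)), so that a PrimeSpliterator is only constructed after 2, 3, 5 and 7 have been consumed.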

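  • @Marko Topolnik: Well, everyone seems to try to find primes not with a simple loop and a BitSet for ints, or BigInteger.nextProbablePrime for larger values, but with the Stream API, apparently wanting an exercise in functional programming rather than a simple practical solution. In that respect, the Stream API does the right thing by putting exactly these obstacles in the programmer's way. (3 upvotes)
  • @TagirValeev Flat-mapping a stream of suppliers of streams should be a solution (as in my edited example). Propagating the reported size is harder, because it's a chicken-and-egg problem: you can't find out the reported size before instantiating the spliterator. (2 upvotes)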
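For comparison, the "simple practical solution" from the first comment might look like the following: a plain loop over a BitSet for ints, with BigInteger.nextProbablePrime for values beyond that range. This is a minimal sketch of my own (SimplePrimes and primesUpTo are hypothetical names), assuming an upper bound is known in advance, which is exactly what the streaming sieve avoids needing.

```java
import java.math.BigInteger;
import java.util.BitSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SimplePrimes {
    // Sieve of Eratosthenes over [0, limit]; a set bit marks a composite.
    // Assumes 0 <= limit < Integer.MAX_VALUE (the BitSet is sized limit + 1).
    static IntStream primesUpTo(int limit) {
        BitSet composite = new BitSet(limit + 1);
        for (int i = 2; (long) i * i <= limit; i++) {
            if (!composite.get(i)) {
                // long index avoids int overflow when limit is near Integer.MAX_VALUE
                for (long j = (long) i * i; j <= limit; j += i) {
                    composite.set((int) j);
                }
            }
        }
        return IntStream.rangeClosed(2, limit).filter(n -> !composite.get(n));
    }

    public static void main(String[] args) {
        System.out.println(primesUpTo(30).boxed().collect(Collectors.toList()));
        // Beyond the int range, BigInteger.nextProbablePrime() takes over.
        System.out.println(BigInteger.valueOf(Integer.MAX_VALUE).nextProbablePrime());
    }
}
```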