通过谓词限制流

MFo*_*ter 179 java java-8 java-stream

是否有Java 8流操作限制(可能是无限的)Stream直到第一个元素无法匹配谓词?

在Java 9中,我们可以使用takeWhile下面的示例来打印小于10的所有数字.

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

由于Java 8中没有这样的操作,以一般方式实现它的最佳方法是什么?

Stu*_*rks 143

操作takeWhiledropWhile已添加到JDK 9.您的示例代码

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

在JDK 9下编译和运行时,它的行为与您期望的完全相同.

JDK 9已经发布.它可以在这里下载:http://jdk.java.net/9/

  • @LukasEder`takeWhile`和`dropWhile`非常普遍,发生在Scala,Python,Groovy,Ruby,Haskell和Clojure中."skip"和"limit"的不对称是不幸的.也许`skip`和`limit`应该被称为`drop`和`take`,但除非你已经熟悉Haskell,否则这些并不是那么直观. (9认同)
  • 使用`takeWhile` /`dropWhile`直接链接到JDK9 Stream的预览文档:http://download.java.net/jdk9/docs/api/java/util/stream/Stream.html (3认同)
  • @StuartMarks:我知道`dropXXX`和`takeXXX`是比较流行的术语,但我个人可以使用更多SQL-esque`limitXXX`和`skipXXX`.我发现这种新的不对称性比单独选择的术语更令人困惑...... :)(顺便说一下:Scala也有`drop(int)`和`take(int)`) (3认同)
  • 是的,让我在生产环境中升级到 Jdk 9。许多开发人员仍在使用 Jdk8,这样的功能应该从一开始就包含在 Streams 中。 (3认同)
  • 为了与现有 API 保持一致,是否有任何理由将它们称为“takeWhile”和“dropWhile”而不是“limitWhile”和“skipWhile”? (2认同)
  • `IntStream .iterate(1, n -&gt; n + 1) .takeWhile(n -&gt; n &lt; 10)` 可以简化为 `IntStream .iterate(1, n -&gt; n &lt; 10, n -&gt; n + 1) ` (2认同)
  • @ArchimedesTrajano 在这种情况下可以理解为 `iterate(1, n -&gt; n &lt; 10, n -&gt; n + 1)` 直接对应于 `for(int n = 1; n &lt; 10; n = n + 1)` (2认同)

Lou*_*man 80

使用Java 8 应该可以实现这样的操作Stream,但它不一定能够有效地完成 - 例如,您不一定能够并行化这样的操作,因为您必须按顺序查看元素.

API没有提供一种简单的方法,但最简单的方法是采取Stream.iterator(),包装Iterator有一个"需要时间"的实现,然后回到a Spliterator然后a Stream.或者 - 也许 - 包装Spliterator,虽然在这个实现中它不能再被拆分了.

下面是一个未经测试的执行takeWhileSpliterator:

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) {
  return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
    boolean stillGoing = true;
    @Override public boolean tryAdvance(Consumer<? super T> consumer) {
      if (stillGoing) {
        boolean hadNext = splitr.tryAdvance(elem -> {
          if (predicate.test(elem)) {
            consumer.accept(elem);
          } else {
            stillGoing = false;
          }
        });
        return hadNext && stillGoing;
      }
      return false;
    }
  };
}

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
   return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);
}
Run Code Online (Sandbox Code Playgroud)

  • 如果他们没有那么专注于自动化并行性,那么Streams会好得多.只需很少一部分可以使用Streams的地方就需要并行性.此外,如果Oracle非常关注perfoance,他们本可以使JVM JIT自动向量化,并且在不打扰开发人员的情况下获得更大的性能提升.现在这就是自动化并行化. (87认同)
  • 理论上,将takeWhile与无状态谓词并行化很容易.评估并行批处理中的条件(假设谓词如果多次执行则不会抛出或产生副作用).问题是在Streams使用的递归分解(fork/join框架)的上下文中进行.真的,它的Streams非常低效. (7认同)
  • 不,@ Radiodef.该问题专门针对Java 8解决方案. (3认同)

小智 49

allMatch()是一种短路功能,因此您可以使用它来停止处理.主要的缺点是你必须做两次测试:一次是看你是否应该处理它,再看看是否继续进行.

IntStream
    .iterate(1, n -> n + 1)
    .peek(n->{if (n<10) System.out.println(n);})
    .allMatch(n->n < 10);
Run Code Online (Sandbox Code Playgroud)

  • 此解决方案的缺点是它返回一个布尔值,因此您无法像往常一样收集流的结果. (9认同)
  • 这对我来说似乎不直观(给定方法名称),但[docs确认](https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#allMatch -java.util.function.Predicate-)`Stream.allMatch()`是[短路操作](https://docs.oracle.com/javase/8/docs/api/java/util/stream /package-summary.html#StreamOps).所以这甚至会在像IntStream.iterate()`这样的无限流上完成.当然,回想起来,这是一个明智的优化. (4认同)
  • 这很整洁,但我不认为它的意图很好,它的意图是"偷看"的主体.如果我下个月遇到它,我会花一点时间想知道为什么我之前的程序员检查了`allMatch`然后忽略了答案. (3认同)

Tag*_*eev 34

作为@StuartMarks的后续回答.我的StreamEx库具有takeWhile与当前JDK-9实现兼容的操作.在JDK-9下运行时,它将委托给JDK实现(通过MethodHandle.invokeExact它实现非常快).在JDK-8下运行时,将使用"polyfill"实现.所以使用我的库可以解决这个问题:

IntStreamEx.iterate(1, n -> n + 1)
           .takeWhile(n -> n < 10)
           .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)


Dom*_*Fox 13

takeWhileprotonpack库提供的功能之一.

Stream<Integer> infiniteInts = Stream.iterate(0, i -> i + 1);
Stream<Integer> finiteInts = StreamUtils.takeWhile(infiniteInts, i -> i < 10);

assertThat(finiteInts.collect(Collectors.toList()),
           hasSize(10));
Run Code Online (Sandbox Code Playgroud)


The*_*tor 9

更新:Java 9 Stream现在带有一个takeWhile方法.

无需黑客或其他解决方案.只是用它!


我相信这可以在以下方面得到很大改善:(有人可能会让它成为线程安全的)

Stream<Integer> stream = Stream.iterate(0, n -> n + 1);

TakeWhile.stream(stream, n -> n < 10000)
         .forEach(n -> System.out.print((n == 0 ? "" + n : "," + n)));
Run Code Online (Sandbox Code Playgroud)

一个黑客肯定...不优雅 - 但它的作用〜:D

class TakeWhile<T> implements Iterator<T> {

    private final Iterator<T> iterator;
    private final Predicate<T> predicate;
    private volatile T next;
    private volatile boolean keepGoing = true;

    public TakeWhile(Stream<T> s, Predicate<T> p) {
        this.iterator = s.iterator();
        this.predicate = p;
    }

    @Override
    public boolean hasNext() {
        if (!keepGoing) {
            return false;
        }
        if (next != null) {
            return true;
        }
        if (iterator.hasNext()) {
            next = iterator.next();
            keepGoing = predicate.test(next);
            if (!keepGoing) {
                next = null;
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        if (next == null) {
            if (!hasNext()) {
                throw new NoSuchElementException("Sorry. Nothing for you.");
            }
        }
        T temp = next;
        next = null;
        return temp;
    }

    public static <T> Stream<T> stream(Stream<T> s, Predicate<T> p) {
        TakeWhile tw = new TakeWhile(s, p);
        Spliterator split = Spliterators.spliterator(tw, Integer.MAX_VALUE, Spliterator.ORDERED);
        return StreamSupport.stream(split, false);
    }

}
Run Code Online (Sandbox Code Playgroud)


frh*_*ack 8

您可以使用java8 + rxjava.

import java.util.stream.IntStream;
import rx.Observable;


// Example 1)
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n ->
          {
                System.out.println(n);
                return n < 10;
          }
    ).subscribe() ;


// Example 2
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n -> n < 10)
    .forEach( n -> System.out.println(n));
Run Code Online (Sandbox Code Playgroud)


gil*_*des 5

实际上,在Java 8中有两种方法可以实现此目的,而无需任何额外的库或使用Java 9。

如果要在控制台上打印2到20之间的数字,可以执行以下操作:

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).allMatch(i -> i < 20);
Run Code Online (Sandbox Code Playgroud)

要么

IntStream.iterate(2, (i) -> i + 2).peek(System.out::println).anyMatch(i -> i >= 20);
Run Code Online (Sandbox Code Playgroud)

在两种情况下,输出均为:

2
4
6
8
10
12
14
16
18
20
Run Code Online (Sandbox Code Playgroud)

尚无人提及任何比赛。这就是这篇文章的原因。


mar*_*ian 5

这是从JDK 9 java.util.stream.Stream.takeWhile(Predicate)复制的源代码.与JDK 8一起使用有点不同.

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) {
    class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> {
        private static final int CANCEL_CHECK_COUNT = 63;
        private final Spliterator<T> s;
        private int count;
        private T t;
        private final AtomicBoolean cancel = new AtomicBoolean();
        private boolean takeOrDrop = true;

        Taking(Spliterator<T> s) {
            super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
            this.s = s;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean test = true;
            if (takeOrDrop &&               // If can take
                    (count != 0 || !cancel.get()) && // and if not cancelled
                    s.tryAdvance(this) &&   // and if advanced one element
                    (test = p.test(t))) {   // and test on element passes
                action.accept(t);           // then accept element
                return true;
            } else {
                // Taking is finished
                takeOrDrop = false;
                // Cancel all further traversal and splitting operations
                // only if test of element failed (short-circuited)
                if (!test)
                    cancel.set(true);
                return false;
            }
        }

        @Override
        public Comparator<? super T> getComparator() {
            return s.getComparator();
        }

        @Override
        public void accept(T t) {
            count = (count + 1) & CANCEL_CHECK_COUNT;
            this.t = t;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }
    }
    return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close);
}
Run Code Online (Sandbox Code Playgroud)