Java Streams - 如何在每第n个项目中执行中间函数

The*_*tor 3 java lambda java-8 java-stream

我正在寻找一个Stream上的操作,使我能够每隔n项执行一次非终端(和/或终端)操作.虽然我使用素数流,例如,流可以很容易地生成网络请求,用户操作或其他一些冷数据或实时源.

由此:

    Duration start = Duration.ofNanos(System.nanoTime());

    IntStream.iterate(2, n -> n + 1)
            .filter(Findprimes::isPrime)
            .limit(1_000_1000 * 10)
            .forEach(System.out::println);

    System.out.println("Duration: " + Duration.ofNanos(System.nanoTime()).minus(start));
Run Code Online (Sandbox Code Playgroud)

对于像这样的流函数:

    IntStream.iterate(2, n -> n + 1)
            .filter(Findprimes::isPrime)
            .limit(1_000_1000 * 10)
            .peekEvery(10, System.out::println)
            .forEach( it -> {});
Run Code Online (Sandbox Code Playgroud)

And*_*eas 6

创建一个帮助方法来包装peek()使用者:

public static IntConsumer every(int count, IntConsumer consumer) {
    if (count <= 0)
        throw new IllegalArgumentException("Count must be >1: Got " + count);
    return new IntConsumer() {
        private int i;
        @Override
        public void accept(int value) {
            if (++this.i == count) {
                consumer.accept(value);
                this.i = 0;
            }
        }
    };
}
Run Code Online (Sandbox Code Playgroud)

您现在几乎可以按照自己的意愿使用它:

IntStream.rangeClosed(1, 20)
         .peek(every(5, System.out::println))
         .count();
Run Code Online (Sandbox Code Playgroud)

产量

5
10
15
20
Run Code Online (Sandbox Code Playgroud)

辅助方法可以放在实用程序类中并静态导入,类似于Collectors类只是静态辅助方法.

正如@ user140547在著名的评论,这个代码是不是线程安全的,因此它不能与并行流使用.此外,输出顺序会混乱,所以无论如何将它与并行流一起使用并不是真的有意义.

  • 如果使用并行流,最终可能会输出"15 20 10 5"之类的输出.此外,此实现不是线程安全的,并且在使用并行流时应该同步,因此您可能得到任何结果. (3认同)
  • @ user140547:为了正确,如果你使用并行流,有很多不同的可能输出,显示*任意*数字而不是不同顺序的所需数字,甚至使方法"同步"将无法修复那.这个消费者依赖于*processing*订单,而期望的结果取决于*遇到*订单.此外,`peek`旨在帮助调试处理,而不是执行所需的操作,即在Java 9下,此代码将不执行任何操作,因为`rangeClosed`的`count`可以在不处理任何项目的情况下计算... (2认同)

use*_*547 5

依赖并不是一个好主意peek(),count()因为如果可以在不经过整个流的情况下计算,则根本不可以调用该操作count().即使它现在有效,但这并不意味着它将来也会起作用.请参阅Java 9中的javadocStream.count().

更好用forEach().

对于问题本身:在像简单迭代这样的特殊情况下,你可以像过滤一样对象.

Stream.iterate(2, n->n+1)
        .limit(20)
        .filter(n->(n-2)%5==0 && n!=2)
        .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

这当然不适用于您可能使用有状态的其他情况IntConsumer.如果iterate()使用,无论如何使用并行流可能没那么有用.

如果你想要一个通用的解决方案,你也可以尝试使用"正常" Stream,这可能不如a IntStream,但在许多情况下仍然应该足够:

class Tuple{ // ctor, getter/setter omitted
    int index;
    int value;
}
Run Code Online (Sandbox Code Playgroud)

然后你可以这样做:

Stream.iterate( new Tuple(1,2),t-> new Tuple(t.index+1,t.value*2))
        .limit(30)
        .filter(t->t.index %5 == 0)
        .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)

如果你必须使用peek(),你也可以

.peek(t->{if (t.index %5 == 0) System.out.println(t);})

或者如果你添加方法

static Tuple initialTuple(int value){
   return new Tuple(1,value);
}

static UnaryOperator<Tuple> createNextTuple(IntUnaryOperator f){
    return current -> new Tuple(current.index+1,f.applyAsInt(current.value));
}
static Consumer<Tuple> every(int n,IntConsumer consumer){
    return tuple -> {if (tuple.index % n == 0) consumer.accept(tuple.value);};
}
Run Code Online (Sandbox Code Playgroud)

你也可以(使用静态导入):

  Stream.iterate( initialTuple(2), createNextTuple(x->x*2))
        .limit(30)
        .peek(every(5,System.out::println))
        .forEach(System.out::println);
Run Code Online (Sandbox Code Playgroud)