流状态计算:累积总和

Adr*_*ard 21 java java-8 java-stream

假设我有一个Java IntStream,是否可以将其转换为具有累积总和的IntStream?例如,以[4,2,6,...]开头的流应转换为[4,6,12,...].

更一般地说,应该如何实现有状态流操作?感觉这应该是可能的:

myIntStream.map(new Function<Integer, Integer> {
    int sum = 0; 
    Integer apply(Integer value){ 
        return sum += value; 
    }
);
Run Code Online (Sandbox Code Playgroud)

有明显的限制,这只适用于顺序流.但是,Stream.map明确需要无状态映射函数.我是否正确错过了Stream.statefulMap或Stream.cumulative操作,还是缺少Java流的重点?

比较一下Haskell,其中scanl1函数正好解决了这个例子:

scanl1 (+) [1 2 3 4] = [1 3 6 10]
Run Code Online (Sandbox Code Playgroud)

Ste*_*e K 9

你可以用原子序数做到这一点.例如:

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

public class Accumulator {
    public static LongStream toCumulativeSumStream(IntStream ints){
        AtomicLong sum = new AtomicLong(0);
        return ints.sequential().mapToLong(sum::addAndGet);
    }

    public static void main(String[] args){
        LongStream sums = Accumulator.toCumulativeSumStream(IntStream.range(1, 5));
        sums.forEachOrdered(System.out::println);
    }
}
Run Code Online (Sandbox Code Playgroud)

这输出:

1
3
6
10
Run Code Online (Sandbox Code Playgroud)

我使用Long来存储总和,因为完全有可能两个整数加起来Integer.MAX_VALUE很长,而且很长一段时间没有溢出的可能性.

  • 我觉得这个答案很有意思.你能回答几个问题吗?为什么使用AtomicReference而不是AtomicInteger - 使用addAndGet?但更重要的是,为什么这会改变这样一个事实,即如果流并行,则无法保证累积发生的顺序?AtomicReference会以某种方式改变流的行为吗?如果是这样,你能指出一个关于它的教程或文档吗?谢谢. (2认同)
  • 只是前一个问题的补充,我想我会尝试使用并行的IntStream.它不起作用.所以这根本不是一个好的答案. (2认同)
  • @sprinter是的,这对并行流不起作用,但由于操作无论如何都无法并行化,因此可以在运行之前调用.sequential(). (2认同)

spr*_*ter 5

可以使用收集器然后创建新流:

class Accumulator {
    public static void accept(List<Integer> list, Integer value) {
        list.add(value + (list.isEmpty() ? 0 : list.get(list.size() - 1)));
    }

    public static List<Integer> combine(List<Integer> list1, List<Integer> list2) {
        int total = list1.get(list1.size() - 1);
        list2.stream().map(n -> n + total).forEach(list1::add);
        return list1;
    }
}
Run Code Online (Sandbox Code Playgroud)

这用作:

myIntStream.parallel()
    .collect(ArrayList<Integer>::new, Accumulator::accept, Accumulator::combine)
    .stream();
Run Code Online (Sandbox Code Playgroud)

希望您可以看到此收集器的重要属性是即使流是并行的,因为Accumulator实例组合在一起它会调整总计.

这显然不如映射操作有效,因为它收集整个流然后生成新流.但这不仅仅是一个实现细节:它是流可能同时处理的事实的必要功能.

我已经测试过它IntStream.range(0, 10000).parallel()并且它正常运行.

  • 这是一个内存流源(如数组或arraylist)如何被拆分(在中间)的工件.如果你使用`Files.lines()`或任何其他基于IO的源,你会看到不同的大小.不平衡的树木也会产生不均匀的分割尺寸. (2认同)