Java Streams: not all values are collected

Val*_*rin 3 java java-8 java-stream

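I have a list of buckets, each containing many records. I am using streams to sum the values of the records in each bucket. However, after I collect, the sums are incorrect. My processing statement so far: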

List<StatAccumulator> results = statData.stream().map(
            list -> list.stream().parallel()
            .collect(
                () -> new StatAccumulator(metrics, groups),
                StatAccumulator::containerize,
                StatAccumulator::combine
            )
        ).collect(Collectors.toList());

StatAccumulator is just a container class that stores each value I am summing across the records.

public class StatAccumulator {
    public StatRecord result;
    private final List<String> metrics;
    private final List<String> groups;
    private Long count;

    public StatAccumulator(List<String> metrics, List<String> groups) {
        this.metrics = metrics;
        this.groups = groups;
    }

    public void containerize(StatRecord initial) {
        //logger.info(initial.toString());
        this.result = new StatRecord(
            initial.v1,
            initial.v2
        );
        this.count = 1l;
    }

    public void combine(StatAccumulator other) {
        result.v1+= other.result.v1;
        result.v2+= other.result.v2;

        this.count += other.count;
        logger.info("Current Combined: "+this.result.v1.toString());
    }
}

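For simplicity, I am using only one bucket and tracking only one value. Before this processing step, I print all the values of every record and sum them in Excel to get the expected result (~28k), but I tend to get an actual result of ~5k. So I have confirmed that all the data goes in, but not all of it comes out. Does anyone know why I am missing results?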

JB *_*zet 5

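Your containerize method is incorrect: it replaces result with the current record on every call instead of adding to it. It should be: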

public class StatAccumulator {
    public StatRecord result = new StatRecord(0, 0);
    private final List<String> metrics;
    private final List<String> groups;
    private long count;

    public StatAccumulator(List<String> metrics, List<String> groups) {
        this.metrics = metrics;
        this.groups = groups;
    }

    public void containerize(StatRecord other) {
        // Add each record to the running totals instead of overwriting them.
        this.result.v1 += other.v1;
        this.result.v2 += other.v2;
        this.count++;
    }

    public void combine(StatAccumulator other) {
        result.v1+= other.result.v1;
        result.v2+= other.result.v2;
        this.count += other.count;
        logger.info("Current Combined: "+this.result.v1.toString());
    }
}

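containerize is the accumulator: it folds each record into the result, starting from the initial state. It is the only method used when the stream is sequential.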

combine is used only when the stream is parallel, to merge the accumulated results of two "sub-streams".
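To make the two roles concrete, here is a minimal sketch using plain integers instead of StatRecord. The names `CollectDemo`, `Acc`, and `OverwritingAcc` are made up for illustration and are not from the question. `Acc` accumulates correctly, while `OverwritingAcc` mimics the original containerize by replacing its state on every element.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class CollectDemo {
    // Hypothetical stand-in for StatAccumulator: sums one value and counts elements.
    static class Acc {
        long sum;   // initial state is 0
        long count;

        // Accumulator: folds one element into this container.
        void accumulate(Integer value) { sum += value; count++; }

        // Combiner: merges another container (only called for parallel streams).
        void combine(Acc other) { sum += other.sum; count += other.count; }
    }

    // Mimics the bug from the question: each element overwrites the previous state.
    static class OverwritingAcc {
        long sum;
        long count;

        void accumulate(Integer value) { sum = value; count = 1; }

        void combine(OverwritingAcc other) { sum += other.sum; count += other.count; }
    }

    static Acc collect(List<Integer> values, boolean parallel) {
        Stream<Integer> stream = parallel ? values.parallelStream() : values.stream();
        return stream.collect(Acc::new, Acc::accumulate, Acc::combine);
    }

    static OverwritingAcc collectOverwriting(List<Integer> values) {
        // Sequential on purpose, so the outcome is deterministic.
        return values.stream().collect(
            OverwritingAcc::new, OverwritingAcc::accumulate, OverwritingAcc::combine);
    }

    public static void main(String[] args) {
        List<Integer> values =
            IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());

        Acc sequential = collect(values, false);
        Acc parallel = collect(values, true);
        System.out.println(sequential.sum + " / " + sequential.count); // 500500 / 1000
        System.out.println(parallel.sum + " / " + parallel.count);     // 500500 / 1000

        OverwritingAcc broken = collectOverwriting(values);
        System.out.println(broken.sum + " / " + broken.count);         // 1000 / 1
    }
}
```

With the overwriting accumulator, a sequential stream keeps only the last element. A parallel stream keeps roughly one element per chunk and then adds those up in combine, which would be consistent with seeing a partial total like the ~5k in the question.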

  • ...and since `metrics` and `groups` are completely unused, removing them from the `StatAccumulator` class lets you replace the supplier `() -> new StatAccumulator(metrics, groups)` with the simpler `StatAccumulator::new`... (2 upvotes)
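A rough sketch of what that simplification could look like, putting the corrected accumulator together with the per-bucket pipeline from the question. The shape of `StatRecord` (two `long` fields) is an assumption, since the class is not shown in the thread, and `BucketSums`, `sumBucket`, and `sumBuckets` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class BucketSums {
    // Assumed shape of StatRecord; the real class is not shown in the thread.
    static class StatRecord {
        long v1;
        long v2;

        StatRecord(long v1, long v2) { this.v1 = v1; this.v2 = v2; }
    }

    // StatAccumulator without the unused metrics/groups fields,
    // so the implicit no-arg constructor can serve as the supplier.
    static class StatAccumulator {
        final StatRecord result = new StatRecord(0, 0);
        long count;

        void containerize(StatRecord other) {
            result.v1 += other.v1;
            result.v2 += other.v2;
            count++;
        }

        void combine(StatAccumulator other) {
            result.v1 += other.result.v1;
            result.v2 += other.result.v2;
            count += other.count;
        }
    }

    // Sums one bucket; the supplier is now just StatAccumulator::new.
    static StatAccumulator sumBucket(List<StatRecord> bucket) {
        return bucket.parallelStream().collect(
            StatAccumulator::new,
            StatAccumulator::containerize,
            StatAccumulator::combine);
    }

    // One accumulator per bucket, in the same order as the input list.
    static List<StatAccumulator> sumBuckets(List<List<StatRecord>> statData) {
        return statData.stream()
            .map(BucketSums::sumBucket)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<StatRecord>> statData = Arrays.asList(
            Arrays.asList(new StatRecord(1, 10), new StatRecord(2, 20)),
            Arrays.asList(new StatRecord(3, 30)));

        for (StatAccumulator acc : sumBuckets(statData)) {
            System.out.println(acc.result.v1 + ", " + acc.result.v2 + " (" + acc.count + " records)");
        }
    }
}
```

Pulling the per-bucket collect into its own method is only a readability choice here; inlining it in the `map` lambda as in the question should behave the same way.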