Java 8流组合器从未调用过

Mar*_*nov 23 java java-8 java-stream collectors

我正在编写一个自定义java 8收集器,它应该计算具有getValue()方法的POJO的平均值.这是代码:

public static Collector<BoltAggregationData, BigDecimal[], BigDecimal> avgCollector = new Collector<BoltAggregationData, BigDecimal[], BigDecimal>() {

        @Override
        public Supplier<BigDecimal[]> supplier() {
            return () -> {
                BigDecimal[] start = new BigDecimal[2];
                start[0] = BigDecimal.ZERO;
                start[1] = BigDecimal.ZERO;
                return start;
            };
        }

        @Override
        public BiConsumer<BigDecimal[], BoltAggregationData> accumulator() {
            return (a,b) ->  {
                a[0] = a[0].add(b.getValue());
                a[1] = a[1].add(BigDecimal.ONE);
            };
        }

        @Override
        public BinaryOperator<BigDecimal[]> combiner() {
            return (a,b) -> {
                a[0] = a[0].add(b[0]);
                a[1] = a[1].add(b[1]);
                return a;
            };
        }

        @Override
        public Function<BigDecimal[], BigDecimal> finisher() {
            return (a) -> {
                return a[0].divide(a[1], 6 , RoundingMode.HALF_UP);
            };
        }

        private final Set<Characteristics> CHARACTERISTICS = new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));

        @Override
        public Set<Characteristics> characteristics() {
            return CHARACTERISTICS;
        }

    };
Run Code Online (Sandbox Code Playgroud)

这一切在非平行情况下都很有效.但是,当我使用a时parallelStream(),它有时不起作用.例如,给定从1到10的值,它计算(53/9而不是55/10).调试调试器时,永远不会遇到combiner()函数中的断点.我需要设置某种旗帜吗?

Jor*_*nee 23

看起来问题就是这个问题CONCURRENT,除了你认为可能之外还有其他的东西:

表示此收集器是并发的,这意味着结果容器可以支持与来自多个线程的相同结果容器同时调用的累加器函数.

不是调用组合器,而是同时调用累加器,BigDecimal[] a对所有线程使用相同的.访问权限a不是原子的,所以它出错了:

Thread1 -> retrieves value of a[0]: 3
Thread2 -> retrieves value of a[0]: 3
Thread1 -> adds own value: 3 + 3 = 6
Thread2 -> adds own value: 3 + 4 = 7
Thread1 -> writes 6 to a[0]
Thread2 -> writes 7 to a[0]
Run Code Online (Sandbox Code Playgroud)

a[0]当它应该是10时,使值为7.同样的事情可能发生a[1],因此结果可能不一致.


如果删除该CONCURRENT特征,则将使用组合器.

  • @dcsohl在测试过程中,我发现在```synchronized(this){...}```周围的累加器中的行也解决了这个问题.但是我的直觉说不应该强制使用这个特性,而是在结果容器以任何方式支持并发操作时使用. (2认同)

Hol*_*ger 18

那么,这正是您在指定时所要求的Characteristics.CONCURRENT:

表示此收集器是并发的,这意味着结果容器可以支持与来自多个线程的相同结果容器同时调用的累加器函数.

如果不是这种情况,就像你一样Collector,你不应该指定那个标志.


作为旁注,new HashSet<Characteristics>(Arrays.asList(Characteristics.CONCURRENT, Characteristics.UNORDERED));指定特征的效率非常低.你可以使用EnumSet.of(Characteristics.CONCURRENT, Characteristics.UNORDERED).当你删除错误的并发特征时,你可以使用EnumSet.of(Characteristics.UNORDERED)或者Collections.singleton(Characteristics.UNORDERED),但HashSet绝对是过度的.