Count the elements of a Stream, but collect only N of them

Whi*_*cal 7 java lambda java-stream

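Is something like the following lambda possible in Java? I want to count the elements of my filtered stream, but also store the first 10 along the way: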

stream().filter(myFilter)  //Reduces input to forthcoming operations
        .limit(10)         //Limits to ten the amount of elements to finish stream 
        .peek(myList::add) //Stores the ten elements into a list
        .count();          //Here is the difficult one: I'd like to count the total number of elements that pass the filter, not just the 10 I am fetching

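EDIT: This was too implicit on my part, but the idea is of course a potentially faster solution than calling the stream generator twice and performing the two operations separately, as in: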

List<Entity> entities = stream().filter(myFilter) 
                                .limit(10)
                                .collect(Collectors.toList());
long entitiesCount = stream().filter(myFilter) 
                             .count();

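...that is, getting the benefit of a single iteration without loading the whole collection into memory. I'm testing the answers with parallel streams.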

Eug*_*ene 5

A custom collector is the answer here:

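import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collector;

// `list` is the source List<Integer>; key = first 10 elements, value = total count
Entry<List<Integer>, Integer> result = list.stream()
            .collect(Collector.of(
                    () -> new SimpleEntry<>(new ArrayList<>(), 0),
                    (l, x) -> {
                        // keep only the first 10 elements...
                        if (l.getKey().size() < 10) {
                            l.getKey().add(x);
                        }
                        // ...but count every element
                        l.setValue(l.getValue() + 1);
                    },
                    (left, right) -> {
                        // left precedes right in encounter order, so top up left's list
                        // from the front of right's list, then add up the counts
                        List<Integer> leftList = left.getKey();
                        List<Integer> rightList = right.getKey();
                        while (leftList.size() < 10 && rightList.size() > 0) {
                            leftList.add(rightList.remove(0));
                        }
                        left.setValue(left.getValue() + right.getValue());
                        return left;
                    }));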
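The same idea can be packaged as a reusable collector with the limit as a parameter. This is a minimal sketch, not part of the original answer: the names `FirstN` and `firstNAndCount` are hypothetical, and a small holder class replaces the `SimpleEntry` so the count can be a `long`. Each partial container holds at most `n` elements, so memory stays bounded while every filtered element is still counted in one pass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.IntStream;

// Hypothetical result holder: the first n elements in encounter order plus the total count.
final class FirstN<T> {
    final List<T> first = new ArrayList<>();
    long count;
}

final class FirstNCollectors {
    // Hypothetical helper generalizing the answer's collector to any limit n.
    static <T> Collector<T, FirstN<T>, FirstN<T>> firstNAndCount(int n) {
        return Collector.of(
                FirstN::new,
                (acc, x) -> {
                    // store the element only while fewer than n are kept
                    if (acc.first.size() < n) {
                        acc.first.add(x);
                    }
                    acc.count++; // but count every element
                },
                (left, right) -> {
                    // left precedes right in encounter order: top up left from the front of right
                    for (T x : right.first) {
                        if (left.first.size() >= n) {
                            break;
                        }
                        left.first.add(x);
                    }
                    left.count += right.count;
                    return left;
                });
    }

    public static void main(String[] args) {
        FirstN<Integer> r = IntStream.rangeClosed(1, 1000)
                .boxed()
                .parallel()
                .filter(i -> i % 2 == 0)
                .collect(firstNAndCount(10));
        System.out.println(r.first); // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
        System.out.println(r.count); // 500
    }
}
```

Because the collector does not declare `CONCURRENT` or `UNORDERED`, a parallel ordered stream still combines partial results in encounter order, so the kept elements are the same as in a sequential run.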

Now suppose you have the following code:

Set.of(1, 2, 3, 4)
            .stream()
            .parallel()
            .collect(Collector.of(
                    ArrayList::new,
                    (list, ele) -> {
                        System.out.println("Called accumulator");
                        list.add(ele);
                    },
                    (left, right) -> {
                        System.out.println("Combiner called");
                        left.addAll(right);
                        return left;
                    },
                    new Characteristics[] { Characteristics.CONCURRENT }));

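Before thinking about that code (its correctness does not matter for the purpose of the example), we need to read a bit of the documentation for the CONCURRENT characteristic: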

If a CONCURRENT collector is not also UNORDERED, then it should only be evaluated concurrently if applied to an unordered data source.

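What the documentation basically says is that if your collector is CONCURRENT and the source of the stream is UNORDERED (like a Set), or we explicitly call unordered, then the combiner will never be called: all threads accumulate into one shared result container.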

If you run the previous code, you will see that "Combiner called" never appears in the output.
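To make that observation checkable rather than relying on printed lines, here is a minimal sketch that counts combiner invocations with an `AtomicInteger`. Assumptions not in the answer: the class and method names are hypothetical, the `ArrayList` is swapped for a `ConcurrentLinkedQueue` so the shared container is actually thread-safe, and a `HashSet` (which reports no encounter order) stands in for `Set.of`. The zero count reflects the OpenJDK implementation of `Stream.collect`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;
import java.util.stream.Stream;

final class CombinerProbe {
    // Runs a CONCURRENT collector over a parallel stream and returns how many times
    // its combiner was invoked. ConcurrentLinkedQueue is thread-safe, so the collected
    // result stays complete even when a single container is shared by all threads.
    static int combinerCalls(Stream<Integer> source, int expectedSize) {
        AtomicInteger calls = new AtomicInteger();
        Queue<Integer> result = source.parallel().collect(
                Collector.<Integer, Queue<Integer>>of(
                        ConcurrentLinkedQueue::new,
                        Queue::add,
                        (left, right) -> {
                            calls.incrementAndGet();
                            left.addAll(right);
                            return left;
                        },
                        Characteristics.CONCURRENT));
        if (result.size() != expectedSize) {
            throw new AssertionError("expected " + expectedSize + " elements, got " + result.size());
        }
        return calls.get();
    }

    public static void main(String[] args) {
        // unordered source: one shared container, the combiner is never called
        System.out.println(combinerCalls(new HashSet<>(List.of(1, 2, 3, 4)).stream(), 4)); // prints 0
        // ordered source: per-chunk containers, typically merged by the combiner
        System.out.println(combinerCalls(List.of(1, 2, 3, 4).stream(), 4));
    }
}
```

The second call's count depends on how the stream is split, so only the unordered case has a fixed expected value.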

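If you change Set.of(1, 2, 3, 4) to List.of(1, 2, 3, 4), you will see a different picture (ignore the correctness of the result you get, since ArrayList is not thread-safe; that is not the point here). If you keep a List as the source of the stream but also call unordered, you will again see that only the accumulator is called, i.e.: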

List.of(1, 2, 3, 4)
            .stream()
            .unordered()
            .parallel()
            .collect(Collector.of(
                    ArrayList::new,
                    (list, ele) -> {
                        System.out.println("Called accumulator");
                        list.add(ele);
                    },
                    (left, right) -> {
                        System.out.println("Combiner called");
                        left.addAll(right);
                        return left;
                    },
                    new Characteristics[] { Characteristics.CONCURRENT }));
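The quoted Javadoc also implies a second route: a collector that is itself UNORDERED may be evaluated concurrently even on an ordered source such as a List. The sketch below, with hypothetical names and a thread-safe `ConcurrentLinkedQueue` in place of `ArrayList`, checks both routes by counting combiner calls; the zero counts describe OpenJDK's `Stream.collect`, which takes the shared-container path when the stream is unordered or the collector declares UNORDERED.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;
import java.util.stream.Stream;

final class UnorderedCollectorProbe {
    // Hypothetical helper: collects a parallel stream with the given characteristics
    // and returns the number of combiner invocations.
    static int combinerCalls(Stream<Integer> source, Characteristics... characteristics) {
        AtomicInteger calls = new AtomicInteger();
        source.parallel().collect(
                Collector.<Integer, Queue<Integer>>of(
                        ConcurrentLinkedQueue::new,
                        Queue::add,
                        (left, right) -> {
                            calls.incrementAndGet();
                            left.addAll(right);
                            return left;
                        },
                        characteristics));
        return calls.get();
    }

    public static void main(String[] args) {
        // ordered source made unordered explicitly (the answer's last example)
        System.out.println(combinerCalls(List.of(1, 2, 3, 4).stream().unordered(),
                Characteristics.CONCURRENT)); // prints 0
        // ordered source, but the collector itself declares UNORDERED
        System.out.println(combinerCalls(List.of(1, 2, 3, 4).stream(),
                Characteristics.CONCURRENT, Characteristics.UNORDERED)); // prints 0
    }
}
```

Declaring UNORDERED is only appropriate when the collector does not care about encounter order, which is not the case for the "first 10 elements" collector in the answer.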