Java 并行流 - 调用 parallel() 方法的顺序

abi*_*abi 12 java java-stream

AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
     .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
     .parallel()           
     .filter(record -> doSomeOperation())
     .findFirst()
Run Code Online (Sandbox Code Playgroud)

当我写这篇文章时,我假设线程将只产生 map 调用,因为 parallel 放在 map 之后。但是文件中的某些行在每次执行时都获得了不同的记录号。

我阅读了官方Java 流文档和一些网站,以了解流在幕后是如何工作的。

几个问题:

  • Java 并行流基于SplitIterator 工作,它由每个集合(如 ArrayList、LinkedList 等)实现。当我们从这些集合中构造并行流时,将使用相应的拆分迭代器来拆分和迭代集合。这解释了为什么并行发生在原始输入源(文件行)级别而不是映射的结果(即记录 pojo)。我的理解正确吗?

  • 就我而言,输入是文件 IO 流。将使用哪个拆分迭代器?

  • 我们parallel()在管道中的位置并不重要。原始输入源将始终被拆分,并将应用剩余的中间操作。

    在这种情况下,Java 不应该允许用户在管道中的任何地方放置并行操作,除了原始源。因为,对于那些不知道 java 流内部如何工作的人来说,这是错误的理解。我知道parallel()已经为 Stream 对象类型定义了操作,因此它以这种方式工作。但是,最好提供一些替代解决方案。

  • 在上面的代码片段中,我试图为输入文件中的每条记录添加一个行号,因此应该对其进行排序。但是,我想doSomeOperation()并行应用,因为它是重量级逻辑。实现的一种方法是编写我自己的自定义拆分迭代器。有没有其他办法?

ern*_*t_k 9

这解释了为什么并行发生在原始输入源(文件行)级别而不是映射的结果(即记录 pojo)。

整个流要么是并行的,要么是顺序的。我们不选择顺序或并行运行的操作子集。

当启动终端操作时,流管道将根据调用它的流的方向顺序或并行执行。[...] 当启动终端操作时,流管道将根据调用它的流的模式顺序或并行执行。同一来源

正如您所提到的,并行流使用拆分迭代器。显然,这是在操作开始运行之前对数据进行分区。


就我而言,输入是文件 IO 流。将使用哪个拆分迭代器?

查看源代码,我看到它使用 java.nio.file.FileChannelLinesSpliterator


我们将 parallel() 放在管道中的什么位置并不重要。原始输入源将始终被拆分,并将应用剩余的中间操作。

对。您甚至可以多次拨打parallel()sequential()。最后调用的将获胜。当我们调用 时parallel(),我们为返回的流设置​​它;并且如上所述,所有操作要么按顺序运行,要么并行运行。


在这种情况下,Java 不应该允许用户在管道中的任何地方放置并行操作,除了原始源...

这变成了一个意见问题。我认为 Zabuza 提供了一个很好的理由来支持 JDK 设计者的选择。


实现的一种方法是编写我自己的自定义拆分迭代器。有没有其他办法?

这取决于您的操作

  • 如果findFirst()是你真正的终端操作,那么你甚至不需要担心并行执行,因为doSomething()反正不会有很多调用(findFirst()是短路)。.parallel()实际上可能会导致处理多个元素,而findFirst()在顺序流上会阻止这种情况。
  • 如果您的终端操作不会创建太多数据,那么您可以Record使用顺序流创建对象,然后并行处理结果:

    List<Record> smallData = Files.lines(inputFile.toPath(), 
                                         StandardCharsets.UTF_8)
      .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
      .collect(Collectors.toList())
      .parallelStream()     
      .filter(record -> doSomeOperation())
      .collect(Collectors.toList());
    
    Run Code Online (Sandbox Code Playgroud)
  • 如果您的管道会在内存中加载大量数据(这可能是您使用 的原因Files.lines()),那么您可能需要一个自定义拆分迭代器。不过,在我去那里之前,我会研究其他选项(例如使用 id 列保存行 - 这只是我的意见)。
    我还尝试小批量处理记录,如下所示:

    AtomicInteger recordNumber = new AtomicInteger();
    final int batchSize = 10;
    
    try(BufferedReader reader = Files.newBufferedReader(inputFile.toPath(), 
            StandardCharsets.UTF_8);) {
        Supplier<List<Record>> batchSupplier = () -> {
            List<Record> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                String nextLine;
                try {
                    nextLine = reader.readLine();
                } catch (IOException e) {
                    //hanlde exception
                    throw new RuntimeException(e);
                }
    
                if(null == nextLine) 
                    return batch;
                batch.add(new Record(recordNumber.getAndIncrement(), nextLine));
            }
            System.out.println("next batch");
    
            return batch;
        };
    
        Stream.generate(batchSupplier)
            .takeWhile(list -> list.size() >= batchSize)
            .map(list -> list.parallelStream()
                             .filter(record -> doSomeOperation())
                             .collect(Collectors.toList()))
            .flatMap(List::stream)
            .forEach(System.out::println);
    }
    
    Run Code Online (Sandbox Code Playgroud)

    这会doSomeOperation()并行执行,无需将所有数据加载到内存中。但请注意,batchSize这需要考虑一下。

  • 自定义的“Spliterator”实现不会比这更复杂,同时允许更有效的并行处理...... (2认同)
  • 没有理由假设“findFirst()”操作将仅处理少量元素。在处理了 90% 的所有元素后,第一个匹配仍然可能发生。此外,当有 1000 万行时,即使在 10% 之后找到匹配,仍然需要处理 100 万行。 (2认同)

Hol*_*ger 8

最初的 Stream 设计包括支持具有不同并行执行设置的后续流水线阶段的想法,但该想法已被放弃。API 可能源于这个时间,但另一方面,强制调用者为并行或顺序执行做出单一明确决定的 API 设计会复杂得多。

实际Spliterator使用Files.lines(…)是依赖于实现的。在 Java 8(Oracle 或 OpenJDK)中,你总是得到与BufferedReader.lines(). 在较新的 JDK 中,如果Path属于默认文件系统并且字符集是此功能支持的字符集之一,则您将获得具有专用Spliterator实现的 Stream ,java.nio.file.FileChannelLinesSpliterator. 如果不满足先决条件,您将获得与 with 相同的结果BufferedReader.lines(),它仍然基于Iterator实现 insideBufferedReader并包装 via Spliterators.spliteratorUnknownSize

您的特定任务最好使用自定义Spliterator来处理,该自定义可以在并行处理之前在源处执行行编号,以允许后续并行处理不受限制。

public static Stream<Record> records(Path p) throws IOException {
    LineNoSpliterator sp = new LineNoSpliterator(p);
    return StreamSupport.stream(sp, false).onClose(sp);
}

private static class LineNoSpliterator implements Spliterator<Record>, Runnable {
    int chunkSize = 100;
    SeekableByteChannel channel;
    LineNumberReader reader;

    LineNoSpliterator(Path path) throws IOException {
        channel = Files.newByteChannel(path, StandardOpenOption.READ);
        reader=new LineNumberReader(Channels.newReader(channel,StandardCharsets.UTF_8));
    }

    @Override
    public void run() {
        try(Closeable c1 = reader; Closeable c2 = channel) {}
        catch(IOException ex) { throw new UncheckedIOException(ex); }
        finally { reader = null; channel = null; }
    }

    @Override
    public boolean tryAdvance(Consumer<? super Record> action) {
        try {
            String line = reader.readLine();
            if(line == null) return false;
            action.accept(new Record(reader.getLineNumber(), line));
            return true;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    @Override
    public Spliterator<Record> trySplit() {
        Record[] chunks = new Record[chunkSize];
        int read;
        for(read = 0; read < chunks.length; read++) {
            int pos = read;
            if(!tryAdvance(r -> chunks[pos] = r)) break;
        }
        return Spliterators.spliterator(chunks, 0, read, characteristics());
    }

    @Override
    public long estimateSize() {
        try {
            return (channel.size() - channel.position()) / 60;
        } catch (IOException ex) {
            return 0;
        }
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL | DISTINCT;
    }
}
Run Code Online (Sandbox Code Playgroud)